© IntegrationWise, 2025
Server Sent Events is a web standard that's been around since about 2006 and is supported by all major browsers. In short, it allows a server to push events to connected clients.
This post explores how to implement Server Sent Events on the webMethods IntegrationServer.
The IS has had direct support for WebSockets since v10.11 (well, really only after IS ServiceFix 13 or so), but there is no support for Server Sent Events. WebSockets allow for fully bi-directional data flows. The client (usually a web application inside a browser) and the server can interact with each other over the same TCP connection.
Using WebSockets on the IntegrationServer requires one to create a special port of type webMethods/WebSocket(Secure).
Server Sent Events on the other hand can be implemented using a standard HTTP or HTTP/s port.
Server Sent Events (SSE) only allow the server to send data to the client, and the data can only be text in UTF-8 format. Binary data is not supported, although one can always use Base64 encoding.
For a complete description of the differences between the two protocols, have a look at this blog. The gist is that SSE is the simpler protocol, which can be implemented on top of an existing HTTP connection, as it uses the XMLHttpRequest (XHR) protocol.
Let's create the simplest implementation possible. For this we need:
- A web page with an EventSource that sets up an SSE connection to a service on the IS. An event handler will make a received event visible on the page.
- A service on the IS that sends an event to the client every few seconds, for n iterations.

Let's use the following HTML page as an example. The HTML contains an unordered list (id = 'messageList') to which list items will be added by the onmessage() handler of the EventSource on /invoke/sample.sse/subscribe:
    <!DOCTYPE html>
    <html lang="en">
    <head>
      <title>Server Sent Events demo</title>
      <script type="text/javascript">
        const evtSource = new EventSource("/invoke/sample.sse/subscribe");
        // Append every received 'message' event to the list on the page.
        // (No document.write() here: calling it after the page has loaded
        // would replace the whole document, including the list.)
        evtSource.onmessage = function(event) {
          const messageList = document.getElementById("messageList");
          const item = document.createElement("li");
          item.textContent = event.type + ": " + event.data;
          messageList.append(item);
        }
      </script>
    </head>
    <body style="font-family: sans-serif, tahoma;">
      <h2>This page demonstrates how to use Server Sent Events with wM IntegrationServer</h2>
      <p>Event list:</p>
      <!-- The events will be appended to the following list: -->
      <ul id="messageList"></ul>
    </body>
    </html>
The key element here is the URL that the EventSource uses: /invoke/sample.sse/subscribe. That is a path to a service in the IS, namely sample.sse:subscribe. In our first implementation, this service will send a message with a counter to the client every few seconds. The client stays connected until the web page is closed, so the service cannot exit in the meantime. If it does, then the client will reconnect automatically.
Save the html in a package named SAMPLE as:
$IS_INSTANCE_HOME/packages/SAMPLE/pub/sse-poc.html
and access the html page on the IS under the url:
http://localhost:5555/SAMPLE/sse-poc.html
On the IntegrationServer we need a service that enters a loop, writes some data to the client, sleeps for a couple of seconds, and then repeats the action.
The problem that we have to solve is that, inside the service, one must get hold of the OutputStream that writes data to the client.
Normally, the IS will take care of this. Once a service exits, the IS takes the IData output of the service and serializes it in the way that the client expects, be it JSON, XML, HTML (via a template). But in this case we cannot let the service finish. It must remain active as long as the client stays connected.
The way to get hold of the OutputStream is via the com.wm.app.b2b.server.InvokeState class:
    java.net.Socket socket = InvokeState.getCurrentSocket();
From the Socket one gets access to the OutputStream:
    java.io.OutputStream os = socket.getOutputStream();
Next up, we need to know how to format the message. Here the specification helps us, which says to conform to the text/event-stream MIME-type.
But as we're dealing with a raw output stream, one has to realize that the client first expects a valid HTTP response. So that's a status line and some headers, separated by a carriage-return/newline pair, with an extra carriage-return/newline pair after the last header:
    HTTP/1.1 200 OK\r\n
    X-Accel-Buffering: no\r\n
    Content-Type: text/event-stream\r\n
    Cache-Control: no-cache\r\n
    \r\n
Then follows the event stream itself, which goes on indefinitely until the client closes the connection.
Messages in the event stream are separated by a double newline, where a newline may be a single CR character (ASCII 0x0D), a single LF character (ASCII 0x0A), or a CRLF character pair (ASCII 0x0D0A). In the examples we follow the UNIX convention for line separation, so we use the LF character, in Java and JavaScript represented as \n.
    event: message\n
    data: some message\n
    \n
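To make the framing rules above concrete, here is a minimal sketch of how a receiver could split a text/event-stream into events. It is plain Java and has nothing to do with the IS; the class name SseStreamSketch and its methods are hypothetical, and in a browser the EventSource does all of this for you. It only looks at the event and data fields.

```java
import java.util.ArrayList;
import java.util.List;

final class SseStreamSketch {

    /** One parsed event: its type and its (possibly multi-line) data. */
    static final class Event {
        final String type;
        final String data;

        Event(String type, String data) {
            this.type = type;
            this.data = data;
        }
    }

    /** Splits a raw event stream into events. A blank line ends an event. */
    static List<Event> parse(String stream) {
        List<Event> events = new ArrayList<>();
        String type = "message";              // default type when no 'event:' field is sent
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        // A line ends with CRLF, LF or CR; the -1 keeps trailing empty lines
        for (String line : stream.split("\\r\\n|\\n|\\r", -1)) {
            if (line.isEmpty()) {
                if (hasData) {
                    events.add(new Event(type, data.toString()));
                }
                type = "message";
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith("event:")) {
                type = stripLeadingSpace(line.substring(6));
            } else if (line.startsWith("data:")) {
                if (hasData) {
                    data.append('\n');        // several data lines form one multi-line value
                }
                data.append(stripLeadingSpace(line.substring(5)));
                hasData = true;
            }
        }
        return events;
    }

    /** A single space after the colon is not part of the value. */
    private static String stripLeadingSpace(String value) {
        return value.startsWith(" ") ? value.substring(1) : value;
    }

    public static void main(String[] args) {
        String stream = "event: message\ndata: some message\n\n";
        for (Event e : parse(stream)) {
            System.out.println(e.type + ": " + e.data);
        }
    }
}
```

Note that an event without any data line is silently dropped in this sketch, so the server always has to send at least one data field.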
Now that we know what to do, let's first create an empty Java Service named sample.sse:subscribe.
In the Shared Source Area, create a method to compose the HTTP response header:
    private static String composeHTTPEventResponse() {
        // Status line and headers, each terminated by CRLF, followed by an empty line
        String header = "HTTP/1.1 200 OK" + "\r\n"
            + "X-Accel-Buffering: no" + "\r\n"
            + "Content-Type: text/event-stream" + "\r\n"
            + "Cache-Control: no-cache" + "\r\n"
            + "\r\n";
        return header;
    }
Also create a method that composes a simple text/event-stream event. Let the type be message and put a counter in the text:
    private static String composeSimpleEventMessage(int counter) {
        StringBuilder sb = new StringBuilder();
        sb.append("event: message\n");
        // The empty line (second \n) terminates the event
        sb.append("data: event[" + counter + "]\n\n");
        return sb.toString();
    }
Next up we need a method that writes the event data, sleeps a while and then repeats the writing. For now let's limit ourselves to 10 times. This will also demonstrate the reconnect feature of Server Sent Events.
    // Imports needed: java.io.OutputStream, java.net.Socket,
    // com.wm.app.b2b.server.InvokeState, com.wm.util.JournalLogger
    private static void simpleEventLoop() {
        Socket socket = InvokeState.getCurrentSocket();
        OutputStream os = null;
        int count = 0;
        int max = 10;
        String event = "message";
        try {
            os = socket.getOutputStream();
            while(count < max) {
                if(count == 0) {
                    // The HTTP status line and headers go out once, before the first event
                    os.write(composeHTTPEventResponse().getBytes("UTF-8"));
                    JournalLogger.log(4, 90, 4, "sse", "Sent header");
                }
                String data = composeSimpleEventMessage(count);
                os.write(data.getBytes("UTF-8"));
                JournalLogger.log(4, 90, 4, "sse", "Sent " + event + "["+count+"]");
                count++;
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            JournalLogger.log(4, 90, 1, "sse", e.getClass() + ": " + e.getMessage());
        }
    }
A couple of things to notice:
- There is no need to close the OutputStream or the Socket. The IntegrationServer will take care of that.
- JournalLogger writes to the server log of the IS. The first three parameters:
  - 4: the message template to use. In this case: '{0} - {1}'
  - 90: the facility, in this case: 0090 pub Flow Services
  - 4: the severity, in this case: Info

The final step is to invoke this method from the Java Service sample.sse:subscribe:
    public static final void subscribe(IData pipeline) throws ServiceException {
        simpleEventLoop();
    }
If you now access the url:
http://localhost:5555/SAMPLE/sse-poc.html
you'll see something like this:
Event list:
So right away the first 10 messages appear. But then, after some time, depending on the browser's settings, the event loop repeats! We see event[0] again!
The reason for this is that the web page reestablishes the connection with the EventSource.
The server can tell the client that it can no longer send events by sending an HTTP 204 No Content response, but that response cannot be sent in the middle of the event stream. It can only be sent right after the initial connection.
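As a sketch of what such a response could look like on the wire: the method below mirrors composeHTTPEventResponse(), but returns a 204 status line without any event-stream headers. The class and method names are hypothetical and not part of the implementation in this post. The idea is that the service writes this instead of the 200 response when a client (re)connects and there is nothing more to send.

```java
final class NoContentResponseSketch {

    /** Composes a bare HTTP 204 response: a status line, no headers, no body. */
    static String composeNoContentResponse() {
        return "HTTP/1.1 204 No Content" + "\r\n"
             + "\r\n";
    }

    public static void main(String[] args) {
        // Show the response with the line endings made visible
        System.out.println(composeNoContentResponse()
                .replace("\r", "\\r")
                .replace("\n", "\\n"));
    }
}
```

A browser that receives this response to its EventSource request stops reconnecting, so the event loop does not start over.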
One of the advantages of Server Sent Events is that it works on top of an existing HTTP connection. When the browser sets up a connection to the EventSource, it issues an XMLHttpRequest and passes along the necessary HTTP header fields to maintain an already established session and/or provides the required credentials. One can verify this by inspecting the header fields on the server by invoking pub.flow:getTransportInfo and pub.flow:tracePipeline:
    private static void logTransport() {
        try {
            // Fetch the transport details (including the HTTP request headers) ...
            IData transportData = Service.doInvoke(NSName.create("pub.flow:getTransportInfo"), IDataFactory.create());
            // ... and dump them to the server log
            Service.doInvoke(NSName.create("pub.flow:tracePipeline"), transportData);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
This reveals the following header fields sent in the XHR-request by the browser:
    Host: localhost:5555
    User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:136.0) Gecko/20100101 Firefox/136.0
    Accept: text/event-stream
    Accept-Language: en-GB,en;q=0.5
    Accept-Encoding: gzip, deflate, br, zstd
    Referer: http://localhost:5555/SAMPLE/sse-poc.html
    Authorization: Basic ZGV2ZWxvcGVyOmRldmVsb3Blcg==
    Connection: keep-alive
    Cookie: ssnid=6e83bff272c94630ac170af8d373adc3
    Sec-Fetch-Dest: empty
    Sec-Fetch-Mode: cors
    Sec-Fetch-Site: same-origin
    Priority: u=4
    Cache-Control: no-cache
A couple of things to note:
- The Authorization and Cookie headers are passed along, so the credentials and an already established session are reused.
- The Accept header shows that the client expects text/event-stream messages.

Once the method simpleEventLoop() exits, the Java Service sample.sse:subscribe exits and the IS takes over control. The dispatcher takes the (IData) results from the service and tries to return a response to the client. As the IS does not recognize the content type text/event-stream in the Accept header, it returns HTML (the default) to the client. In the console of the web browser you can see, after the last event, a standard HTTP response from the IS in HTML format:
    event: ping
    data: {"time_ms": "1743516865670"}

    HTTP/1.1 200 OK
    Content-Type: text/html; charset=UTF-8
    Content-Length: 75

    <BODY bgcolor=#dddddd>
    <TABLE bgcolor=#dddddd border=1>
    </TABLE>
    </BODY>
This would normally not happen. The server is not supposed to stop the event stream.
Any other event type than message needs its own event handler. Let's say that the server may send a close event, to which the client reacts by terminating the connection:
    evtSource.addEventListener("close", function(event) {
      evtSource.close();
      document.write("Connection closed, server requested");
    });
There is no 'default' event handler. This:
    evtSource.onmessage = function(event) {}
is an alias for this:
    evtSource.addEventListener("message", function(event) {})
The client has to know the event types it needs to act upon. Any unrecognized events, so events without a handler, will be ignored. This is a small complication in our journey to a general SSE solution on the IntegrationServer. Clients should not get unsolicited events, as this only consumes bandwidth.
Detecting that the client has disconnected is somewhat difficult. These methods will always return false:
    socket.isInputShutdown();
    socket.isOutputShutdown();
    socket.isClosed();
and this one always true:
    socket.isConnected();
as long as the socket or its input or output stream has not been closed on the server, regardless of whether the client has already disconnected. This is certainly somewhat unexpected.
There are two ways that one can detect a client disconnect:
- Writing to the OutputStream fails with a: java.net.SocketException: Broken pipe (Write failed)
- Reading from the InputStream with socket.getInputStream().read() returns -1
The first method detects a disconnect when the server sends data to the client. If this happens infrequently, then it may occur that a disconnect is only detected after quite some time. In the meantime, the server occupies system resources. Note that the four methods on the Socket will still let you believe that the client is still connected.
The second method requires that the server continuously reads from the InputStream. This blocks indefinitely, because after the first connect, the client will not send any data. On the other hand, this is the only way to detect a disconnect immediately.
We'll use the latter method in the generalized implementation.
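The behaviour described above can be reproduced outside the IS with two plain Java sockets on the loopback interface. The sketch below is only an illustration: the class DisconnectDetectionSketch and its Observation type are hypothetical, and the 5 second timeout is just a safety net for the demo.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

final class DisconnectDetectionSketch {

    /** What the server side sees after the client has closed its socket. */
    static final class Observation {
        final int readResult;
        final boolean connected;
        final boolean closed;
        final boolean inputShutdown;
        final boolean outputShutdown;

        Observation(int readResult, boolean connected, boolean closed,
                    boolean inputShutdown, boolean outputShutdown) {
            this.readResult = readResult;
            this.connected = connected;
            this.closed = closed;
            this.inputShutdown = inputShutdown;
            this.outputShutdown = outputShutdown;
        }
    }

    static Observation observeClientDisconnect() {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket serverSide = server.accept()) {
            serverSide.setSoTimeout(5000);   // safety net so the demo can never hang
            client.close();                  // the "browser" goes away
            // The blocking read returns -1 once the peer has closed the connection
            int read = serverSide.getInputStream().read();
            // The status methods only reflect what was done locally on this socket
            return new Observation(read,
                    serverSide.isConnected(),
                    serverSide.isClosed(),
                    serverSide.isInputShutdown(),
                    serverSide.isOutputShutdown());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Observation o = observeClientDisconnect();
        System.out.println("read() = " + o.readResult
                + ", isConnected() = " + o.connected
                + ", isClosed() = " + o.closed);
    }
}
```

Only the result of read() reveals that the client is gone; the four status methods keep reporting the state of the server's own end of the connection.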
The goal is to create a general solution for Server Sent Events on the IntegrationServer with the following features:
- A service that clients subscribe to, optionally limited to the event types they are interested in.
- A service that sends an event of any type to all interested clients. It accepts either a string or a document (IData) as an input. The string will be passed on as is, the document will be encoded as a JSON string.
- A service that shows the subscribed clients.

In order to prevent that a client receives unsolicited messages, it must tell the server which event types it's interested in.
The only way to do this is by adding query parameters to the url, as the EventSource specification only allows two parameters to be specified: the url and whether or not to pass CORS credentials.
The implementation will react to the query parameter eventTypes, which contains a comma-separated list of event types that the client wants to subscribe to. Example:
http://localhost:5555/invoke/sample.sse:subscribe?eventTypes=foo,bar,baz
If there is no parameter eventTypes, then the client subscribes to all events.
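The subscription rule can be sketched in a few lines of plain Java. The class EventTypeFilterSketch and its two methods are hypothetical helpers for illustration only. Note one assumption that goes beyond the text: the sketch trims whitespace and skips empty entries, so that a query string like "foo, bar,,baz" still yields three types.

```java
import java.util.Arrays;

final class EventTypeFilterSketch {

    /** Turns the raw query parameter into a list of types; null means "all events". */
    static String[] parseEventTypes(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return null;
        }
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .toArray(String[]::new);
    }

    /** A client without a list is interested in everything. */
    static boolean isInterested(String[] registeredEventTypes, String eventType) {
        if (registeredEventTypes == null) {
            return true;
        }
        return Arrays.asList(registeredEventTypes).contains(eventType);
    }

    public static void main(String[] args) {
        String[] types = parseEventTypes("foo,bar,baz");
        System.out.println(isInterested(types, "bar"));   // a subscribed type
        System.out.println(isInterested(types, "qux"));   // a type the client did not ask for
        System.out.println(isInterested(null, "qux"));    // no list: subscribed to all events
    }
}
```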
The standard behaviour of the IntegrationServer is to parse the query string and place the parameters it finds, in our case eventTypes, in the pipeline when it invokes sample.sse:subscribe.
There needs to be a register of client subscriptions. Once a client connects, its interest is stored here under an appropriate id, along with the event types and a handle to its socket.
There are several candidates for an 'appropriate id':
- the session ID
- the thread ID
- the ContextID

Of those three, the ContextID is probably the most convenient and unambiguous to deal with. Each service invocation gets a new ContextID assigned, whereas sessions might be passed around and threads are reused. To get hold of the ContextID of the current invocation:
    InvokeState state = InvokeState.getCurrentState();
    String [] contextIDs = state.getAuditRuntime().getContextStack();
    // The last entry belongs to the service that is currently executing
    String contextID = contextIDs[contextIDs.length - 1];
The first contextID in the stack refers to the top-level service, the last to the service that is currently being executed.
The registry is an IData structure, with per client an entry, for example:
    3133ec51-8da5-4e2b-b34f-092988654002
        eventTypes: [foo,bar,baz]
        socket: java.net.Socket@2e45b455
In this example eventTypes is a String list. If the client did not send a query string with event types, then eventTypes will be absent.
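In order to implement the registry we need:
- a static variable that holds the registry
- a method that adds a client subscription
- a method that removes a client subscription

The implementation (all in the Shared Source Area):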
    private static IDataMap notificationList = new IDataMap();

    private static void addNotificationListener(String key, String eventTypes, Socket socket) {
        synchronized(notificationList) {
            IDataMap data = new IDataMap();
            if(eventTypes != null) {
                // No eventTypes entry means: subscribed to all events
                data.put("eventTypes", eventTypes.split(","));
            }
            data.put("socket", socket);
            notificationList.put(key, data.getIData());
        }
        JournalLogger.log(4, 90, 4, "sse", "=== ADDED listener. key: " + key + "; event types: " + eventTypes);
    }

    private static void removeNotificationListener(String key) {
        synchronized(notificationList) {
            Object entry = notificationList.remove(key);
            if(entry != null) {
                JournalLogger.log(4, 90, 4, "sse", "=== REMOVED listener. key: " + key);
            } else {
                JournalLogger.log(4, 90, 4, "sse", "=== NOT REMOVED listener, key: " + key + " not found");
            }
        }
    }
The synchronized blocks are necessary as we're working in a multi-threaded environment.
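For comparison, here is a sketch of the same registry idea with a different technique: a java.util.concurrent.ConcurrentHashMap instead of an IDataMap guarded by synchronized blocks. This is not the implementation used in this post. All names are hypothetical, and the connection field is a plain Object that merely stands in for the java.net.Socket, so the sketch runs without the IS.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class SubscriptionRegistrySketch {

    /** One registry entry. eventTypes == null means "all events". */
    static final class Subscription {
        final String[] eventTypes;
        final Object connection;   // stand-in for the client's java.net.Socket

        Subscription(String[] eventTypes, Object connection) {
            this.eventTypes = eventTypes;
            this.connection = connection;
        }
    }

    // The map itself makes individual put/remove/get calls thread-safe
    private static final Map<String, Subscription> SUBSCRIPTIONS = new ConcurrentHashMap<>();

    static void add(String key, String eventTypes, Object connection) {
        String[] types = (eventTypes == null) ? null : eventTypes.split(",");
        SUBSCRIPTIONS.put(key, new Subscription(types, connection));
    }

    /** Returns true if there was an entry to remove. */
    static boolean remove(String key) {
        return SUBSCRIPTIONS.remove(key) != null;
    }

    static Subscription get(String key) {
        return SUBSCRIPTIONS.get(key);
    }

    static int size() {
        return SUBSCRIPTIONS.size();
    }

    public static void main(String[] args) {
        add("3133ec51-8da5-4e2b-b34f-092988654002", "foo,bar,baz", new Object());
        System.out.println("clients: " + size());
        remove("3133ec51-8da5-4e2b-b34f-092988654002");
        System.out.println("clients: " + size());
    }
}
```

A concurrent map only protects the registry itself. Making sure that no two threads write to the same client socket at the same time would still need a lock of its own.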
By reading from the InputStream, which is a blocking operation, we achieve two goals:
- the service stays active for as long as the client is connected
- once the client disconnects, the read returns -1, which is the signal to remove the subscription and exit the service

    private static void waitForClientDisconnect() {
        try {
            InputStream is = InvokeState.getCurrentSocket().getInputStream();
            while(true) {
                try {
                    int i = is.read();
                    // -1 means that the client has closed the connection
                    if(i < 0) break;
                } catch (SocketTimeoutException ste) {
                    //Ignore this exception
                }
            }
        } catch(IOException ioe) {
            JournalLogger.log(4, 90, 1, "sse", ioe.getClass() + ": " + ioe.getMessage());
        }
    }
So when exactly does the SocketTimeoutException occur? Which setting governs this behaviour? As it turns out, it is the Keep Alive Timeout (in milliseconds) on the HTTP port:
Regular HTTP Listener Configuration | |
Port | 5555 |
Alias | DefaultPrimary |
Description (optional) | Default Primary Port |
Package Name | WmRoot |
Bind Address (optional) | |
Backlog | 65534 |
Keep Alive Timeout | 20000 |
Client Authentication | Username/Password |
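The interplay between the read timeout and the blocking read can be tried out with plain Java sockets. In the sketch below, setSoTimeout() stands in for the Keep Alive Timeout of the port; that mapping is an assumption based on the behaviour described above. The class name and the two parameters are hypothetical. The "client" stays silent for a while and then disconnects; the server counts how often the read timed out before it finally saw the disconnect.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

final class KeepAliveTimeoutSketch {

    /** Returns the number of read timeouts seen before the client disconnected. */
    static int countTimeoutsUntilDisconnect(int soTimeoutMs, int clientLifetimeMs) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket serverSide = server.accept()) {

            // The client sends nothing and closes the connection after a while
            Thread closer = new Thread(() -> {
                try {
                    Thread.sleep(clientLifetimeMs);
                    client.close();
                } catch (InterruptedException | IOException ignored) {
                    // nothing to clean up in this demo
                }
            });
            closer.start();

            serverSide.setSoTimeout(soTimeoutMs);   // assumption: plays the role of Keep Alive Timeout
            InputStream in = serverSide.getInputStream();
            int timeouts = 0;
            while (true) {
                try {
                    if (in.read() < 0) {
                        break;                      // client disconnected
                    }
                } catch (SocketTimeoutException ste) {
                    timeouts++;                     // no data within the timeout: keep waiting
                }
            }
            return timeouts;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("timeouts before disconnect: "
                + countTimeoutsUntilDisconnect(100, 450));
    }
}
```

The timeout does not end the connection by itself. It only interrupts the read, which is why ignoring the exception and reading again is sufficient.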
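We now have the building blocks to fully implement the subscribe logic. The sequence is:
1. Determine the ContextID of the current invocation and get hold of the socket.
2. Register the client, with its event types and socket, in the registry.
3. Write the HTTP event-stream response to the client.
4. Wait until the client disconnects.
5. Remove the client from the registry.

The implementation: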
    private static void subscribe(String eventTypes) {
        InvokeState state = InvokeState.getCurrentState();
        String [] contextIDs = state.getAuditRuntime().getContextStack();
        String contextID = contextIDs[contextIDs.length - 1];
        Socket socket = InvokeState.getCurrentSocket();
        addNotificationListener(contextID, eventTypes, socket);
        if(writeHTTPEventStreamResponse(socket)) {
            // Blocks until the client disconnects
            waitForClientDisconnect();
        }
        removeNotificationListener(contextID);
    }
The implementation of the writeHTTPEventStreamResponse(Socket socket) method is as follows:
    private static boolean writeHTTPEventStreamResponse(Socket socket) {
        boolean result = true;
        try {
            String response = composeHTTPEventResponse();
            socket.getOutputStream().write(response.getBytes("utf-8"));
        } catch(IOException ioe) {
            JournalLogger.log(4, 90, 1, "sse", "Writing HTTP event-stream response failed: " + ioe.getClass() + ": " + ioe.getMessage());
            result = false;
        }
        return result;
    }
So here the choice has been made to return a boolean, instead of throwing an exception if one occurs. The reason is that the subscription of the client has to be removed. An exception would have bubbled up and would have prevented this.
And finally, call the subscribe(String eventTypes) method from a Java Service:
    public static final void subscribe(IData pipeline) throws ServiceException {
        IDataMap idm = new IDataMap(pipeline);
        String eventTypes = idm.getAsNonEmptyString("eventTypes");
        subscribe(eventTypes);
    }
We would like to have a Java Service that can be used to send an event of any type to all connected clients that are interested in that type. Let's name the service sample.sse:send. Its signature would look like this:
The logic:
- If eventType is absent, then it defaults to message.
- If data is present, then it will be serialized as JSON.
- If message is present, then it will be sent as is.
- If both data and message are absent, then the service throws an exception.
- If both data and message are present, then message takes precedence.
First we need to be able to compose the event. The initial HTTP response status plus headers has been sent earlier by the subscribe(String eventTypes) method, so we only need to concern ourselves with the events themselves.
The requirements for the body of the event have somewhat expanded: if we find a document (IData instance) in the input, instead of a String, we need to encode it as JSON.
Secondly, we'd like to comply with the SSE specification, which says that multiline content has to be sent like this:
    event: multiline
    data: this is line one
    data: this is line two, with two lines
    data: this is line three
Multiline content is a string with embedded carriage returns (ASCII 0x0D), newlines (ASCII 0x0A), or carriage return/newline (ASCII 0x0D0A) combinations.
Furthermore, if both a String and an IData are passed, then we make the choice that the string takes precedence.
With this in mind, the implementation of composeEventMessage(String eventType, String data, IData content) looks like this:
    private static String composeEventMessage(String eventType, String data, IData content) throws ServiceException {
        StringBuilder sb = new StringBuilder();
        sb.append("event: " + eventType + "\n");
        if(data != null) {
            //Split the text in lines and add a 'data' entry per line
            String [] dataParts = data.split("\\r\\n|\\n|\\r");
            for (String part : dataParts) {
                sb.append("data: " + part + "\n");
            }
        } else {
            try {
                //Use a public service to serialize 'document' to JSON
                IDataMap pipeline = new IDataMap();
                pipeline.put("document", content);
                Service.doInvoke(NSName.create("pub.json:documentToJSONString"), pipeline.getIData());
                String jsonString = pipeline.getAsNonEmptyString("jsonString");
                sb.append("data: " + jsonString + "\n");
            } catch (Exception e) {
                throw new ServiceException(e);
            }
        }
        //Append event delimiter
        sb.append("\n");
        return sb.toString();
    }
Next up is the logic to identify the clients to which the event must be sent. We need to go through the notificationList (defined in the Shared Source Area) and find the clients that have a matching subscription, plus the ones that subscribe to all events.
This is done by the method buildNotificationList(String eventType).
    private static IData buildNotificationList(String eventType) {
        IDataMap myNotificationList = new IDataMap();
        IDataCursor idc = notificationList.getIData().getCursor();
        while(idc.next()) {
            IDataMap entry = new IDataMap((IData)idc.getValue());
            String [] registeredEventTypes = entry.getAsStringArray("eventTypes");
            if(registeredEventTypes == null) {
                //no specific subscriptions
                myNotificationList.put(idc.getKey(), entry);
            } else {
                for(int i = 0 ; i < registeredEventTypes.length ; i++) {
                    if(eventType.equals(registeredEventTypes[i])) {
                        myNotificationList.put(idc.getKey(), entry);
                        break; //ignore possible other identical event types in the list
                    }
                }
            }
        }
        return myNotificationList.getIData();
    }
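We're almost there. The last step, implemented in the sendEvent(String eventType, String message, IData contents) method, puts it all together:
- Throw an exception if both the String input and the IData input are absent or null
- If eventType is not specified, set it to message
- Find the clients that are interested in eventType
- Compose the event and write it to the socket of each of those clients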
    private static void sendEvent(String eventType, String message, IData contents) throws ServiceException {
        if(message == null && contents == null) {
            throw new ServiceException("Cannot send event. Either 'message' or 'contents' must have a value");
        }
        if(eventType == null) eventType = "message";
        JournalLogger.log(4, 90, 4, "sse", "=== SEND START: Event = " + eventType);
        //Search for the interested recipients
        synchronized(notificationList) {
            //Collect interested clients in a new list:
            IData myNotificationList = buildNotificationList(eventType);
            IDataCursor mdc = myNotificationList.getCursor();
            while(mdc.next()) {
                String contextID = mdc.getKey();
                IDataMap entry = new IDataMap((IData)mdc.getValue());
                Socket socket = (Socket) entry.get("socket");
                try {
                    String event = composeEventMessage(eventType, message, contents);
                    socket.getOutputStream().write(event.getBytes("utf-8"));
                    socket.getOutputStream().flush();
                    JournalLogger.log(4, 90, 4, "sse", "=== SENDING DATA: contextID: " + contextID);
                } catch(IOException ioe) {
                    JournalLogger.log(4, 90, 1, "sse", "=== SENDING DATA FAILED: contextID: " + contextID + ": " + ioe.getClass().getName() + ": " + ioe.getMessage());
                }
            }
        }
        JournalLogger.log(4, 90, 4, "sse", "=== SEND END =====");
    }
Note here that the main logic is wrapped in a synchronized block. Because we're running in a multi-threaded environment, we need to make sure that only one thread at a time sends data to any given client.
Finally, call sendEvent(String eventType, String message, IData contents) from the Java Service sample.sse:send:
    public static final void send(IData pipeline) throws ServiceException {
        IDataMap idm = new IDataMap(pipeline);
        String eventType = idm.getAsNonEmptyString("eventType");
        IData data = idm.getAsIData("data");
        String message = idm.getAsNonEmptyString("message");
        sendEvent(eventType, message, data);
    }
The implementation of the last requirement, a service that shows the subscribed clients, is pretty straight-forward, but also here we need to keep in mind that we're running in a multi-threaded environment. By the way, it's best not to expose the socket in the pipeline, in order to prevent abuse.
The notificationList is implemented as an IDataMap, with the ContextIDs as keys. Putting it raw in the pipeline would result in this:
Instead, we create a method named buildClientList() that returns an IData array, and leaves the java.net.Socket out.
We then call this method from the Java Service sample.sse:getClients, implemented by the method getClients().
For completeness' sake, the static variable notificationList, which we defined when we set up the Client-EventType registry, is shown here as well:
    public static final void getClients(IData pipeline) throws ServiceException {
        IDataMap idm = new IDataMap(pipeline);
        idm.put("clients", buildClientList());
    }

    // --- <<IS-BEGIN-SHARED-SOURCE-AREA>> ---
    private static IDataMap notificationList = new IDataMap();

    private static IData [] buildClientList() {
        ArrayList<IData> clientList = new ArrayList<IData>();
        synchronized(notificationList) {
            IDataCursor idc = notificationList.getIData().getCursor();
            while(idc.next()) {
                IDataMap client = new IDataMap();
                client.put("id", idc.getKey());
                // Copy only the event types; the socket is deliberately left out
                IData notificationListEntry = IDataUtil.getIData(idc);
                IDataCursor ide = notificationListEntry.getCursor();
                String [] eventTypes = IDataUtil.getStringArray(ide, "eventTypes");
                if(eventTypes != null) {
                    client.put("eventTypes", eventTypes);
                }
                clientList.add(client.getIData());
            }
        }
        return clientList.toArray(new IData[clientList.size()]);
    }
The signature of sample.sse:getClients
When we execute the service, and there are connected clients, the result looks something like this:
In Designer the three Java Services would look like this:
With fairly little code we were able to implement Server Sent Events (SSE) on webMethods IntegrationServer, and expose the functionality as regular Java Services.
SSE are easy to work with. There is no set up required, events do not have to be predefined and no special ports need to be created, like for WebSockets.
The implementation provided in this post is fully functional, but could be enhanced by:
- Checking in sample.sse:subscribe that the Accept header of the request is text/event-stream. If not, then the service should throw an exception, telling the caller that the service is meant for receiving SSE.
- Adding more client information to the output of the sample.sse:getClients service, e.g. the User-Agent and the Source IP address.
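The first enhancement could start from a check like the one sketched below. It is plain Java; the class and method names are hypothetical, and how the Accept header value is obtained on the IS is left out (the output of pub.flow:getTransportInfo, as used earlier in this post, would be the place to look). The sketch accepts a header when one of its comma-separated media types is text/event-stream, ignoring parameters such as q-values.

```java
final class AcceptHeaderSketch {

    /** Returns true if the Accept header lists text/event-stream. */
    static boolean acceptsEventStream(String acceptHeader) {
        if (acceptHeader == null) {
            return false;
        }
        for (String part : acceptHeader.split(",")) {
            // Drop parameters such as ";q=0.9" and surrounding whitespace
            String mediaType = part.split(";", 2)[0].trim();
            if (mediaType.equalsIgnoreCase("text/event-stream")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(acceptsEventStream("text/event-stream"));
        System.out.println(acceptsEventStream("text/html,application/xhtml+xml;q=0.9"));
    }
}
```

In sample.sse:subscribe, a false result would then lead to a ServiceException before anything is written to the socket.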