This is a simple example of how to use messaging, implemented in JMS [JMS]. It shows how to implement Request-Reply, where a requestor application sends a request, a replier application receives the request and returns a reply, and the requestor receives the reply. It also shows how an invalid message will be rerouted to a special channel.
This example was developed using JMS 1.1 and run using the J2EE 1.4 reference implementation.
Request/Reply Example
This example consists of two main classes:
- Requestor — A Message Endpoint that sends a request message and waits to receive a reply message as a response.
- Replier — A Message Endpoint that waits to receive the request message; when it does, it responds by sending the reply message.
The Requestor and the Replier will each run in a separate Java virtual machine (JVM), which is what makes the communication distributed.
This example assumes that the messaging system has these three queues defined:
- jms/RequestQueue — The Queue the Requestor uses to send the request message to the Replier.
- jms/ReplyQueue — The Queue the Replier uses to send the reply message to the Requestor.
- jms/InvalidMessages — The Queue that the Requestor and Replier move a message to when they receive a message that they cannot interpret.
Here's how the example works. When the Requestor is started in a command-line window, it prints output like this:
Sent request
    Time: 1048261736520 ms
    Message ID: ID:_XYZ123_1048261766139_6.2.1.1
    Correl. ID: null
    Reply to: com.sun.jms.Queue: jms/ReplyQueue
    Contents: Hello world.
What this shows is that the Requestor has sent a request message. Notice that this works even though the Replier isn't even running and therefore cannot receive the request.
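The reason the send succeeds without a running Replier is that the queue holds the message until a consumer asks for it. The following is only a rough in-memory illustration of that buffering, assuming a `java.util.concurrent.LinkedBlockingQueue` as a stand-in for `jms/RequestQueue`; the class and method names are hypothetical and no JMS provider is involved.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a JMS queue: an in-memory queue inside one JVM.
final class BufferedQueueSketch {

    // "Requestor" side: the send returns immediately, whether or not a receiver exists.
    static void send(BlockingQueue<String> requestQueue, String contents) {
        requestQueue.add(contents);
    }

    // "Replier" side, started later: returns the waiting message, or null if none is waiting.
    static String receive(BlockingQueue<String> requestQueue) {
        return requestQueue.poll();
    }

    public static void main(String[] args) {
        BlockingQueue<String> requestQueue = new LinkedBlockingQueue<>();
        send(requestQueue, "Hello world.");   // no receiver has been started yet
        System.out.println("Messages waiting: " + requestQueue.size());
        System.out.println("Received later: " + receive(requestQueue));
    }
}
```

A real messaging system adds what this sketch cannot: the queue lives outside both JVMs, so the message survives even if the sender exits before the receiver starts.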
When the Replier is started in another command-line window, it prints output like this:
Received request
    Time: 1048261766790 ms
    Message ID: ID:_XYZ123_1048261766139_6.2.1.1
    Correl. ID: null
    Reply to: com.sun.jms.Queue: jms/ReplyQueue
    Contents: Hello world.
Sent reply
    Time: 1048261766850 ms
    Message ID: ID:_XYZ123_1048261758148_5.2.1.1
    Correl. ID: ID:_XYZ123_1048261766139_6.2.1.1
    Reply to: null
    Contents: Hello world.
This shows that the Replier received the request message and sent a reply message.
There are several interesting items in this output. First, notice the request sent and received timestamps; the request was received after it was sent (30270 ms later). Second, notice that the message ID is the same in both cases, because it's the same message. Third, notice that the contents, "Hello world.", are the same, which is very good because this is the data being transmitted and it must be the same on both sides. (The request in this example is pretty lame. It is basically a Document Message; a real request would usually be a Command Message.) Fourth, the queue named "jms/ReplyQueue" has been specified in the request message as the destination for the reply message (an example of the Return Address pattern).
Next, let's compare the output from receiving the request to that for sending the reply. First, notice the reply was not sent until after the request was received (60 ms after). Second, the message ID for the reply is different from that for the request; this is because the request and reply messages are different, separate messages. Third, the contents of the request have been extracted and added to the reply. Fourth, the reply-to destination is unspecified because no reply is expected (the reply does not use the Return Address pattern). Fifth, the reply's correlation ID is the same as the request's message ID (the reply does use the Correlation Identifier pattern).
Finally, back in the first window, the requestor received the reply:
Received reply
    Time: 1048261797060 ms
    Message ID: ID:_XYZ123_1048261758148_5.2.1.1
    Correl. ID: ID:_XYZ123_1048261766139_6.2.1.1
    Reply to: null
    Contents: Hello world.
This output contains several items of interest. The reply was received after it was sent (30210 ms later). The message ID of the reply was the same when it was received as it was when it was sent, which proves that it is indeed the same message. The message contents received are the same as those sent. And the correlation ID tells the requestor which request this reply is for (the Correlation Identifier pattern).
Notice too that the requestor is designed to simply send a request, receive a reply, and exit. So having received the reply, the requestor is no longer running. The replier, on the other hand, doesn't know when it might receive a request, so it never stops running. To stop it, we go to its command shell window and press the return key, which causes the replier program to exit.
So this is the request/reply example. A request was prepared and sent by the requestor. The replier received the request and sent a reply. Then the requestor received the reply to its original request.
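The header fields in the output above (message ID, correlation ID, reply-to) can be traced through a small in-memory simulation. This is a sketch under stated assumptions, not the JMS implementation: `Msg`, `send`, `sendRequest`, and `replyToNext` are hypothetical names, the queues are `LinkedBlockingQueue` instances rather than JMS destinations, and the request is assumed to be valid (it always carries a reply-to queue).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

final class RequestReplySketch {

    // Hypothetical message type carrying only the header fields discussed in the text.
    static final class Msg {
        String messageId;             // assigned when the message is sent
        String correlationId;         // null on the request, set on the reply
        BlockingQueue<Msg> replyTo;   // Return Address; left null on the reply
        final String contents;

        Msg(String contents) {
            this.contents = contents;
        }
    }

    private static final AtomicLong NEXT_ID = new AtomicLong(1);

    // Stand-in for a producer's send: the "messaging system" assigns the ID on send.
    static void send(BlockingQueue<Msg> queue, Msg msg) {
        msg.messageId = "ID:" + NEXT_ID.getAndIncrement();
        queue.add(msg);
    }

    // Requestor side: attach the Return Address, then send the request.
    static Msg sendRequest(BlockingQueue<Msg> requestQueue,
                           BlockingQueue<Msg> replyQueue, String contents) {
        Msg request = new Msg(contents);
        request.replyTo = replyQueue;
        send(requestQueue, request);
        return request;
    }

    // Replier side: handle one waiting request; returns the reply, or null if none is waiting.
    static Msg replyToNext(BlockingQueue<Msg> requestQueue) {
        Msg request = requestQueue.poll();
        if (request == null) {
            return null;
        }
        Msg reply = new Msg(request.contents);      // copy the contents into the reply
        reply.correlationId = request.messageId;    // Correlation Identifier
        send(request.replyTo, reply);               // reply goes wherever the request said
        return reply;
    }

    public static void main(String[] args) {
        BlockingQueue<Msg> requestQueue = new LinkedBlockingQueue<>();
        BlockingQueue<Msg> replyQueue = new LinkedBlockingQueue<>();

        Msg request = sendRequest(requestQueue, replyQueue, "Hello world.");
        replyToNext(requestQueue);
        Msg reply = replyQueue.poll();

        System.out.println("Request ID:       " + request.messageId);
        System.out.println("Reply ID:         " + reply.messageId);
        System.out.println("Reply correl. ID: " + reply.correlationId);
        System.out.println("Reply contents:   " + reply.contents);
    }
}
```

As in the console output, the reply has its own message ID, its correlation ID equals the request's message ID, and its reply-to field is empty.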
Request/Reply Code
First, let's take a look at how the Requestor is implemented:
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.NamingException;

public class Requestor {

    private Session session;
    private Destination replyQueue;
    private MessageProducer requestProducer;
    private MessageConsumer replyConsumer;
    private MessageProducer invalidProducer;

    protected Requestor() {
        super();
    }

    public static Requestor newRequestor(Connection connection, String requestQueueName,
        String replyQueueName, String invalidQueueName)
        throws JMSException, NamingException {

        Requestor requestor = new Requestor();
        requestor.initialize(connection, requestQueueName, replyQueueName, invalidQueueName);
        return requestor;
    }

    protected void initialize(Connection connection, String requestQueueName,
        String replyQueueName, String invalidQueueName)
        throws NamingException, JMSException {

        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Destination requestQueue = JndiUtil.getDestination(requestQueueName);
        replyQueue = JndiUtil.getDestination(replyQueueName);
        Destination invalidQueue = JndiUtil.getDestination(invalidQueueName);

        requestProducer = session.createProducer(requestQueue);
        replyConsumer = session.createConsumer(replyQueue);
        invalidProducer = session.createProducer(invalidQueue);
    }

    public void send() throws JMSException {
        TextMessage requestMessage = session.createTextMessage();
        requestMessage.setText("Hello world.");
        requestMessage.setJMSReplyTo(replyQueue);
        requestProducer.send(requestMessage);
        System.out.println("Sent request");
        System.out.println("\tTime: " + System.currentTimeMillis() + " ms");
        System.out.println("\tMessage ID: " + requestMessage.getJMSMessageID());
        System.out.println("\tCorrel. ID: " + requestMessage.getJMSCorrelationID());
        System.out.println("\tReply to: " + requestMessage.getJMSReplyTo());
        System.out.println("\tContents: " + requestMessage.getText());
    }

    public void receiveSync() throws JMSException {
        Message msg = replyConsumer.receive();
        if (msg instanceof TextMessage) {
            TextMessage replyMessage = (TextMessage) msg;
            System.out.println("Received reply ");
            System.out.println("\tTime: " + System.currentTimeMillis() + " ms");
            System.out.println("\tMessage ID: " + replyMessage.getJMSMessageID());
            System.out.println("\tCorrel. ID: " + replyMessage.getJMSCorrelationID());
            System.out.println("\tReply to: " + replyMessage.getJMSReplyTo());
            System.out.println("\tContents: " + replyMessage.getText());
        } else {
            System.out.println("Invalid message detected");
            System.out.println("\tType: " + msg.getClass().getName());
            System.out.println("\tTime: " + System.currentTimeMillis() + " ms");
            System.out.println("\tMessage ID: " + msg.getJMSMessageID());
            System.out.println("\tCorrel. ID: " + msg.getJMSCorrelationID());
            System.out.println("\tReply to: " + msg.getJMSReplyTo());

            msg.setJMSCorrelationID(msg.getJMSMessageID());
            invalidProducer.send(msg);

            System.out.println("Sent to invalid message queue");
            System.out.println("\tType: " + msg.getClass().getName());
            System.out.println("\tTime: " + System.currentTimeMillis() + " ms");
            System.out.println("\tMessage ID: " + msg.getJMSMessageID());
            System.out.println("\tCorrel. ID: " + msg.getJMSCorrelationID());
            System.out.println("\tReply to: " + msg.getJMSReplyTo());
        }
    }
}
An application that wants to send requests and receive replies could use a requestor to do so. The application provides its requestor a Connection to the messaging system. It also specifies the JNDI names of three queues: the request queue, the reply queue, and the invalid message queue. This is the information the requestor needs to initialize itself.
In initialize, the requestor uses the Connection and queue names to connect to the messaging system.
- It uses the Connection to create a Session. An application only needs one connection to a messaging system, but each component in the application that wishes to be able to send and receive messages independently needs its own session. Two threads cannot share a single session; they should each use a different session so that the sessions will work properly.
- It uses the queue names to look up the queues, which are Destinations. The names are JNDI identifiers; JndiUtil performs the JNDI lookups.
- It creates a MessageProducer for sending messages on the request queue, a MessageConsumer for receiving messages from the reply queue, and another producer for moving messages to the invalid message queue.
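The listing does not include JndiUtil, so its actual implementation is unknown. As a rough idea of what a JNDI lookup helper of this kind might involve, here is a minimal sketch built on the standard `javax.naming` API. The class name `JndiLookupSketch` and the methods `lookup` and `requireType` are hypothetical, and the sketch assumes the JVM has a JNDI provider configured (for example through a `jndi.properties` file); without one, the lookup fails with a `NamingException`.

```java
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

// Hypothetical helper; not the JndiUtil class used by the example.
final class JndiLookupSketch {

    // Looks up a name in the default JNDI context and checks the type of the result.
    static <T> T lookup(String name, Class<T> type) throws NamingException {
        Context context = new InitialContext();
        try {
            return requireType(name, context.lookup(name), type);
        } finally {
            context.close();
        }
    }

    // Fails with a NamingException if the bound object is not of the expected type.
    static <T> T requireType(String name, Object found, Class<T> type) throws NamingException {
        if (!type.isInstance(found)) {
            String actual = (found == null) ? "null" : found.getClass().getName();
            throw new NamingException(
                name + " is bound to " + actual + ", expected " + type.getName());
        }
        return type.cast(found);
    }
}
```

With the JMS API on the classpath, a call such as `JndiLookupSketch.lookup("jms/RequestQueue", javax.jms.Destination.class)` would play the role that `JndiUtil.getDestination` plays in the listing.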
One thing that the requestor needs to be able to do is send request messages. For that, it implements the send() method.
- It creates a TextMessage and sets its contents to "Hello world."
- It sets the message's reply-to property to be the reply queue. This is a Return Address that will tell the replier how to send back the reply.
- It uses the requestProducer to send the message. The producer is connected to the request queue, so that's the queue the message is sent on.
- It then prints out the details of the message it just sent. This is done after the message is sent because the message ID is set by the messaging system and is not set until the message is actually sent.
The other thing the requestor needs to be able to do is receive reply messages. It implements the receiveSync() method for this purpose.
- It uses its replyConsumer to receive the reply. The consumer is connected to the reply queue, so it will receive messages from there. It uses the receive() method to get the message, which synchronously blocks until a message is delivered to the queue and is read from the queue, so the requestor is a Polling Consumer. Because this receive is synchronous, the requestor's method is called receiveSync().
- The message should be a TextMessage. If so, the requestor gets the message's contents and prints out the message's details.
- If the message is not a TextMessage, then the message cannot be processed. Rather than just discarding the message, the requestor resends it to the invalid message queue. Resending the message will change its message ID, so before resending it, the requestor stores its original message ID in its correlation ID (see Correlation Identifier).
In this way, a requestor does everything necessary to send a request, receive a reply, and route the reply to a special queue if the message does not make any sense.
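The blocking behavior of a Polling Consumer can be shown without a messaging system. The sketch below is an in-memory illustration, assuming a `LinkedBlockingQueue` in place of the reply queue; `PollingConsumerSketch` and its `receiveSync` method are hypothetical names. It adds a timeout, which the example's `receive()` call does not use, so that the caller cannot wait forever. JMS offers a comparable `MessageConsumer.receive(long timeout)` that returns null when the timeout expires.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class PollingConsumerSketch {

    // Blocks the calling thread until a message arrives or the timeout elapses.
    // Returns null on timeout or if the thread is interrupted while waiting.
    static String receiveSync(BlockingQueue<String> replyQueue, long timeoutMillis) {
        try {
            return replyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // keep the interrupt visible to the caller
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> replyQueue = new LinkedBlockingQueue<>();

        // Another thread plays the replier and delivers the reply after a short delay.
        Thread replier = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                return;
            }
            replyQueue.add("Hello world.");
        });
        replier.start();

        // The requestor's thread does nothing else while it waits here.
        System.out.println("Received reply: " + receiveSync(replyQueue, 5000));
    }
}
```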
Next, let's take a look at how the Replier is implemented:
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.NamingException;

public class Replier implements MessageListener {

    private Session session;
    private MessageProducer invalidProducer;

    protected Replier() {
        super();
    }

    public static Replier newReplier(Connection connection, String requestQueueName,
        String invalidQueueName)
        throws JMSException, NamingException {

        Replier replier = new Replier();
        replier.initialize(connection, requestQueueName, invalidQueueName);
        return replier;
    }

    protected void initialize(Connection connection, String requestQueueName,
        String invalidQueueName)
        throws NamingException, JMSException {

        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination requestQueue = JndiUtil.getDestination(requestQueueName);
        Destination invalidQueue = JndiUtil.getDestination(invalidQueueName);

        MessageConsumer requestConsumer = session.createConsumer(requestQueue);
        MessageListener listener = this;
        requestConsumer.setMessageListener(listener);

        invalidProducer = session.createProducer(invalidQueue);
    }

    public void onMessage(Message message) {
        try {
            if ((message instanceof TextMessage) && (message.getJMSReplyTo() != null)) {
                TextMessage requestMessage = (TextMessage) message;

                System.out.println("Received request");
                System.out.println("\tTime: " + System.currentTimeMillis() + " ms");
                System.out.println("\tMessage ID: " + requestMessage.getJMSMessageID());
                System.out.println("\tCorrel. ID: " + requestMessage.getJMSCorrelationID());
                System.out.println("\tReply to: " + requestMessage.getJMSReplyTo());
                System.out.println("\tContents: " + requestMessage.getText());

                String contents = requestMessage.getText();
                Destination replyDestination = message.getJMSReplyTo();
                MessageProducer replyProducer = session.createProducer(replyDestination);

                TextMessage replyMessage = session.createTextMessage();
                replyMessage.setText(contents);
                replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
                replyProducer.send(replyMessage);

                System.out.println("Sent reply");
                System.out.println("\tTime: " + System.currentTimeMillis() + " ms");
                System.out.println("\tMessage ID: " + replyMessage.getJMSMessageID());
                System.out.println("\tCorrel. ID: " + replyMessage.getJMSCorrelationID());
                System.out.println("\tReply to: " + replyMessage.getJMSReplyTo());
                System.out.println("\tContents: " + replyMessage.getText());
            } else {
                System.out.println("Invalid message detected");
                System.out.println("\tType: " + message.getClass().getName());
                System.out.println("\tTime: " + System.currentTimeMillis() + " ms");
                System.out.println("\tMessage ID: " + message.getJMSMessageID());
                System.out.println("\tCorrel. ID: " + message.getJMSCorrelationID());
                System.out.println("\tReply to: " + message.getJMSReplyTo());

                message.setJMSCorrelationID(message.getJMSMessageID());
                invalidProducer.send(message);

                System.out.println("Sent to invalid message queue");
                System.out.println("\tType: " + message.getClass().getName());
                System.out.println("\tTime: " + System.currentTimeMillis() + " ms");
                System.out.println("\tMessage ID: " + message.getJMSMessageID());
                System.out.println("\tCorrel. ID: " + message.getJMSCorrelationID());
                System.out.println("\tReply to: " + message.getJMSReplyTo());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
Replier is what an application might use to receive a request and send a reply. The application provides its replier a Connection to the messaging system, as well as the JNDI names of the request and invalid message queues. (It does not need to specify the name of the reply queue because, as we'll see, that will be provided by the message's Return Address.) This is the information the replier needs to initialize itself.
The replier's initialize code is pretty similar to the requestor's, but there are a couple of differences:
- One difference is that the replier does not look up the reply queue and create a producer for it. This is because the replier does not assume it will always send replies on that queue; rather, as we'll see, it will let the request message tell it what queue to send the reply message on.
- Another difference is that the replier is an Event-Driven Consumer, so it implements MessageListener. When a message is delivered to the request queue, the messaging system will automatically call the replier's onMessage method.
Once the replier has initialized itself to be a listener on the request queue, there's not much for it to do but wait for messages. Unlike the requestor, which has to explicitly poll the reply queue for messages, the replier is event-driven and so does nothing until the messaging system calls its onMessage method with a new message. The message will be from the request queue because initialize created the consumer on the request queue. Once onMessage receives a new message, it processes the message like this:
- As with the requestor processing a reply message, the request message is supposed to be a TextMessage. It is also supposed to specify the queue to send the reply on. If the message does not meet these requirements, the replier will move the message to the invalid message queue (same as the requestor).
- If the message meets the requirements: Here is where the replier implements its part of the Return Address pattern. Remember that the requestor set the request message's reply-to property to specify the reply queue. The replier now gets that property's value and uses it to create a MessageProducer on the proper queue. The important part here is that the replier is not hard-coded to use a particular reply queue; it uses whatever reply queue each particular request message specifies.
- The replier then creates the reply message. In doing so, it implements the Correlation Identifier pattern by setting the reply message's correlation-id property to the same value as the request message's message-id property.
- The replier then sends out the reply message and displays its details.
Thus a replier does everything necessary to receive a message (presumably a request) and send a reply.
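To contrast with polling, the callback style of an Event-Driven Consumer can be imitated in plain Java. In the sketch below, a daemon thread plays the part of the messaging system: it waits on the queue and invokes a listener for each message, so the application code never calls a receive method itself. This is an in-memory illustration only; `EventDrivenConsumerSketch` and `listen` are hypothetical names, and a `java.util.function.Consumer` stands in for a JMS `MessageListener`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class EventDrivenConsumerSketch {

    // Starts a daemon thread that delivers every queued message to the listener.
    // Interrupting the returned thread stops the dispatching.
    static Thread listen(BlockingQueue<String> requestQueue, Consumer<String> onMessage) {
        Thread dispatcher = new Thread(() -> {
            try {
                while (true) {
                    onMessage.accept(requestQueue.take());
                }
            } catch (InterruptedException e) {
                // Interrupted: stop dispatching and let the thread end.
            }
        });
        dispatcher.setDaemon(true);
        dispatcher.start();
        return dispatcher;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> requestQueue = new LinkedBlockingQueue<>();
        CountDownLatch handled = new CountDownLatch(1);

        // Register the callback; from here on the "replier" just waits to be called.
        Thread dispatcher = listen(requestQueue, contents -> {
            System.out.println("Received request: " + contents);
            handled.countDown();
        });

        requestQueue.add("Hello world.");
        handled.await(5, TimeUnit.SECONDS);   // give the callback time to run
        dispatcher.interrupt();
    }
}
```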
Invalid Message Example
While we're at it, let's look at an example of the Invalid Message Channel pattern. Remember, one of the queues we need is one named "jms/InvalidMessages." This exists so that if a JMS client (a Message Endpoint) receives a message it cannot process, it can move the strange message to a special channel.
To demonstrate invalid message handling, we have designed an InvalidMessenger class. This object is specifically designed to send a message on the request channel whose format is incorrect. Like any channel, the request channel is a Datatype Channel, in that the request receivers expect the requests to be of a certain format. The invalid messenger simply sends a message of a different format; when the replier receives the message, it does not recognize the message's format, and so moves the message to the invalid message queue.
We'll run the Replier in one window and the Invalid Messenger in another window. When the invalid messenger sends its message, it displays output like this:
Sent invalid message
    Type: com.sun.jms.ObjectMessageImpl
    Time: 1048288516959 ms
    Message ID: ID:_XYZ123_1048288516639_7.2.1.1
    Correl. ID: null
    Reply to: com.sun.jms.Queue: jms/ReplyQueue
This shows that the message is an instance of ObjectMessage (whereas the replier is expecting a TextMessage). The Replier receives the invalid message and resends it to the invalid message queue:
Invalid message detected
    Type: com.sun.jms.ObjectMessageImpl
    Time: 1048288517049 ms
    Message ID: ID:_XYZ123_1048288516639_7.2.1.1
    Correl. ID: null
    Reply to: com.sun.jms.Queue: jms/ReplyQueue
Sent to invalid message queue
    Type: com.sun.jms.ObjectMessageImpl
    Time: 1048288517140 ms
    Message ID: ID:_XYZ123_1048287020267_6.2.1.2
    Correl. ID: ID:_XYZ123_1048288516639_7.2.1.1
    Reply to: com.sun.jms.Queue: jms/ReplyQueue
One insight worth noting is that when the message is moved to the invalid message queue, it is actually being resent, so it gets a new message ID. Because of this, we apply the Correlation Identifier pattern; once the replier determines the message to be invalid, it copies the message's message ID to its correlation ID so as to preserve a record of the message's original ID.
The code that handles this invalid-message processing is in the Replier class shown earlier, in the onMessage method. Requestor.receiveSync() contains similar invalid-message processing code.
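The ID bookkeeping during rerouting can be isolated in a short in-memory sketch. It assumes a hypothetical `Msg` type whose body is an arbitrary object, treats a `String` body as the equivalent of a TextMessage, and uses `LinkedBlockingQueue` instances in place of JMS queues; none of these names come from the example code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

final class InvalidMessageSketch {

    // Hypothetical message type; a String body plays the role of a TextMessage.
    static final class Msg {
        String messageId;
        String correlationId;
        final Object body;

        Msg(Object body) {
            this.body = body;
        }
    }

    private static final AtomicLong NEXT_ID = new AtomicLong(1);

    // Every send assigns a fresh message ID, including a resend of the same message.
    static void send(BlockingQueue<Msg> queue, Msg msg) {
        msg.messageId = "ID:" + NEXT_ID.getAndIncrement();
        queue.add(msg);
    }

    // Returns true if the message is of the expected type; otherwise reroutes it
    // to the invalid message queue and returns false.
    static boolean accept(Msg msg, BlockingQueue<Msg> invalidQueue) {
        if (msg.body instanceof String) {
            return true;
        }
        msg.correlationId = msg.messageId;   // preserve the original ID before resending
        send(invalidQueue, msg);             // the resend overwrites messageId
        return false;
    }

    public static void main(String[] args) {
        BlockingQueue<Msg> requestQueue = new LinkedBlockingQueue<>();
        BlockingQueue<Msg> invalidQueue = new LinkedBlockingQueue<>();

        send(requestQueue, new Msg(Integer.valueOf(42)));   // wrong type on purpose
        Msg received = requestQueue.poll();
        String originalId = received.messageId;

        boolean valid = accept(received, invalidQueue);
        Msg moved = invalidQueue.poll();

        System.out.println("Valid:       " + valid);
        System.out.println("Original ID: " + originalId);
        System.out.println("New ID:      " + moved.messageId);
        System.out.println("Correl. ID:  " + moved.correlationId);
    }
}
```

The order of the two statements in the reroute branch matters: copying the ID after the resend would record the new ID instead of the original one.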
Conclusions
We've seen how to implement two classes, Requestor and Replier (Message Endpoints), that exchange request and reply Messages using Request-Reply. The request message uses a Return Address to specify what queue to send the reply on. The reply message uses a Correlation Identifier to specify which request this is a reply for. The Requestor implements a Polling Consumer to receive replies, whereas the Replier implements an Event-Driven Consumer to receive requests. The request and reply queues are Datatype Channels; when a consumer receives a message that is not of the right type, it reroutes the message to the Invalid Message Channel.