Continuing with the order processing example, let's assume that company management publishes price changes and promotions to large customers. Whenever a price for an item changes, we send a message notifying the customer. We do the same if we are running a special promotion, e.g. all widgets are 10% off in the month of November. Some customers may be interested in receiving price updates or promotions only related to specific items. If I purchase primarily gadgets, I may not be interested in knowing whether widgets are on sale or not.
How can a component avoid receiving uninteresting messages?
Use a special kind of Message Router, a Message Filter, to eliminate undesired messages from a channel based on a set of criteria.
The Message Filter has only a single output channel. If the message content matches the criteria specified by the Message Filter, the message is routed to the output channel. If the message content does not match the criteria, the message is discarded.
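To make the single-output behavior concrete, here is a minimal in-memory sketch in plain Java. It is not tied to any messaging product: the class `MessageFilterSketch`, the nested `MessageFilter`, and the helper `filterFor` are hypothetical names chosen for illustration, and messages are assumed to be simple strings prefixed with the item name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical illustration; none of these names come from a messaging library.
final class MessageFilterSketch {

    /** Forwards a message to its single output channel only if it matches the criteria. */
    static final class MessageFilter<T> implements Consumer<T> {
        private final Predicate<T> criteria;
        private final Consumer<T> outputChannel;

        MessageFilter(Predicate<T> criteria, Consumer<T> outputChannel) {
            this.criteria = criteria;
            this.outputChannel = outputChannel;
        }

        @Override
        public void accept(T message) {
            if (criteria.test(message)) {
                outputChannel.accept(message);
            }
            // Non-matching messages are simply discarded: there is no second output.
        }
    }

    /** Runs the updates through a filter that keeps only messages about the given item. */
    static List<String> filterFor(String item, List<String> updates) {
        List<String> received = new ArrayList<>();
        // Assumption: each message starts with "<item>:".
        MessageFilter<String> filter =
                new MessageFilter<>(msg -> msg.startsWith(item + ":"), received::add);
        updates.forEach(filter);
        return received;
    }

    public static void main(String[] args) {
        List<String> updates = Arrays.asList(
                "Widget: 10% off in November",
                "Gadget: new price 4.99",
                "Widget: new price 2.49");
        // A customer who buys primarily gadgets sees only the gadget update.
        System.out.println(filterFor("Gadget", updates));
    }
}
```

Because the filter has only one output, it differs from a Content-Based Router, which would send the non-matching messages to another channel rather than drop them.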
... Read the entire pattern in the book Enterprise Integration Patterns
Example: RabbitMQ Bindings
RabbitMQ allows a straightforward implementation of Publish-Subscribe Channel with Message Filters via a direct exchange. An exchange is a concept distinct from a queue: message producers generally publish messages to an exchange, which controls how each message is propagated to the queues bound to it. For example, a "fanout" exchange simply propagates a message to all bound queues, while a "direct" exchange allows each bound queue to express a filter condition for the messages it wants to receive.
A direct exchange places an incoming message into a bound queue when the message's routing key matches the queue's binding key. Multiple binding keys are allowed per queue, and multiple queues can use the same binding key, which results in each of them receiving a copy of the message.
The Widgets and Gadgets example from above can be implemented quite easily with a single direct exchange and two queues bound to this exchange, each with its respective filter. A helper method creates a new queue, binds it to the exchange with the filter, and sets up an Event-Driven Consumer that simply prints out messages.
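The routing rule described above can be modeled in a few lines of plain Java. This is only an in-memory sketch for reasoning about routing keys and binding keys, not the RabbitMQ client API: `DirectExchangeSketch`, `bind`, `publish`, and `messagesIn` are hypothetical names, and the sketch assumes that a message whose routing key matches no binding is dropped.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// In-memory model of direct-exchange routing; hypothetical names, not the RabbitMQ API.
final class DirectExchangeSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();
    // queue name -> messages delivered to that queue so far
    private final Map<String, List<String>> queues = new HashMap<>();

    /** Binds a queue to the exchange; a queue may be bound with several keys. */
    void bind(String queueName, String bindingKey) {
        queues.computeIfAbsent(queueName, q -> new ArrayList<>());
        bindings.computeIfAbsent(bindingKey, k -> new LinkedHashSet<>()).add(queueName);
    }

    /** Copies the message into every queue whose binding key equals the routing key. */
    void publish(String routingKey, String message) {
        Set<String> targets = bindings.getOrDefault(routingKey, Collections.emptySet());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
        // Assumption: with no matching binding, the message is dropped.
    }

    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("widgetQueue", "Widget");
        exchange.bind("gadgetQueue", "Gadget");
        // One queue with two binding keys receives both kinds of message.
        exchange.bind("auditQueue", "Widget");
        exchange.bind("auditQueue", "Gadget");

        exchange.publish("Widget", "Hello EIP Widget!");
        exchange.publish("Gadget", "Hello EIP Gadget!");

        System.out.println("widgetQueue: " + exchange.messagesIn("widgetQueue"));
        System.out.println("gadgetQueue: " + exchange.messagesIn("gadgetQueue"));
        System.out.println("auditQueue:  " + exchange.messagesIn("auditQueue"));
    }
}
```

In this model each bound queue acts as a Message Filter on the shared Publish-Subscribe Channel: the binding key is the filter criterion, and the queue is the filter's single output.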
static void filteredReceive(final Channel channel, final String filter, String exchangeName)
        throws IOException {
    System.out.println(filter + " waiting for messages. To exit press CTRL+C");

    // Event-Driven Consumer: the client library calls handleDelivery for each message.
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(filter + " Received '" + message + "'");
        }
    };

    // Declare a server-named queue and bind it, using the filter as the binding key.
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, exchangeName, filter);

    boolean autoAck = true;
    channel.basicConsume(queueName, autoAck, consumer);
}
The consumer is a simple Event-Driven Consumer that is called every time a message is received. The lines after the consumer definition declare a new private, server-named queue and bind it to the exchange, using the specified filter string as the binding key. Lastly, the consumer implementation is attached to the queue.
We can now create the direct exchange and start several of these consumers, each with its own binding key:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
filteredReceive(channel, "Widget", EXCHANGE_NAME);
filteredReceive(channel, "Gadget", EXCHANGE_NAME);
Lastly, we publish some messages to the exchange with different routing keys:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
sendMessage(channel, EXCHANGE_NAME, "Widget", "Hello EIP Widget!");
sendMessage(channel, EXCHANGE_NAME, "Gadget", "Hello EIP Gadget!");
sendMessage is a simple helper method:
static void sendMessage(Channel channel, String exchange, String key, String message)
        throws IOException {
    // Encode explicitly as UTF-8 to match the decoding in the consumer.
    channel.basicPublish(exchange, key, null, message.getBytes("UTF-8"));
    System.out.println(" [x] Sent '" + message + "'");
}
You can run this code and observe that RabbitMQ filters the messages according to their routing key. The consumers print:
Widget waiting for messages. To exit press CTRL+C
Gadget waiting for messages. To exit press CTRL+C
Widget Received 'Hello EIP Widget!'
Gadget Received 'Hello EIP Gadget!'
The complete source code for this example can be found on GitHub.
Related patterns:
Content-Based Router, Event-Driven Consumer, Message Router, Selective Consumer, Publish-Subscribe Channel, Recipient List