An application is using Messaging to announce events.
How can the sender broadcast an event to all interested receivers?
Send the event on a Publish-Subscribe Channel, which delivers a copy of a particular event to each receiver.
A Publish-Subscribe Channel works like this: It has one input channel that splits into multiple output channels, one for each subscriber. When an event is published into the channel, the Publish-Subscribe Channel delivers a copy of the message to each of the output channels. Each output channel has only one subscriber, which is only allowed to consume a message once. In this way, each subscriber only gets the message once and consumed copies disappear from their channels.
A Publish-Subscribe Channel can be a useful debugging tool. Even though a message is destined to only a single receiver, using a Publish-Subscribe Channel allows you to eavesdrop on a message channel without disturbing the existing message flow. Monitoring all traffic on a channel can be tremendously helpful when debugging messaging applications. It can also save you from inserting a ton of print statements into each application that participates in the messaging solution. Creating a program that listens for messages on all active channels and logs them to a file can realize many of the same benefits that a Message Store brings.
... Read the entire pattern in the book Enterprise Integration Patterns
Example: Google Cloud Pub/SubNEW
Google Cloud Pub/Sub offers both Competing Consumers and Publish-Subscribe Channel semantics, managed through topics (Publish-Subcribe) and subscriptions (Competing Consumers) as illustrated in this diagram:
In this example, both Subscriber Y and Subscriber Z each receive a copy of Message 3 as they are subscribing to the same Topic C, but through separate subscriptions. Google Cloud Pub/Sub does not support wildcard subscriptions.
Google provides client API libraries that make coding against the cloud service relatively simple. The client code has to authenticate first and create topics before messages can be published. Subscriptions are created on the fly.
A stand-alone application can authenticate using private keys:
Pubsub pubsub; void createClient(String private_key_file, String email) throws IOException, GeneralSecurityException { HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport(); GoogleCredential credential = new GoogleCredential.Builder() .setTransport(transport) .setJsonFactory(JSON_FACTORY) .setServiceAccountScopes(PubsubScopes.all()) .setServiceAccountId(email) .setServiceAccountPrivateKeyFromP12File(new File(private_key_file)) .build(); pubsub = new Pubsub.Builder(transport, JSON_FACTORY, credential) .setApplicationName("eaipubsub") .build(); }
using the PubSub instance, you can create a new topic if it doesn't already exist:
Topic createTopic(String topicName) throws IOException { String topic = getTopic(topicName); // adds project name and resource type Pubsub.Projects.Topics topics = pubsub.projects().topics(); ListTopicsResponse list = topics.list(project).execute(); if (list.getTopics() == null || !list.getTopics().contains(new Topic().setName(topic))) { return topics.create(topic, new Topic()).execute(); } else { return new Topic().setName(topic); } }
Now the code is ready to publish a message to the topic, returning the published message IDs:
List<String> publishMessage(String topicName, String data) throws IOException { List<PubsubMessage> messages = Lists.newArrayList(); messages.add(new PubsubMessage().encodeData(data.getBytes("UTF-8"))); PublishRequest publishRequest = new PublishRequest().setMessages(messages); PublishResponse publishResponse = pubsub.projects().topics() .publish(getTopic(topicName), publishRequest) .execute(); return publishResponse.getMessageIds(); }
Subscriptions are identified by a name and a topic so multiple subscribers can compete for messages off the same subscription or create individual subscriptions for the same topic. If a subscription already exists, the underlying REST API throws an exception, which we can simply ignore.
Subscription subscribeTopic(String subscriptionName, String topicName) throws IOException { String sub = getSubscription(subscriptionName); // adds project name and resource type Subscription subscription = new Subscription() .setName(sub) .setAckDeadlineSeconds(15) .setTopic(getTopic(topicName)); try { return pubsub.projects().subscriptions().create(sub, subscription).execute(); } catch (GoogleJsonResponseException e) { if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) { return subscription; } else { throw e; } } }
Now everything is in place to pull messages off the subscription. The code is a bit verbose because it has to navigate collections that allow receiving multiple messages at once. It also has to convert the payload back into a String for this simple demo implementation:
List<String> pullMessage(Subscription subscription, int maxMessages, boolean doAck) throws IOException { PullRequest pullRequest = new PullRequest() .setReturnImmediately(true) .setMaxMessages(maxMessages); PullResponse response = pubsub.projects().subscriptions().pull(subscription.getName(), pullRequest).execute(); List<ReceivedMessage> messages = response.getReceivedMessages(); List<String> ackIds = Lists.newArrayList(); List<String> data = Lists.newArrayList(); if (messages != null) { for (ReceivedMessage receivedMessage : messages) { PubsubMessage message = receivedMessage.getMessage(); if (message != null) { byte[] bytes = message.decodeData(); if (bytes != null) { data.add(new String(bytes, "UTF-8")); } } ackIds.add(receivedMessage.getAckId()); } if (doAck) { AcknowledgeRequest ackRequest = new AcknowledgeRequest().setAckIds(ackIds); pubsub.projects().subscriptions().acknowledge(subscription.getName(), ackRequest).execute(); } } return data; }
Find the source for this code snippet on Github.
Related patterns:
Competing Consumers, Durable Subscriber, Event Message, Message, Message Channel, Message Store, Messaging, JMS Publish/Subscribe Example, Point-to-Point Channel, Request-Reply
Further reading: