Enterprise Integration Patterns
Messaging Patterns
HOME PATTERNS RAMBLINGS ARTICLES TALKS DOWNLOAD BOOKS CONTACT
Messaging Patterns
Publish-Subscribe ChannelPublish-Subscribe ChannelMessaging Patterns » Messaging Channels

An application is using Messaging to announce events.

How can the sender broadcast an event to all interested receivers?

Send the event on a Publish-Subscribe Channel, which delivers a copy of a particular event to each receiver.

A Publish-Subscribe Channel works like this: It has one input channel that splits into multiple output channels, one for each subscriber. When an event is published into the channel, the Publish-Subscribe Channel delivers a copy of the message to each of the output channels. Each output channel has only one subscriber, which is only allowed to consume a message once. In this way, each subscriber only gets the message once and consumed copies disappear from their channels.

A Publish-Subscribe Channel can be a useful debugging tool. Even though a message is destined to only a single receiver, using a Publish-Subscribe Channel allows you to eavesdrop on a message channel without disturbing the existing message flow. Monitoring all traffic on a channel can be tremendously helpful when debugging messaging applications. It can also save you from inserting a ton of print statements into each application that participates in the messaging solution. Creating a program that listens for messages on all active channels and logs them to a file can realize many of the same benefits that a Message Store brings.

... Read the entire pattern in the book Enterprise Integration Patterns

Example: Google Cloud Pub/SubNEW

Google Cloud Pub/Sub offers both Competing Consumers and Publish-Subscribe Channel semantics, managed through topics (Publish-Subcribe) and subscriptions (Competing Consumers) as illustrated in this diagram:


Google Cloud Pub-Sub Concept Overview

In this example, both Subscriber Y and Subscriber Z each receive a copy of Message 3 as they are subscribing to the same Topic C, but through separate subscriptions. Google Cloud Pub/Sub does not support wildcard subscriptions.

Google provides client API libraries that make coding against the cloud service relatively simple. The client code has to authenticate first and create topics before messages can be published. Subscriptions are created on the fly.

A stand-alone application can authenticate using private keys:

Pubsub pubsub;
    
void createClient(String private_key_file, String email) throws IOException, GeneralSecurityException {
  HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
  GoogleCredential credential = new GoogleCredential.Builder()
      .setTransport(transport)
      .setJsonFactory(JSON_FACTORY)
      .setServiceAccountScopes(PubsubScopes.all())
      .setServiceAccountId(email)
      .setServiceAccountPrivateKeyFromP12File(new File(private_key_file))
      .build();
    pubsub = new Pubsub.Builder(transport, JSON_FACTORY, credential)
      .setApplicationName("eaipubsub")
      .build();
}

using the PubSub instance, you can create a new topic if it doesn't already exist:

Topic createTopic(String topicName) throws IOException {
  String topic = getTopic(topicName); // adds project name and resource type
  Pubsub.Projects.Topics topics = pubsub.projects().topics();
  ListTopicsResponse list = topics.list(project).execute();
  if (list.getTopics() == null || !list.getTopics().contains(new Topic().setName(topic))) {
      return topics.create(topic, new Topic()).execute();
  } else {
      return new Topic().setName(topic);
  }
}

Now the code is ready to publish a message to the topic, returning the published message IDs:

List<String> publishMessage(String topicName, String data) throws IOException {
  List<PubsubMessage> messages = Lists.newArrayList();
  messages.add(new PubsubMessage().encodeData(data.getBytes("UTF-8")));
  PublishRequest publishRequest = new PublishRequest().setMessages(messages);
  PublishResponse publishResponse = pubsub.projects().topics()
      .publish(getTopic(topicName), publishRequest)
      .execute();
  return publishResponse.getMessageIds();
}

Subscriptions are identified by a name and a topic so multiple subscribers can compete for messages off the same subscription or create individual subscriptions for the same topic. If a subscription already exists, the underlying REST API throws an exception, which we can simply ignore.

Subscription subscribeTopic(String subscriptionName, String topicName) throws IOException {
  String sub = getSubscription(subscriptionName);  // adds project name and resource type
  Subscription subscription = new Subscription()
    .setName(sub)
    .setAckDeadlineSeconds(15)
    .setTopic(getTopic(topicName));
  try {
    return pubsub.projects().subscriptions().create(sub, subscription).execute();
  } catch (GoogleJsonResponseException e) {
    if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
        return subscription;
    } else {
        throw e;
    }
  }
}

Now everything is in place to pull messages off the subscription. The code is a bit verbose because it has to navigate collections that allow receiving multiple messages at once. It also has to convert the payload back into a String for this simple demo implementation:

List<String> pullMessage(Subscription subscription, int maxMessages, boolean doAck) throws IOException {
  PullRequest pullRequest = new PullRequest()
          .setReturnImmediately(true)
          .setMaxMessages(maxMessages);
  PullResponse response = pubsub.projects().subscriptions().pull(subscription.getName(), pullRequest).execute();
  List<ReceivedMessage> messages = response.getReceivedMessages();
  List<String> ackIds = Lists.newArrayList();
  List<String> data = Lists.newArrayList();
  if (messages != null) {
      for (ReceivedMessage receivedMessage : messages) {
          PubsubMessage message = receivedMessage.getMessage();
          if (message != null) {
              byte[] bytes = message.decodeData();
              if (bytes != null) {
                  data.add(new String(bytes, "UTF-8"));
              }
          }
          ackIds.add(receivedMessage.getAckId());
      }
      if (doAck) {
          AcknowledgeRequest ackRequest = new AcknowledgeRequest().setAckIds(ackIds);
          pubsub.projects().subscriptions().acknowledge(subscription.getName(), ackRequest).execute();
      }
  }
  return data;
}

Find the source for this code snippet on Github.

Related patterns:

Competing Consumers, Durable Subscriber, Event Message, Message, Message Channel, Message Store, Messaging, JMS Publish/Subscribe Example, Point-to-Point Channel, Request-Reply

Further reading:


Creative Commons Attribution License

You can reuse the following elements under the Creative Commons Attribution license: pattern icon, pattern name, problem and solution statements (in bold), and the sketch. Other portions are protected by copyright.

Enterprise Integration Patterns book cover

Enterprise Integration Patterns
The de-facto language for designing asynchronous, distributed systems. Over 100,000 copies sold.

Software Architect Elevator book cover

The Software Architect Elevator
Rethink the role of architects as a connecting element across organizational layers. Acquire the technical, communication, and organizational skills to succeed in this new role.

Cloud Strategy book cover

Cloud Strategy
Make your cloud migration a success by translating high-level goals into conscious decisions with well-understood trade-offs.

Platform Strategy book cover

Platform Strategy
Platforms can boost innovation through harmonization, but they aren't easy to build. Learn from over a decade of designing and rolling out IT platforms.