When sending messages from one system to another, it is common for the target system to require more information than the source system can provide. For example, incoming Address messages may just contain the ZIP code because the designers felt that storing a redundant state code would be superfluous. Likely, another system is going to want to specify both a state code and a ZIP code field. Yet another system may not actually use state codes, but spell the state name out because it uses free-form addresses in order to support international addresses. Likewise, one system may provide us with a customer ID, but the receiving system actually requires the customer name and address. An order message sent by the order management system may just contain an order number, but we need to find the customer ID associated with that order, so we can pass it to the customer management system. The scenarios are plentiful.
How do we communicate with another system if the message originator does not have all the required data items available?
Use a specialized transformer, a Content Enricher, to access an external data source in order to augment a message with missing information.
The Content Enricher uses information inside the incoming message (e.g. key fields) to retrieve data from an external source. After the Content Enricher retrieves the required data from the resource, it appends the data to the message. The original information from the incoming message may be carried over into the resulting message or may no longer be needed, depending on the specific needs of the receiving application.
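The mechanism can be sketched in a few lines of Python. This is a minimal illustration, not code from the book: a plain dict with made-up sample data stands in for the external resource, and the function name `enrich`, its parameters, and the `customerId`/`name`/`address` fields are all hypothetical. The `keep_original` flag mirrors the choice of whether the incoming fields are carried over.

```python
from typing import Any, Callable, Mapping, Optional

# Hypothetical lookup: given a key, return the extra fields (or None if unknown).
Lookup = Callable[[str], Optional[Mapping[str, Any]]]


def enrich(message: Mapping[str, Any], key_field: str, lookup: Lookup,
           keep_original: bool = True) -> dict:
    """Augment `message` with data retrieved via its key field.

    With keep_original=False, only the key and the retrieved data survive.
    Retrieved values overwrite same-named fields of the incoming message.
    """
    key = message[key_field]
    extra = lookup(key) or {}  # an unknown key adds nothing
    result = dict(message) if keep_original else {key_field: key}
    result.update(extra)
    return result


# Made-up sample data; a dict stands in for a database, directory, or service.
customers = {"C42": {"name": "Pete's Pizza", "address": "1 Main St"}}

order = {"orderId": "O-1", "customerId": "C42"}
print(enrich(order, "customerId", customers.get))
```

Passing the lookup in as a function keeps the enricher independent of where the data actually lives.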
The additional information injected by the Content Enricher has to be available somewhere in the system. The most common sources for the new data are:
- Computation: The Content Enricher may be able to compute the missing information. In this case, the algorithm incorporates the additional information. For example, the receiving system may require a state code while the incoming message contains only a ZIP code, from which the state can be derived. Or, a receiving system may require a data format that specifies the total size of the message. The Content Enricher can add up the lengths of all message fields and thus compute the message size. This form of Content Enricher is very similar to the basic Message Translator because it needs no external data source.
- Environment: The Content Enricher may be able to retrieve the additional data from the operating environment. The most common example is a time stamp. For example, the receiving system may require each message to carry a time stamp. If the sending system does not include this field, the Content Enricher can get the current time from the operating system and add it to the message.
- Another System: This option is the most common one. The Content Enricher has to retrieve the missing data from another system. This data resource can take on a number of forms, including a database, a file, an LDAP directory, an application, or a user who manually enters missing data.
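The first two sources need no external resource at all. A minimal sketch, assuming the message is a flat dict and that "size" means the summed string length of the field values; the field names `messageSize` and `timestamp` are hypothetical:

```python
import json
from datetime import datetime, timezone


def add_message_size(message: dict) -> dict:
    """Computation: add the total length of all field values (assumed definition of size)."""
    size = sum(len(str(value)) for value in message.values())
    return {**message, "messageSize": size}


def add_timestamp(message: dict) -> dict:
    """Environment: stamp the message with the current UTC time if it carries none."""
    if "timestamp" in message:
        return dict(message)
    now = datetime.now(timezone.utc).isoformat()
    return {**message, "timestamp": now}


msg = {"id": "123", "zip": "94105"}
print(json.dumps(add_timestamp(add_message_size(msg))))
```

Both functions return a new dict rather than mutating the input, so they can be chained in any order.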
... Read the entire pattern in the book Enterprise Integration Patterns
Example: Content Enricher with Amazon EventBridge Pipes
For cloud-based messaging systems, Amazon EventBridge Pipes can implement a Content Enricher through its "Enrichment" step.
The step itself is a slight misnomer, as it can implement any Message Translator or even a Message Filter for batched sources: the pipe simply passes on the message(s) returned by the Enrichment step. To build an actual Content Enricher, you can use a Lambda function that fetches additional data from a DynamoDB table based on a key from the original message. You can find the source code for this example implementation in the EIP Code Repository.
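Since the pipe forwards whatever the Enrichment step returns, the same step can drop messages. A minimal sketch of such a filtering function, assuming each record carries its JSON payload in `body` (as the SQS records in this example do); the function name and the `Type == "Order"` criterion are hypothetical:

```python
import json


def filter_handler(event, context):
    """Hypothetical enrichment function acting as a Message Filter.

    Only payloads that match are returned, so only those would be
    passed on to the pipe's target.
    """
    kept = []
    for record in event:
        payload = json.loads(record["body"])
        if payload.get("Type") == "Order":  # hypothetical filter criterion
            kept.append(payload)  # returning the parsed payload also translates it
    return kept
```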
You start by creating two Message Channels in the form of Amazon SQS queues (the example uses the AWS CLI to keep things simple; for real projects, an automation script is highly recommended):
```
$ aws sqs create-queue --queue-name EntityIDs --attributes='{"MessageRetentionPeriod": "300"}'
$ aws sqs create-queue --queue-name EntityUpdates --attributes='{"MessageRetentionPeriod": "300"}'
$ aws sqs list-queues
```
The last command prints out the URLs of the created queues, which you will need later.
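The later commands spell out queue URLs and ARNs by hand, so it can help to see the two patterns side by side. A small sketch, assuming the ap-southeast-1 region and the placeholder account ID 1234567890 used in this example; the output of `list-queues` remains the authoritative source for the URLs:

```python
def sqs_queue_url(region: str, account_id: str, queue_name: str) -> str:
    """Queue URL, as used with send-message, receive-message, and add-permission."""
    return f"https://sqs.{region}.amazonaws.com/{account_id}/{queue_name}"


def sqs_queue_arn(region: str, account_id: str, queue_name: str) -> str:
    """Queue ARN, as passed to create-pipe via --source and --target."""
    return f"arn:aws:sqs:{region}:{account_id}:{queue_name}"


for name in ("EntityIDs", "EntityUpdates"):
    print(sqs_queue_url("ap-southeast-1", "1234567890", name))
    print(sqs_queue_arn("ap-southeast-1", "1234567890", name))
```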
The Content Enricher, implemented with EventBridge Pipes, Lambda, and DynamoDB, will fetch messages containing just id fields from the EntityIDs queue and publish enriched events to the EntityUpdates queue. To build the Content Enricher, you first create a table to hold the entity data and load it with sample data, which is most easily done with a JSON file (available in the EIP Code Repository):
```
$ aws dynamodb create-table --table-name Entities \
    --attribute-definitions AttributeName=id,AttributeType=S \
    --key-schema AttributeName=id,KeyType=HASH \
    --billing-mode PAY_PER_REQUEST
$ aws dynamodb batch-write-item --request-items file://entities.json
```
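If you'd rather generate the load file yourself, the sketch below builds a request in the shape `batch-write-item` expects: the table name mapped to a list of `PutRequest` entries with typed attribute values. It uses only the two sample entities that appear in this example's outputs and assumes `Amount` is stored as a number, as the `get-item` output suggests; the repository's `entities.json` may differ. A single `batch-write-item` request accepts at most 25 put or delete requests, so larger data sets need to be split.

```python
import json

# Sample rows taken from the outputs shown in this example.
entities = [
    {"id": "123", "Amount": "12.34", "Type": "Order", "Customer": "Pete's Pizza"},
    {"id": "456", "Amount": "22.22", "Type": "Order", "Customer": "Mary's Muffins"},
]


def to_attribute_value(name: str, value: str) -> dict:
    # Assumption: Amount is a DynamoDB number ("N"); everything else is a string ("S").
    return {"N": value} if name == "Amount" else {"S": value}


def batch_write_request(table: str, rows: list) -> dict:
    """Build the --request-items document for `aws dynamodb batch-write-item`."""
    puts = [
        {"PutRequest": {"Item": {k: to_attribute_value(k, v) for k, v in row.items()}}}
        for row in rows
    ]
    return {table: puts}


if __name__ == "__main__":
    with open("entities.json", "w") as f:
        json.dump(batch_write_request("Entities", entities), f, indent=2)
```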
To check that the table is loaded (and to get accustomed to DynamoDB's syntax), you can fetch an entity:
```
$ aws dynamodb get-item --table-name Entities --key '{"id": {"S": "123"}}'
{
    "Item": {
        "id": { "S": "123" },
        "Amount": { "N": "12.34" },
        "Type": { "S": "Order" },
        "Customer": { "S": "Pete's Pizza" }
    }
}
```
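DynamoDB's low-level format wraps every value in a type descriptor such as `"S"` (string) or `"N"` (number, transmitted as a string). The following sketch unwraps such an item into plain Python values; it handles only a few common types and is meant for illustration. In practice, boto3's resource interface, which the Lambda function in this example uses, performs this conversion for you (via `boto3.dynamodb.types.TypeDeserializer`), also returning numbers as `Decimal`.

```python
from decimal import Decimal


def from_attribute_value(av: dict):
    """Unwrap one DynamoDB attribute value (only S, N, BOOL, NULL, L, M handled)."""
    type_tag, value = next(iter(av.items()))
    if type_tag in ("S", "BOOL"):
        return value
    if type_tag == "N":
        return Decimal(value)  # keep exact decimal precision
    if type_tag == "NULL":
        return None
    if type_tag == "L":
        return [from_attribute_value(v) for v in value]
    if type_tag == "M":
        return {k: from_attribute_value(v) for k, v in value.items()}
    raise ValueError(f"unsupported attribute type: {type_tag}")


def from_item(item: dict) -> dict:
    return {name: from_attribute_value(av) for name, av in item.items()}


# The get-item response shown above, as a Python dict.
response = {"Item": {"id": {"S": "123"}, "Amount": {"N": "12.34"},
                     "Type": {"S": "Order"}, "Customer": {"S": "Pete's Pizza"}}}
print(from_item(response["Item"]))
```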
Before you create the pipe, you need to create a security role with the required permissions (the trust policy file is supplied in the EIP Code Repository) so that your pipe can read messages from and write messages to SQS and invoke a Lambda function. It's also handy to get your account ID and store it in a variable (if your shell doesn't support command substitution, you can run the command by hand and set the variable explicitly, or simply replace $id with your account ID in each command):
```
$ id=`aws sts get-caller-identity --query 'Account' --output text`
$ echo $id
1234567890
$ aws iam create-role --role-name EventBridge_Pipes_Enricher --assume-role-policy-document file://trust_policy_pipes.json
$ aws iam create-policy --policy-name pipes_sqs_lambda_sqs --policy-document file://pipes_role_policy.json \
    --query Policy.Arn --output text
$ aws iam attach-role-policy --role-name EventBridge_Pipes_Enricher --policy-arn arn:aws:iam::$id:policy/pipes_sqs_lambda_sqs
```
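The policy documents passed via `file://` are not shown here. As an illustration of what such files typically contain, the sketch below generates a trust policy that lets the EventBridge Pipes service (`pipes.amazonaws.com`) assume the role, plus a permission policy covering the SQS source (`sqs:ReceiveMessage`, `sqs:DeleteMessage`, `sqs:GetQueueAttributes`), the Lambda enrichment (`lambda:InvokeFunction`), and the SQS target (`sqs:SendMessage`). These are assumptions based on AWS's documented requirements for these pipe components; the repository's files may differ, and the region and account ID are placeholders.

```python
import json


def trust_policy(service: str) -> dict:
    """Trust policy allowing the given AWS service principal to assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": service},
            "Action": "sts:AssumeRole",
        }],
    }


def pipes_role_policy(region: str, account_id: str) -> dict:
    """Assumed permissions: read EntityIDs, invoke the enricher, write EntityUpdates."""
    sqs = f"arn:aws:sqs:{region}:{account_id}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow",
             "Action": ["sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes"],
             "Resource": f"{sqs}:EntityIDs"},
            {"Effect": "Allow",
             "Action": "lambda:InvokeFunction",
             "Resource": f"arn:aws:lambda:{region}:{account_id}:function:EnrichFromDynamoDB"},
            {"Effect": "Allow",
             "Action": "sqs:SendMessage",
             "Resource": f"{sqs}:EntityUpdates"},
        ],
    }


if __name__ == "__main__":
    print(json.dumps(trust_policy("pipes.amazonaws.com"), indent=2))
    print(json.dumps(pipes_role_policy("ap-southeast-1", "1234567890"), indent=2))
```

Calling `trust_policy("lambda.amazonaws.com")` produces the corresponding trust policy for a Lambda execution role, such as the one the Lambda function needs later.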
Additionally, you need to grant write permission to the EntityUpdates queue with a so-called resource policy that's attached to a resource instead of a role:
```
$ aws sqs add-permission --queue-url https://sqs.ap-southeast-1.amazonaws.com/$id/EntityUpdates \
    --aws-account-ids=$id \
    --actions SendMessage ReceiveMessage DeleteMessage ChangeMessageVisibility \
    --label SendEntities
```
You can now create a Pipe and give it the required permissions through the role you just created (notice the use of $id for the account ID):
```
$ aws pipes create-pipe --name ContentEnricher \
    --source arn:aws:sqs:ap-southeast-1:$id:EntityIDs \
    --target arn:aws:sqs:ap-southeast-1:$id:EntityUpdates \
    --enrichment arn:aws:lambda:ap-southeast-1:$id:function:EnrichFromDynamoDB \
    --role-arn arn:aws:iam::$id:role/EventBridge_Pipes_Enricher
```
You also need to deploy the Lambda function that looks up the entities in the database and returns the enriched messages. EventBridge Pipes can work in batches, meaning a pipe can fetch more than one event from the event source at a time. Correspondingly, your Lambda function has to be prepared to receive a list of IDs. In my implementation, it uses the batch_get_item method to look up all IDs at once (IDs without a matching entity simply don't return an item):
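```python
import json
import boto3

# Created once per container, so warm invocations reuse the connection.
dynamo = boto3.resource('dynamodb')
table_name = 'Entities'


def lambda_handler(event, context):
    # The pipe delivers a batch: a list of SQS records whose 'body' holds the JSON message.
    keys = []
    for evt in event:
        body = json.loads(evt['body'])
        key = {'id': body['id']}
        if key not in keys:  # BatchGetItem rejects duplicate keys in one request
            keys.append(key)
    # Look up all IDs in one call; IDs without a matching item are simply absent.
    data = dynamo.batch_get_item(RequestItems={table_name: {'Keys': keys}})
    return data['Responses'][table_name]
```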
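To exercise the lookup logic without deploying anything, you can pass in a stand-in for the DynamoDB resource. In this sketch, `FakeDynamo` and `enrich_batch` are hypothetical helpers: the fake mimics only the `Responses` part of the `batch_get_item` result and keys items by `id`, and the sample event contains only the `body` field of each SQS record, which is all the handler reads. Unlike this fake, the real service does not return items in any particular order.

```python
import json


class FakeDynamo:
    """Hypothetical stand-in for boto3.resource('dynamodb'), backed by dicts."""

    def __init__(self, tables):
        self.tables = tables  # {table_name: {id: item}}

    def batch_get_item(self, RequestItems):
        responses = {}
        for table, request in RequestItems.items():
            rows = self.tables.get(table, {})
            responses[table] = [rows[k["id"]] for k in request["Keys"] if k["id"] in rows]
        return {"Responses": responses, "UnprocessedKeys": {}}


def enrich_batch(event, dynamo, table_name="Entities"):
    # Same lookup as lambda_handler, with the DynamoDB resource passed in.
    ids = {json.loads(record["body"])["id"] for record in event}
    keys = [{"id": i} for i in sorted(ids)]  # deduplicated keys
    data = dynamo.batch_get_item(RequestItems={table_name: {"Keys": keys}})
    return data["Responses"][table_name]


fake = FakeDynamo({"Entities": {"123": {"id": "123", "Type": "Order", "Customer": "Pete's Pizza"}}})
event = [{"body": '{"id": "123"}'}, {"body": '{"id": "123"}'}, {"body": '{"id": "999"}'}]
print(enrich_batch(event, fake))
```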
As you might have guessed, the Lambda function also needs a role that allows it to read from DynamoDB and write logs:
```
$ aws iam create-role --role-name Lambda_Enricher --assume-role-policy-document file://trust_policy_lambda.json
$ aws iam attach-role-policy --role-name Lambda_Enricher --policy-arn arn:aws:iam::aws:policy/AmazonDynamoDBReadOnlyAccess
$ aws iam attach-role-policy --role-name Lambda_Enricher --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
```
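To deploy the function directly, you first need to zip it up (using any Zip utility) into code.zip and then deploy it with the following command line (again, notice the $id being referenced). Because the handler is given as DynamoDBEnricher.lambda_handler, the code must be in a file named DynamoDBEnricher.py at the root of the archive: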
```
$ aws lambda create-function --function-name EnrichFromDynamoDB \
    --role arn:aws:iam::$id:role/Lambda_Enricher \
    --runtime python3.9 --zip-file fileb://code.zip --handler DynamoDBEnricher.lambda_handler
```
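If you prefer to script the packaging step, Python's standard `zipfile` module can build code.zip. This minimal sketch (the `build_package` helper is hypothetical) stores the source under the name DynamoDBEnricher.py at the archive root, which is what the `--handler` setting expects, regardless of the local file name:

```python
import zipfile
from pathlib import Path


def build_package(source: Path, archive: Path) -> None:
    """Zip the handler source so it sits at the archive root as DynamoDBEnricher.py."""
    with zipfile.ZipFile(archive, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.write(source, arcname="DynamoDBEnricher.py")


if __name__ == "__main__":
    src = Path("DynamoDBEnricher.py")
    if src.exists():
        build_package(src, Path("code.zip"))
        with zipfile.ZipFile("code.zip") as zf:
            print(zf.namelist())
    else:
        print("DynamoDBEnricher.py not found in the current directory")
```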
Show time! You can now send a message to the inbound queue for the Content Enricher and poll the result queue for enriched messages (the query and output parameters just filter down the output to the message body; feel free to omit them for the full experience):
```
$ aws sqs send-message --queue-url https://sqs.ap-southeast-1.amazonaws.com/$id/EntityIDs --message-body '{"id": "123"}'
$ aws sqs send-message --queue-url https://sqs.ap-southeast-1.amazonaws.com/$id/EntityIDs --message-body '{"id": "456"}'
$ aws sqs receive-message --max-number-of-messages 5 \
    --queue-url https://sqs.ap-southeast-1.amazonaws.com/$id/EntityUpdates \
    --query 'Messages[*].Body' --output text
{"id":"123","Amount":"12.34","Type":"Order","Customer":"Pete's Pizza"}
{"id":"456","Amount":"22.22","Type":"Order","Customer":"Mary's Muffins"}
```
Related patterns:
Content Filter, Event Message, Message Filter, Message Channel, Message Translator, Claim Check