This section uses a more elaborate example to demonstrate how the system management patterns introduced in this chapter can be used to monitor and control a messaging solution. The example builds upon the C# and MSMQ implementation of the Loan Broker example from Chapter ?? (see Asynchronous Implementation with MSMQ). We augment rather than modify the original example solution, so it is not critical that you have reviewed all the code of the original example. As with the original example implementation, the intent of this section is not to explain the use of MSMQ-specific APIs but rather to illustrate the implementation of the patterns in this book using a queue-oriented messaging system. The structure of the solution would look very similar when implemented in Java using JMS queues or IBM WebSphere MQ. Because we focus mostly on the design decisions and trade-offs, this section should be valuable to you even if you are not a C# or MSMQ developer.
Instrumenting the Loan Broker
The loan broker implementation consists of the following four key components (see picture):
- The Test Client makes requests for loan quotes.
- The Loan Broker acts as the central process manager and coordinates the communication between the credit bureau and the banks.
- The Credit Bureau provides a service to the Loan Broker, computing customers' credit scores.
- Each Bank receives a quote request from the Loan Broker and submits an interest rate quote according to the loan parameters.
In most integration scenarios we do not have access to the application internals but are limited to monitoring and managing components from the outside. To make this example as realistic as possible, we decide to treat each of the components as a black box. Keeping this constraint in mind, we want the management solution to meet the following requirements:
- Management Console: We want a single front-end that displays the health of all components and allows us to take compensating actions if something goes wrong.
- Loan Broker Quality of Service: In the original solution we developed a test client that monitors the loan broker's response times between quote request and response. In a real production scenario clients will not perform this function for us (they may, however, complain that the system is too slow). Therefore, we want the management solution to capture this information and relay it to the management console.
- Verify the Credit Bureau Operation: The Credit Bureau is an external service provided by a third party. We want to ensure the correct operation of this service by periodically sending test messages.
- Credit Bureau Failover: If the Credit Bureau malfunctions we want to temporarily redirect the credit request messages to another service provider.
Management Console
We want to be able to collect metrics from multiple components in a single point so we can assess the health of the overall solution from a central spot. The management console also has to be able to control message flow and component parameters so that we can address outages by rerouting messages or changing component behavior.
The management console communicates with the individual components via messaging. It uses a separate Control Bus that only contains messages related to system management and not application data.
Because this is a book on enterprise integration and not on user interface design, we keep the management console very, very simple. Many vendors offer real-time data displays or you can even achieve visual miracles with Visual Basic and Microsoft Office components such as Excel. Also, many operating system or programming platforms offer their own instrumentation frameworks, e.g. Java/JMX (Java Management Extensions) or Microsoft's WMI (Windows Management Instrumentation). We decide to hand-roll our solution to make it less dependent on a specific vendor's API and to demonstrate the inner workings of a monitoring solution.
Loan Broker Quality of Service
The first requirement for the management solution is to measure the quality of service that the loan broker provides to its clients. For this type of monitoring we are not interested in the business content of the individual messages, i.e. the interest rate offered to the client, but only in the time elapsed between the request message and the reply message. The tricky part in tracking the time between these two messages is the fact that the client can specify the channel for the reply message via a Return Address. So we cannot listen on a fixed channel for the reply. Luckily, the Smart Proxy pattern solves this dilemma for us. A Smart Proxy intercepts request messages, stores the Return Address supplied by the client and replaces it with a fixed channel address. As a result, the service (the Loan Broker in our case) sends all reply messages to one channel. The Smart Proxy listens to this channel and correlates incoming reply messages to stored request messages. It then forwards the reply message to the original Return Address specified by the client (see picture).
In order to take advantage of the Smart Proxy functionality, we "insert" the Smart Proxy between the client and the Loan Broker (see picture). This insertion is transparent to the client because the Smart Proxy listens on the same channel that the Loan Broker originally listened on (loanrequestQueue). We now start the Loan Broker with new parameters so that it listens on the brokerRequestQueue channel instead of the loanrequestQueue. The Smart Proxy instructs the Loan Broker to send all reply messages to the brokerReplyQueue channel from where it forwards the messages back to the correct Return Address originally specified by the client.
We want to use the Smart Proxy to measure both the response time for loan requests and the number of requests being processed by the loan broker at any one time. The Smart Proxy can measure the time elapsed between request and reply messages by capturing the time that the request message was received. When it receives the associated reply message, the Smart Proxy subtracts the request time from the current time to compute the time elapsed between request and reply. The Smart Proxy can estimate how many active requests the Loan Broker is managing at one time by counting how many outstanding request messages there are (i.e. request messages that have not yet received reply messages). The Smart Proxy cannot distinguish between messages queued up on the brokerRequestQueue and messages that the Loan Broker has started processing, so this metric equals the sum of both. We can update the number of outstanding request messages whenever we receive a request message or a reply message.
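The bookkeeping described above can be sketched in isolation as follows. This is a minimal illustration and not the SmartProxy base class code: the class name RequestTracker and its method names are hypothetical, and the message ID is passed in as a plain string so that the sketch does not depend on MSMQ.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the original solution): remembers when each
// request was seen and derives elapsed time and outstanding count on reply.
public class RequestTracker
{
    private readonly Dictionary<string, DateTime> sentTimes =
        new Dictionary<string, DateTime>();
    private readonly object sync = new object();

    // Records a request and returns the number of outstanding requests,
    // i.e. requests queued or in process that have no reply yet.
    public int OnRequest(string messageId, DateTime receivedAt)
    {
        lock (sync)
        {
            sentTimes[messageId] = receivedAt;
            return sentTimes.Count;
        }
    }

    // Matches a reply to its request by correlation ID. Returns false if the
    // ID is unknown; otherwise reports the elapsed time and remaining count.
    public bool OnReply(string correlationId, DateTime receivedAt,
                        out TimeSpan elapsed, out int outstanding)
    {
        lock (sync)
        {
            DateTime sentAt;
            bool known = sentTimes.TryGetValue(correlationId, out sentAt);
            if (known)
            {
                sentTimes.Remove(correlationId);
                elapsed = receivedAt - sentAt;
            }
            else
            {
                elapsed = TimeSpan.Zero;
            }
            outstanding = sentTimes.Count;
            return known;
        }
    }
}
```

Both methods return the outstanding count, which mirrors the idea of recording a queue-size data point on every request and every reply.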
The Smart Proxy passes the metrics information to the management console for monitoring and analysis. We could send the statistics for every single message but that would clutter our network if we deal with high message volumes. Inserting a Smart Proxy into the message flow already doubles the number of messages sent (two request and two reply messages instead of one each), so we want to avoid sending another control message for each request message. Instead we use a timer so that the Smart Proxy sends a metrics message to the Control Bus at predefined intervals, e.g. every 5 seconds. The metrics message can either contain summary metrics (e.g. the maximum, minimum and average response time) or the detailed info for all messages that passed through during the interval. In order to keep the metrics messages small and the management console simple we decide to just pass the summary metrics to the console.
For the implementation of the Loan Broker smart proxy, we reuse the SmartProxy base classes introduced in the Smart Proxy pattern. We subclass the SmartProxyBase, SmartProxyRequestConsumer, and SmartProxyReplyConsumer classes (see class diagram). Please see the Smart Proxy pattern for the source code for these classes.
Just like the original SmartProxy, the new LoanBrokerProxy contains two separate message consumers, one for incoming request messages from the client (LoanBrokerProxyRequestConsumer) and one for incoming reply messages from the Loan Broker (LoanBrokerProxyReplyConsumer). Both consumer classes inherit from their respective base classes and add a new implementation of the AnalyzeMessage method.
Let's have a look at the implementation of the LoanBrokerProxy class. The class maintains two ArrayLists with metrics, performanceStats and queueStats. performanceStats collects data points of request-reply time intervals in seconds while queueStats collects data points of the number of outstanding request messages (those either queued up in the brokerRequestQueue or in process by the Loan Broker). When the pre-programmed timer triggers, the method OnTimerEvent moves the data from both collections into a temporary snapshot. We have to perform any further analysis of the data on this snapshot copy because the message consumers continue to add new data points as messages are received.
LoanBrokerProxy Class
public class LoanBrokerProxy : SmartProxyBase
{
    protected MessageQueue controlBus;
    protected ArrayList performanceStats;
    protected ArrayList queueStats;
    protected int interval;
    protected Timer timer;

    public LoanBrokerProxy(MessageQueue inputQueue, MessageQueue serviceRequestQueue,
                           MessageQueue serviceReplyQueue, MessageQueue controlBus, int interval)
        : base(inputQueue, serviceRequestQueue, serviceReplyQueue)
    {
        messageData = Hashtable.Synchronized(new Hashtable());
        queueStats = ArrayList.Synchronized(new ArrayList());
        performanceStats = ArrayList.Synchronized(new ArrayList());
        this.controlBus = controlBus;
        this.interval = interval;
        requestConsumer = new LoanBrokerProxyRequestConsumer(inputQueue, serviceRequestQueue,
            serviceReplyQueue, messageData, queueStats);
        replyConsumer = new LoanBrokerProxyReplyConsumer(serviceReplyQueue, messageData,
            queueStats, performanceStats);
    }

    public override void Process()
    {
        base.Process();
        // Fire OnTimerEvent every 'interval' seconds.
        TimerCallback timerDelegate = new TimerCallback(OnTimerEvent);
        timer = new Timer(timerDelegate, null, interval*1000, interval*1000);
    }

    protected void OnTimerEvent(Object state)
    {
        ArrayList currentQueueStats;
        ArrayList currentPerformanceStats;
        // Lock on SyncRoot, the object the synchronized wrapper itself locks on,
        // so that no data point can be added between Clone and Clear.
        lock (queueStats.SyncRoot)
        {
            currentQueueStats = (ArrayList)(queueStats.Clone());
            queueStats.Clear();
        }
        lock (performanceStats.SyncRoot)
        {
            currentPerformanceStats = (ArrayList)(performanceStats.Clone());
            performanceStats.Clear();
        }
        SummaryStats summary = new SummaryStats(currentQueueStats, currentPerformanceStats);
        if (controlBus != null)
            controlBus.Send(summary);
    }
}
The LoanBrokerProxy uses the SummaryStats structure to condense the individual data points into maximum, minimum and average values and then sends the summary data to the Control Bus. We could make the evaluation more efficient by updating the summary statistics with each incoming message so that we have to store only the summary data and not each datapoint. On the other hand, deferring the computation allows us to change the amount of detail we want to publish to the Control Bus.
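The alternative mentioned above, updating the summary with each incoming data point, might look roughly like the following sketch. The class name RunningStats and its members are hypothetical and do not appear in the original solution; reporting zeros for an empty interval is an assumption chosen to match the empty rows in the results shown later in this section.

```csharp
using System;

// Hypothetical accumulator: keeps only count, sum, minimum and maximum
// instead of storing every data point.
public class RunningStats
{
    private readonly object sync = new object();
    private int count;
    private double sum;
    private double min;
    private double max;

    // Called by a message consumer for each new data point.
    public void Add(double value)
    {
        lock (sync)
        {
            if (count == 0 || value < min) min = value;
            if (count == 0 || value > max) max = value;
            sum += value;
            count++;
        }
    }

    // Called by the timer: reports the interval summary and starts a new interval.
    public void SnapshotAndReset(out int n, out double minimum,
                                 out double average, out double maximum)
    {
        lock (sync)
        {
            n = count;
            minimum = count == 0 ? 0.0 : min;
            maximum = count == 0 ? 0.0 : max;
            average = count == 0 ? 0.0 : sum / count;
            count = 0;
            sum = 0.0;
        }
    }
}
```

The trade-off is the one stated above: this version uses constant memory per interval, but the individual data points are gone, so the level of detail published to the Control Bus can no longer be changed after the fact.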
The LoanBrokerProxyRequestConsumer class is rather simple. The base class SmartProxyRequestConsumer takes care of storing relevant message data in the messageData Hashtable so that we can derive the current number of outstanding request messages from the size of the Hashtable. The LoanBrokerProxyRequestConsumer maintains a reference to the queueStats collection inside the LoanBrokerProxy so that it can add the new data point to this collection.
LoanBrokerProxyRequestConsumer Class
public class LoanBrokerProxyRequestConsumer : SmartProxyRequestConsumer
{
    ArrayList queueStats;

    public LoanBrokerProxyRequestConsumer(MessageQueue requestQueue, MessageQueue serviceRequestQueue,
                                          MessageQueue serviceReplyQueue, Hashtable messageData,
                                          ArrayList queueStats)
        : base(requestQueue, serviceRequestQueue, serviceReplyQueue, messageData)
    {
        this.queueStats = queueStats;
    }

    protected override void ProcessMessage(Message requestMsg)
    {
        base.ProcessMessage(requestMsg);
        // The base class has stored the request, so Count is the number of outstanding requests.
        queueStats.Add(messageData.Count);
    }
}
The LoanBrokerProxyReplyConsumer collects two metrics when a reply message arrives. First, it computes the time it took between sending the request message and receiving the reply message and adds that metric to the performanceStats collection. Second, it captures the remaining number of outstanding requests and adds that number to the queueStats collection.
LoanBrokerProxyReplyConsumer Class
public class LoanBrokerProxyReplyConsumer : SmartProxyReplyConsumer
{
    ArrayList queueStats;
    ArrayList performanceStats;

    public LoanBrokerProxyReplyConsumer(MessageQueue replyQueue, Hashtable messageData,
                                        ArrayList queueStats, ArrayList performanceStats)
        : base(replyQueue, messageData)
    {
        this.queueStats = queueStats;
        this.performanceStats = performanceStats;
    }

    protected override void AnalyzeMessage(MessageData data, Message replyMessage)
    {
        // Elapsed time between the request and this reply, in seconds.
        TimeSpan duration = DateTime.Now - data.SentTime;
        performanceStats.Add(duration.TotalSeconds);
        queueStats.Add(messageData.Count);
    }
}
The SummaryStats structure computes maxima, minima and averages based on the captured data. It can derive the number of request messages processed by subtracting the number of performance data points (collected only for reply messages) from the number of queue data points (collected for request and reply messages). The implementation of this structure is quite trivial so that we decided not to fill a whole page with the code.
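For readers who want to see the idea spelled out, here is one possible sketch of such a structure. It is not the authors' code: the field names are assumptions, and only the constructor signature follows the call in LoanBrokerProxy.OnTimerEvent. Empty collections are reported as zeros, which is also an assumption.

```csharp
using System;
using System.Collections;

// Hypothetical reconstruction of a summary structure; field names are assumed.
public struct SummaryStats
{
    public int NumRequestMessages;
    public int NumReplyMessages;
    public double MinProcessingTime, AvgProcessingTime, MaxProcessingTime;
    public double MinQueueSize, AvgQueueSize, MaxQueueSize;

    public SummaryStats(ArrayList queueStats, ArrayList performanceStats)
    {
        // Performance points exist only for replies; queue points exist for
        // both requests and replies, so the difference is the request count.
        NumReplyMessages = performanceStats.Count;
        NumRequestMessages = queueStats.Count - performanceStats.Count;

        double min, avg, max;
        Summarize(performanceStats, out min, out avg, out max);
        MinProcessingTime = min; AvgProcessingTime = avg; MaxProcessingTime = max;

        Summarize(queueStats, out min, out avg, out max);
        MinQueueSize = min; AvgQueueSize = avg; MaxQueueSize = max;
    }

    // Computes minimum, average and maximum; all zero for an empty list.
    private static void Summarize(ArrayList values, out double min,
                                  out double avg, out double max)
    {
        min = 0.0; avg = 0.0; max = 0.0;
        if (values.Count == 0) return;
        min = double.MaxValue;
        max = double.MinValue;
        double sum = 0.0;
        foreach (object o in values)
        {
            double v = Convert.ToDouble(o); // handles boxed int and double
            if (v < min) min = v;
            if (v > max) max = v;
            sum += v;
        }
        avg = sum / values.Count;
    }
}
```

Public fields and the implicit parameterless constructor keep the structure friendly to XML serialization, which is the format the metrics are published in according to the text that follows.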
Once we insert the new Loan Broker proxy into the message stream we can start collecting performance metrics. To collect some example data, we configured two test clients to make 50 loan quote requests each. The proxy collected the following results (we used a simple XSL transform to render an HTML table off the metric data published in XML format to the controlBusQueue):
Time | #Requests | #Replies | Min Processing Time (s) | Avg Processing Time (s) | Max Processing Time (s) | Min Queue Size | Avg Queue Size | Max Queue Size |
14:11:02.9644 | 0 | 0 | 0.00 | 0.00 | 0.00 | 0 | 0 | 0 |
14:11:07.9718 | 89 | 7 | 0.78 | 2.54 | 3.93 | 1 | 42 | 82 |
14:11:12.9792 | 11 | 9 | 4.31 | 6.43 | 8.69 | 83 | 87 | 91 |
14:11:17.9866 | 0 | 8 | 9.39 | 10.83 | 12.82 | 77 | 80 | 84 |
14:11:22.9940 | 0 | 8 | 13.80 | 15.75 | 17.48 | 69 | 72 | 76 |
14:11:28.0014 | 0 | 7 | 18.37 | 20.19 | 22.18 | 62 | 65 | 68 |
14:11:33.0088 | 0 | 6 | 22.90 | 24.83 | 26.94 | 56 | 58 | 61 |
14:11:38.0162 | 0 | 10 | 27.74 | 29.53 | 31.62 | 46 | 50 | 55 |
14:11:43.0236 | 0 | 9 | 31.87 | 34.47 | 36.30 | 37 | 41 | 45 |
14:11:48.0310 | 0 | 7 | 36.87 | 39.06 | 40.98 | 30 | 33 | 36 |
14:11:53.0384 | 0 | 9 | 41.75 | 43.82 | 45.14 | 21 | 25 | 29 |
14:11:58.0458 | 0 | 8 | 45.92 | 47.67 | 49.67 | 13 | 16 | 20 |
14:12:03.0532 | 0 | 8 | 50.86 | 52.58 | 54.59 | 5 | 8 | 12 |
14:12:08.0606 | 0 | 4 | 55.41 | 55.96 | 56.69 | 1 | 2 | 4 |
14:12:13.0680 | 0 | 0 | 0.00 | 0.00 | 0.00 | 0 | 0 | 0 |
Loaded into an Excel chart, the queue size data looks like this:
We can see that the two test clients pretty much flooded the Loan Broker, peaking at about 90 pending requests. The Loan Broker then processes the requests at a stable rate of just under 2 requests per second (between 6 and 10 replies in each full 5-second interval). Due to the large number of queued up requests, the response times are pretty poor, peaking at almost 1 minute. The good news is that the Loan Broker handles a large number of sudden requests gracefully, while the bad news is that the response times are very long. In order to improve response times we could decide to execute multiple Loan Broker instances or multiple Credit Bureau instances (the Credit Bureau service turned out to be a bottleneck before).
Verify the Credit Bureau Operation
The second requirement for the management solution is to monitor the correct operation of the external Credit Bureau service. The Loan Broker accesses this service to obtain credit scores for customers requesting a loan quote. The banks require this information to provide an accurate quote.
In order to verify the correct operation of the external Credit Bureau service we send periodic Test Messages to the service. Because the Credit Bureau service supports a Return Address it is easy to inject a Test Message without disturbing the existing message flow. We simply provide a dedicated reply channel for test messages and avoid the need for a separate test message separator.
In order to verify the correct operation of the Credit Bureau service we need a Test Data Generator and a Test Data Verifier. The test data generator creates test data to be sent to the service under test. A Credit Bureau test message is very simple; the only field that is required is a social security number (SSN). For our tests we use a special, fixed SSN that identifies a fictitious person. This allows us to verify the result data against pre-established results. This way we can not only check whether we receive a reply message but also verify the content of the message. In our simple example, the Credit Bureau is programmed to return random results regardless of the incoming SSN. As a result, our test data verifier does not check for specific result values but instead verifies whether the results are within the allowed range (e.g. 300 - 900 for a credit score). If the results fall outside the allowed range (for example because a computational error set the score to zero), the test data verifier notifies the management console with a message.
The test data verifier also checks the response time of the external service. If we do not receive a reply message within a preset time interval we alert the management console. To minimize network bandwidth, the test data verifier notifies the console only if the response is delayed or malformed, not when the service is operating correctly. The only exception to this rule occurs when the monitor receives a correct reply message from the service subsequent to detecting an error. In that case, the monitor sends a "service OK" message to the management console to indicate that the credit bureau is working correctly again. Lastly, during start-up the monitor sends a message to the console to announce its existence. This message allows the console to "discover" all active monitors so it can display the status for each.
The monitor implementation uses two separate timers. The Send Timer determines the time interval between the last received message or the last timeout event and sending the next Test Message. The Timeout Timer is started whenever the monitor sends a request message. If a reply message arrives within the specified timeout interval, the Timeout Timer is reset and restarted with the next request message. If the monitor does not receive a reply message within the specified interval, the Timeout Timer triggers and the monitor sends an error message to the Control Bus. It then starts a new Send Timer to initiate a new request message after the send interval (see diagram). A real-life scenario is likely to use a relatively short timeout (a few seconds) and a larger send interval (e.g., one minute).
The implementation of the monitor is relatively simple and only requires a single class. The Monitor class inherits from the MessageConsumer introduced in the Smart Proxy pattern. This class configures an inbound channel and starts an Event-Driven Consumer to receive messages. For each incoming message it invokes the virtual ProcessMessage method. An inheriting class can simply override this method to add its own processing.
The Process method instructs a MessageConsumer to start consuming messages. The Monitor class augments the base implementation of this method by starting the Send Timer. When this timer triggers, it invokes the OnSendTimerEvent method. The Process method also sends a message to the Control Bus to announce its existence.
Monitor Class - Sending Messages
public override void Process()
{
    base.Process();
    // One-shot timer: fires once after 'interval' seconds.
    sendTimer = new Timer(new TimerCallback(OnSendTimerEvent), null, interval*1000, Timeout.Infinite);
    // Announce the monitor so that the console can discover it.
    MonitorStatus status = new MonitorStatus(MonitorStatus.STATUS_ANNOUNCE, "Monitor On-Line", null, MonitorID);
    Console.WriteLine(status.Description);
    controlQueue.Send(status);
    lastStatus = status.Status;
}

protected void OnSendTimerEvent(Object state)
{
    CreditBureauRequest request = new CreditBureauRequest();
    request.SSN = SSN;
    Message requestMessage = new Message(request);
    requestMessage.Priority = MessagePriority.AboveNormal;
    requestMessage.ResponseQueue = inputQueue;
    Console.WriteLine(DateTime.Now.ToString() + " Sending request message");
    requestQueue.Send(requestMessage);
    // Remember the message ID to match it against the reply's correlation ID.
    correlationID = requestMessage.Id;
    timeoutTimer = new Timer(new TimerCallback(OnTimeoutEvent), null, timeout*1000, Timeout.Infinite);
}
The OnSendTimerEvent method creates a new request message. The only parameter in the request message is the customer's social security number (SSN). The method specifies a fixed SSN. The method also saves the message ID to verify the Correlation Identifier of any incoming reply messages. Lastly, it starts the timeoutTimer so that the monitor is notified after a set time interval if no reply message is received.
The method sets the test message's Priority property to AboveNormal to make sure that queued up application messages do not let the service appear as if it was not available. Using a higher priority for Test Messages causes the message queue to deliver these messages ahead of queued up application messages. Setting a higher message priority is safe in this case because the test data generator injects a very small volume of Test Messages. If we injected a large volume of high priority messages into the request channel we could interrupt the flow of application messages. This would definitely violate the intention of a management solution to be as minimally intrusive as possible.
The ProcessMessage method is the heart of the Monitor class. It implements the test message verifier, evaluating incoming reply messages. After stopping the timeout timer the method checks the incoming message for the correct Correlation Identifier, correct data type of the message body and reasonable values inside the message body. If any of these tests fail, the method sets up a MonitorStatus structure and sends it to the Control Bus channel. The monitor also tracks the previous status in the lastStatus variable. If the status changes from "error" to "OK", the ProcessMessage method also sends a notification to the Control Bus.
Monitor Class - Receiving Messages
protected override void ProcessMessage(Message msg)
{
    Console.WriteLine(DateTime.Now.ToString() + " Received reply message");
    if (timeoutTimer != null)
        timeoutTimer.Dispose();
    msg.Formatter = new XmlMessageFormatter(new Type[] {typeof(CreditBureauReply)});
    CreditBureauReply replyStruct;
    MonitorStatus status = new MonitorStatus();
    status.Status = MonitorStatus.STATUS_OK;
    status.Description = "No Error";
    status.ID = MonitorID;
    try
    {
        if (msg.Body is CreditBureauReply)
        {
            replyStruct = (CreditBureauReply)msg.Body;
            if (msg.CorrelationId != correlationID)
            {
                status.Status = MonitorStatus.STATUS_FAILED_CORRELATION;
                status.Description = "Incoming message correlation ID does not match outgoing message ID";
            }
            else
            {
                if (replyStruct.CreditScore < 300 || replyStruct.CreditScore > 900 ||
                    replyStruct.HistoryLength < 1 || replyStruct.HistoryLength > 24)
                {
                    status.Status = MonitorStatus.STATUS_INVALID_DATA;
                    status.Description = "Credit score values out of range";
                }
            }
        }
        else
        {
            status.Status = MonitorStatus.STATUS_INVALID_FORMAT;
            status.Description = "Invalid message format";
        }
    }
    catch (Exception e)
    {
        Console.WriteLine("Exception: {0}", e.ToString());
        status.Status = MonitorStatus.STATUS_INVALID_FORMAT;
        status.Description = "Could not deserialize message body";
    }
    StreamReader reader = new StreamReader (msg.BodyStream);
    status.MessageBody = reader.ReadToEnd();
    Console.WriteLine(status.Description);
    // Report every error, and report OK only when recovering from an error.
    if (status.Status != MonitorStatus.STATUS_OK ||
        (status.Status == MonitorStatus.STATUS_OK && lastStatus != MonitorStatus.STATUS_OK))
    {
        controlQueue.Send(status);
    }
    lastStatus = status.Status;
    // Schedule the next test message.
    sendTimer.Dispose();
    sendTimer = new Timer(new TimerCallback(OnSendTimerEvent), null, interval*1000, Timeout.Infinite);
}
If no message arrives in the specified interval, the timeoutTimer will invoke the OnTimeoutEvent method. This method sends a MonitorStatus message to the Control Bus and starts a new Send timer so that a new request message is sent after the interval.
Monitor Class - Timeout
protected void OnTimeoutEvent(Object state)
{
    MonitorStatus status = new MonitorStatus(MonitorStatus.STATUS_TIMEOUT, "Timeout", null, MonitorID);
    Console.WriteLine(status.Description);
    controlQueue.Send(status);
    lastStatus = status.Status;
    timeoutTimer.Dispose();
    // Schedule the next test message.
    sendTimer = new Timer(new TimerCallback(OnSendTimerEvent), null, interval*1000, Timeout.Infinite);
}
Credit Bureau Failover
Now that we can monitor the status of the external Credit Bureau service we want to use this data to implement a failover scheme so that the Loan Broker can continue operating even when the Credit Bureau service fails. It is worthwhile noting that Point-to-Point Channels already provide a basic form of failover. When we use multiple Competing Consumers on a single Point-to-Point Channel, the failure of one consumer will not interrupt processing as long as the other consumer(s) still operate. When multiple consumers are active, they split the load, effectively implementing a simple load balancing mechanism. Why would we need to implement an explicit failover mechanism? When using external services, we may be limited to simple channels that do not support Competing Consumers, e.g. SOAP over HTTP. Also, we may not want multiple services to load balance. For example, we may have a volume agreement with the primary service provider that gives us substantial discounts if we meet certain usage quotas. Splitting the traffic across two providers will likely cost us more. Alternatively, we may be using a low-cost provider as our primary service provider and only want to switch over to a premium provider when the low-cost provider fails. (For an excellent discussion of architectural decisions driven by licensing issues see [Hohmann].)
In order to implement explicit failover, we insert a Message Router into the credit bureau request channel (see picture). This router routes the request either to the primary credit bureau service (green arrows) or the secondary credit bureau service (red arrows). Because the secondary service may use a different message format than the first service, we wrap the secondary service with a pair of Message Translators. The Message Router is a Context-Based Router, controlled by the management console over the Control Bus. The management console gets monitoring data from the Credit Bureau Monitor we designed in the previous section. If the monitor indicates a failure, the management console instructs the Message Router to reroute the traffic to the secondary service provider (see picture).
Even while the request message traffic is re-routed to the secondary service provider, the monitor keeps on sending test messages to the primary provider. When the monitor confirms the correct operation of the service, the console instructs the Message Router to return to routing request messages to the primary provider. The solution diagram does not show a monitor for the secondary service provider even though it would be very easy to use a second instance of the credit bureau monitor to monitor the backup Credit Bureau service.
Let's look at the implementation of the Context-Based Router. ContextBasedRouter inherits from our trusty MessageConsumer base class to process incoming messages. The ProcessMessage method checks the value of the variable control and routes incoming messages to either the primary or secondary output channel.
ContextBasedRouter Class
delegate void ControlEvent(int control);

class ContextBasedRouter : MessageConsumer
{
    ...
    protected override void ProcessMessage(Message msg)
    {
        // control == 0 selects the primary service; any other value the secondary.
        if (control == 0)
        {
            primaryOutputQueue.Send(msg);
        }
        else
        {
            secondaryOutputQueue.Send(msg);
        }
    }

    protected void OnControlEvent(int control)
    {
        this.control = control;
        Console.WriteLine("Control = " + control);
    }
}
The variable control can be set by invoking the OnControlEvent method. This is the purpose of the ControlReceiver class. This class also inherits from MessageConsumer as it listens for messages from the control channel. The ContextBasedRouter class supplies the ControlReceiver with a delegate of type ControlEvent to invoke when it receives a control event with a numeric value. If you have not come across delegates, they are a really neat, type-safe way to implement callbacks without having to implement another interface or resorting to function pointers ([Box] goes into all the gory details).
ControlReceiver Class
class ControlReceiver : MessageConsumer
{
    protected ControlEvent controlEvent;

    public ControlReceiver(MessageQueue inputQueue, ControlEvent controlEvent)
        : base (inputQueue)
    {
        this.controlEvent = controlEvent;
    }

    protected override void ProcessMessage(Message msg)
    {
        String text = (string)msg.Body;
        Double resNum;
        // Ignore control messages whose body is not an integer.
        if (Double.TryParse(text, NumberStyles.Integer, NumberFormatInfo.InvariantInfo, out resNum))
        {
            // Reuse the parsed value instead of parsing the text a second time.
            int control = (int)resNum;
            controlEvent(control);
        }
    }
}
Enhancing the Management Console
The first version of the management console was so simple that we did not even bother showing the code. All it could do was receive a message and write the message content to a file for later analysis (such as rendering performance graphs from Excel). Now we want to inject some more intelligence into the management console. First, when the primary Credit Bureau Monitor indicates a failure, the management console needs to instruct the Context-Based Router to reroute messages to the secondary service provider. We opted to implement this functionality inside the management console so that we can decouple the monitor and the Context-Based Router (effectively, the management console acts as a Mediator as defined in [GoF]). Also, implementing the failover logic in a central location gives us a single point of maintenance for the system management rules. Commercial management consoles typically include configurable rules engines to determine appropriate corrective actions based on events on the Control Bus.
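The failover rule described above could be sketched roughly as follows. The class name FailoverRule, the monitor ID passed to it, and the numeric status codes are assumptions made for illustration, since the values of the MonitorStatus constants are not shown in this section. Only the control values follow the router code, which treats 0 as the primary route and any other value as the secondary route.

```csharp
using System;

// Hypothetical failover rule for the management console.
public class FailoverRule
{
    public const int STATUS_OK = 0;        // assumed value
    public const int STATUS_ANNOUNCE = 1;  // assumed value
    public const int ROUTE_PRIMARY = 0;    // matches the router's control == 0 branch
    public const int ROUTE_SECONDARY = 1;  // any non-zero value selects the secondary

    private readonly string primaryMonitorId;
    private int currentRoute = ROUTE_PRIMARY;

    public FailoverRule(string primaryMonitorId)
    {
        this.primaryMonitorId = primaryMonitorId;
    }

    // Returns the control value to send to the router, or null if the
    // status message does not require a change of route.
    public int? Evaluate(string monitorId, int status)
    {
        if (monitorId != primaryMonitorId) return null;  // not the monitor we act on
        if (status == STATUS_ANNOUNCE) return null;      // start-up message, no health info

        int desired = (status == STATUS_OK) ? ROUTE_PRIMARY : ROUTE_SECONDARY;
        if (desired == currentRoute) return null;        // already routed correctly
        currentRoute = desired;
        return desired;
    }
}
```

In the console, a non-null result would be converted to a string and sent to the router's control channel, since the ControlReceiver expects an integer in the message body. Keeping the rule free of queue access makes it easy to test and to replace with a configurable rules engine later.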
Second, we want to build a simple user interface for the management console that displays the current state of the system. Obtaining a "big picture" view of a messaging system can be quite difficult, especially if message paths change dynamically. It does not take a lot of components to make it difficult to reconcile where messages flow. Our user interface is simple but nevertheless quite useful. We use the iconic language defined in this book to represent the interaction between components. For now, the user interface only displays the Credit Bureau failover portion of the system, consisting of two services and one Context-Based Router (see picture).
When the monitor detects a failure and the management console instructs the router to re-route the traffic, we want to update the user interface to reflect the new status (see picture). The router shows the new route for the request messages and the primary Credit Bureau component changes colors to indicate the failure.
Let's have a brief look at the code behind this console. We will focus on the system management part of the code and not dive into the details of the code that renders the pretty user interface pictures. First, the management console needs to be able to retrieve status messages from the monitor component. To make the console as robust as possible, we access the message content in a loosely coupled fashion, reading individual fields from the XML payload. This approach helps keep the management console operational even if the individual components decide to add some fields to the message.
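To illustrate why reading individual fields is more tolerant than deserializing into a fixed type, consider the following sketch. The helper name MonitorStatusReader is hypothetical; the element names MonitorStatus, ID and Status are the ones the console code in this section looks for, and the sketch assumes the payload carries no default XML namespace.

```csharp
using System;
using System.Globalization;
using System.Xml;

// Hypothetical helper: extracts only the fields the console needs and
// ignores everything else in the payload.
public static class MonitorStatusReader
{
    // Returns true and fills id/status if the body is a usable MonitorStatus message.
    public static bool TryRead(string body, out string id, out int status)
    {
        id = null;
        status = 0;

        XmlDocument doc = new XmlDocument();
        try
        {
            doc.LoadXml(body);
        }
        catch (XmlException)
        {
            return false; // not well-formed XML
        }

        XmlElement root = doc.DocumentElement;
        if (root == null || root.Name != "MonitorStatus") return false;

        XmlNode idNode = root.SelectSingleNode("ID");
        XmlNode statusNode = root.SelectSingleNode("Status");
        if (idNode == null || statusNode == null) return false;

        if (!int.TryParse(statusNode.InnerText, NumberStyles.Integer,
                          CultureInfo.InvariantCulture, out status))
        {
            return false; // status is not an integer
        }
        id = idNode.InnerText;
        return true;
    }
}
```

A payload that gains an extra element, for example a hypothetical Timestamp field, is still read successfully because the unknown element is never touched. Messages of a different type, such as the summary statistics from the proxy, are simply rejected.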
Not surprisingly, the console class also inherits from our good friend MessageConsumer, so we only show the ProcessMessage method here. The component simply reads the message's BodyStream into a string variable and passes it to the different components for analysis.
ManagementConsole - ProcessMessage
public delegate void ControlMessageReceived(String body);

public class ManagementConsole : MessageConsumer
{
    protected Logger logger;
    public MonitorStatusHandler monitorStatusHandler;
    public ControlMessageReceived updateEvent;

    public ManagementConsole(MessageQueue inputQueue, string pathName) : base(inputQueue)
    {
        logger = new Logger(pathName);
        monitorStatusHandler = new MonitorStatusHandler();
        updateEvent += new ControlMessageReceived(logger.Log);
        updateEvent += new ControlMessageReceived(monitorStatusHandler.OnControlMessage);
    }

    protected override void ProcessMessage(Message m)
    {
        Stream stm = m.BodyStream;
        StreamReader reader = new StreamReader(stm);
        String body = reader.ReadToEnd();
        updateEvent(body);
    }
    ...
}
The ManagementConsole class uses a delegate to notify the logger and the MonitorStatusHandler. Using a delegate allows us to easily add other classes that also listen on incoming control messages without having to change the code inside the ProcessMessage method.
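To make this concrete, the following self-contained sketch registers listeners on a multicast delegate without touching the dispatching method. It is only an illustration under stated assumptions: ControlMessageDispatcher is a hypothetical stand-in for ManagementConsole without the MSMQ plumbing, and MessageCounter and InMemoryLog are hypothetical listeners that are not part of the original solution.

```csharp
using System;
using System.Collections.Generic;

public delegate void ControlMessageReceived(String body);

// Hypothetical listener: counts control messages (not part of the original solution).
public class MessageCounter
{
    public int Count;
    public void OnControlMessage(String body) { Count++; }
}

// Hypothetical listener standing in for the Logger: keeps message bodies in memory.
public class InMemoryLog
{
    public List<String> Entries = new List<String>();
    public void Log(String body) { Entries.Add(body); }
}

// Hypothetical stand-in for ManagementConsole without the MSMQ plumbing.
public class ControlMessageDispatcher
{
    public ControlMessageReceived updateEvent;

    // Mirrors ProcessMessage: the dispatcher never needs to know who is listening.
    public void Dispatch(String body)
    {
        // Guard added for the sketch: a delegate without subscribers is null.
        if (updateEvent != null) updateEvent(body);
    }
}
```

Adding the counter is then a single registration, dispatcher.updateEvent += new ControlMessageReceived(counter.OnControlMessage), and the Dispatch method stays unchanged.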
One of the components analyzing incoming control message data is the MonitorStatusHandler class. This class first loads the message body into an XML document and checks whether the root element identifies the message as a MonitorStatus message. If so, it extracts the relevant fields contained inside the <ID> and the <Status> elements. It then invokes the delegate updateEvent, which is of type MonitorStatusUpdate. Any interested class inside the management console application can add a callback method to this delegate and be notified any time a MonitorStatus message arrives. All the component has to do is provide an implementation of a method whose signature matches MonitorStatusUpdate.
MonitorStatusHandler
public delegate void MonitorStatusUpdate(String ID, int Status);

public class MonitorStatusHandler
{
    public MonitorStatusUpdate updateEvent;

    public void OnControlMessage(String body)
    {
        XmlDocument doc = new XmlDocument();
        doc.LoadXml(body);
        XmlElement root = doc.DocumentElement;
        if (root.Name == "MonitorStatus")
        {
            XmlNode statusNode = root.SelectSingleNode("Status");
            XmlNode idNode = root.SelectSingleNode("ID");
            if (idNode != null && statusNode != null)
            {
                String msgID = idNode.InnerText;
                String msgStatus = statusNode.InnerText;
                Double resNum;
                int status = 99;
                if (Double.TryParse(msgStatus, NumberStyles.Integer, NumberFormatInfo.InvariantInfo, out resNum))
                {
                    status = (int)resNum;
                }
                updateEvent(msgID, status);
            }
        }
    }
}
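The loosely coupled field access can be tried in isolation with a sketch like the one below. It is not the original handler: MonitorStatusReader and TryRead are hypothetical names, and the catch block for malformed XML is an addition that the original code does not have. Only the <ID> and <Status> element names and the fallback value 99 are taken from the listing above.

```csharp
using System;
using System.Globalization;
using System.Xml;

// Hypothetical helper that isolates the field extraction for experimentation.
public static class MonitorStatusReader
{
    // Returns true and fills id/status if body is a MonitorStatus message that
    // carries both an <ID> and a <Status> element. All other elements are ignored.
    public static bool TryRead(String body, out String id, out int status)
    {
        id = null;
        status = 99; // same fallback the handler uses when the status is not numeric

        XmlDocument doc = new XmlDocument();
        try { doc.LoadXml(body); }
        catch (XmlException) { return false; } // addition: body is not well-formed XML

        XmlElement root = doc.DocumentElement;
        if (root.Name != "MonitorStatus") return false;

        XmlNode idNode = root.SelectSingleNode("ID");
        XmlNode statusNode = root.SelectSingleNode("Status");
        if (idNode == null || statusNode == null) return false;

        id = idNode.InnerText;
        int parsed;
        if (Int32.TryParse(statusNode.InnerText, NumberStyles.Integer,
                           NumberFormatInfo.InvariantInfo, out parsed))
        {
            status = parsed;
        }
        return true;
    }
}
```

Because the fields are located by element name, a message such as <MonitorStatus><Status>1</Status><Description>timeout</Description><ID>PrimaryCreditService</ID></MonitorStatus> is still read correctly. The <Description> element here is an assumed extra field, used only to show that additions to the message do not disturb the console.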
In our example, the first two components listening to the MonitorStatusUpdate event triggered by the MonitorStatusHandler are two user interface controls representing the primary and secondary Credit Bureau service in the user interface form. Each user interface control filters the events for the identifier that is unique to the respective component that is being monitored. When the status of the monitored component changes, the user interface control changes the color of the component. The following routine that is executed during the initialization of the display form ties the two Credit Bureau display controls to the monitorStatusHandler of the management console:
Console Form Initialization
console = new ManagementConsole(controlBusQueue, logFileName);

primaryCreditBureauControl = new ComponentStatusControl("Primary Credit Bureau", "PrimaryCreditService");
primaryCreditBureauControl.Bounds = new Rectangle(300, 30, COMPONENT_WIDTH, COMPONENT_HEIGHT);

secondaryCreditBureauControl = new ComponentStatusControl("Secondary Credit Bureau", "SecondaryCreditService");
secondaryCreditBureauControl.Bounds = new Rectangle(300, 130, COMPONENT_WIDTH, COMPONENT_HEIGHT);

console.monitorStatusHandler.updateEvent += new MonitorStatusUpdate(primaryCreditBureauControl.OnMonitorStatusUpdate);
console.monitorStatusHandler.updateEvent += new MonitorStatusUpdate(secondaryCreditBureauControl.OnMonitorStatusUpdate);
Another component listening to the MonitorStatusUpdate events is the FailOverHandler. This non-visual component analyzes status messages to determine whether a failover switch should be set. If the monitored component has changed between a healthy and a failed state (we detect this by combining the old and the new result of IsOK with a logical XOR, denoted by the '^' operator), the FailOverHandler sends a command message to the designated command channel. In our case, this command channel is connected to the Context-Based Router, which will start re-routing credit score request messages (or switch back to the primary route once the service has recovered).
FailOverHandler Class
public delegate void FailOverStatusUpdate(String ID, string Command);

public class FailOverHandler
{
    ...
    public void OnMonitorStatusUpdate(String ID, int status)
    {
        if (componentID == ID)
        {
            if (IsOK(status) ^ IsOK(currentStatus))
            {
                String command = IsOK(status) ? "0" : "1";
                commandQueue.Send(command);
                currentStatus = status;
                updateEvent(ID, command);
            }
        }
    }

    protected bool IsOK(int status)
    {
        return (status == 0 || status >= 99);
    }
}
The FailOverHandler also invokes the updateEvent, which is a delegate of type FailOverStatusUpdate. Similar to the MonitorStatusHandler we can register any component that implements a method of this type to receive update notifications whenever the FailOverHandler changes status. In our example, we register the visual FailOverControl to receive these events so that it can redraw whenever the failover status changes. The console user interface initialization routine establishes the connection between these components:
Console Form Initialization
failOverControl = new FailOverControl("Credit Bureau Failover", "PrimaryCreditService");
failOverControl.Bounds = new Rectangle(100, 80, ROUTER_WIDTH, COMPONENT_HEIGHT);

FailOverHandler failOverHandler = new FailOverHandler(commandQueue, "PrimaryCreditService");
console.monitorStatusHandler.updateEvent += new MonitorStatusUpdate(failOverHandler.OnMonitorStatusUpdate);
failOverHandler.updateEvent += new FailOverStatusUpdate(failOverControl.OnMonitorStatusUpdate);
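The XOR-based transition check in the FailOverHandler is easy to misread, so the sketch below replays it against a sequence of status values. It is a simplified stand-in, not the original class: FailOverSketch is a hypothetical name, commands are collected in an in-memory list instead of being sent to an MSMQ queue, and the initial status of 0 (healthy) is an assumption because the original listing elides the field declarations.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for FailOverHandler without the MSMQ command queue.
public class FailOverSketch
{
    public List<String> SentCommands = new List<String>();
    private readonly String componentID;
    private int currentStatus = 0; // assumption: the component starts out healthy

    public FailOverSketch(String componentID) { this.componentID = componentID; }

    public void OnMonitorStatusUpdate(String ID, int status)
    {
        if (componentID != ID) return; // event belongs to another component

        // The XOR is true only when the healthy/failed state flips.
        if (IsOK(status) ^ IsOK(currentStatus))
        {
            SentCommands.Add(IsOK(status) ? "0" : "1"); // "1" = fail over, "0" = switch back
            currentStatus = status;
        }
    }

    // Same rule as the original: 0 and values of 99 or above count as healthy.
    private static bool IsOK(int status) { return status == 0 || status >= 99; }
}
```

Feeding the status sequence 0, 1, 2, 1, 0, 99 for "PrimaryCreditService" produces exactly two commands, "1" followed by "0". Repeated failure reports do not send repeated commands, which keeps the command channel quiet while the state is unchanged.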
Connecting the individual components inside the management console through delegates and events results in a loosely coupled architecture. This architecture allows us to reuse the individual components and re-compose them into different constellations, similar to the Pipes and Filters architectural style introduced at the beginning of the book. Essentially, passing on messages that arrive on the Control Bus by using delegates resembles creating an application-internal Publish-Subscribe Channel. Because the control bus events arrive on a Point-to-Point Channel, we have to use a single consumer that then publishes the event to any interested "subscriber" inside the application.
The following collaboration diagram illustrates the propagation of events between the individual components.
Using a user-interface console to visualize the message flow between individual components is a powerful system management tool. Some vendors include development suites that allow designers to visually arrange components and connect their input and output ports to create distributed message-flow applications. For example, Fiorano's Tifosi product (www.fiorano.com) includes the Distributed Applications Composer that allows the design of a distributed solution from a single graphical user interface even though each component may execute on a different machine or platform. This tool uses a Control Bus to connect all distributed components to a central management and monitoring console.
Our simple example requires the management console to hard-code the visual connection between the individual components, e.g. to draw a line between the failover router and the Credit Bureau components. Many integration tools such as Tifosi allow the user to design the solution from the beginning using a graphical user interface. This approach makes it easy to use the graphical design to display the status of the solution.
In some cases we may analyze the message flow in an existing messaging solution to create a graphical representation of the system. There are two fundamental approaches to perform this type of analysis. A Static Analysis examines the channels that each component publishes and subscribes to. If one component publishes to the same channel another component subscribes to, the tool can draw a connecting line between the two components. Storing this information in a central repository (as supported by many EAI tool suites, for example TIBCO ActiveEnterprise) is a huge benefit for this type of analysis. In the absence of such a repository, we can take the second approach, a dynamic analysis: we inspect individual messages and reverse-engineer connections between components based on the origin of messages arriving at a particular component. This task is greatly simplified if the participating components support the creation of a Message History. Without the help of a Message History, we can still reconstruct the flow of messages if each message contains a field specifying the sender of the message (many systems include such a field for authentication purposes).
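The static variant can be sketched in a few lines. The example below assumes a hypothetical repository that lists, for each component, the channels it publishes and subscribes to. ComponentInfo, StaticFlowAnalysis, and the channel names used in the description are illustrative assumptions and do not correspond to any particular EAI tool.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical repository entry: the channels a component publishes and subscribes to.
public class ComponentInfo
{
    public String Name;
    public List<String> Publishes = new List<String>();
    public List<String> Subscribes = new List<String>();
}

public static class StaticFlowAnalysis
{
    // Returns one "Sender -> Receiver (channel)" entry for every channel that one
    // component publishes to and a different component subscribes to.
    public static List<String> FindConnections(IEnumerable<ComponentInfo> components)
    {
        List<ComponentInfo> all = components.ToList();
        var edges =
            from sender in all
            from channel in sender.Publishes
            from receiver in all
            where receiver != sender && receiver.Subscribes.Contains(channel)
            select sender.Name + " -> " + receiver.Name + " (" + channel + ")";
        return edges.ToList();
    }
}
```

Given a Loan Broker that publishes to an assumed creditRequestQueue and a Credit Bureau that subscribes to it, the result contains the entry "LoanBroker -> CreditBureau (creditRequestQueue)", which is precisely the line a visualization tool would draw between the two components.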
Limitations of This Example
Unfortunately, in order to fit this example into the scope of a single chapter, we had to make some simplifying assumptions. For example, our failover mechanism does not deal with the messages that are already queued up when the primary Credit Bureau service fails; these messages remain queued up until the service is reinstated. The Loan Broker is able to continue functioning because it correlates incoming reply messages to request messages, but the loan quote requests associated with the 'stuck' messages will not be processed until the primary Credit Bureau comes back on-line. In order to improve response times in a fail-over scenario, we should implement a re-send function that allows the Loan Broker to re-issue request messages for those messages that are queued up indefinitely in front of a failed service. Alternatively, the fail-over router could store all request messages that have arrived since the correct function of the service was last confirmed. If a service failure is detected, the router could re-send all these messages because some of them might not have been processed correctly. This approach can lead to duplicate request messages (and associated reply messages), but since both the Credit Bureau service and the Loan Broker are Idempotent Receivers, this does not cause any problems: duplicate reply messages are simply ignored.
This example demonstrated only a small subset of the system management functions that can be implemented with the patterns in this section. For example, we could monitor message traffic across all components, set performance thresholds, have each component send "heartbeat" messages, and more. In fact, adding robust system management to a distributed messaging solution can require as much (or more) design and implementation effort as the original solution.
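The router-side alternative described above could look roughly like the following sketch. Neither class exists in the example solution: ResendBuffer and DuplicateFilter are hypothetical names, messages are represented only by their IDs, and the sketch ignores persistence and threading, which a production implementation would have to address.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for the fail-over router: remembers the requests forwarded to
// the primary service since its correct function was last confirmed.
public class ResendBuffer
{
    private readonly List<String> unconfirmed = new List<String>();

    public void OnRequestForwarded(String messageId) { unconfirmed.Add(messageId); }

    // The monitor confirmed the service is working: earlier requests need no re-send.
    public void OnServiceConfirmed() { unconfirmed.Clear(); }

    // The service failed: hand back every request that may not have been processed.
    public List<String> OnServiceFailed()
    {
        List<String> toResend = new List<String>(unconfirmed);
        unconfirmed.Clear();
        return toResend;
    }
}

// Hypothetical Idempotent Receiver: accepts each correlation ID only once.
public class DuplicateFilter
{
    private readonly HashSet<String> seen = new HashSet<String>();

    // Returns true the first time an ID is seen and false for any duplicate.
    public bool Accept(String correlationId) { return seen.Add(correlationId); }
}
```

The two classes illustrate why the approach is safe: the buffer may re-send a request that the failed service had in fact already answered, and the duplicate filter on the receiving side simply drops the second reply.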