Persistent Message Implementation
It is important to understand how Spring Integration uses channel adapters to interact with external systems. A channel adapter is a type of message endpoint that can connect a single message source or sink to a message channel. A channel adapter can be either an inbound channel adapter or an outbound channel adapter. The inbound channel adapter can read from a source and create messages to send on a channel. The outbound channel adapter can read messages from a channel and convert them to send out to some message sink such as JMS or HTTP.
SI comes with a number of built-in adapters. For example, a simple inbound channel adapter may be configured to read from a bean named "businessRequestService" using method "getNextRequest" and send the newly created messages on "businessRequestChannel" as shown below. The channel adapter is triggered every five seconds based on the configured poller.
<inbound-channel-adapter ref="businessRequestService" method="getNextRequest" channel="businessRequestChannel">
    <poller>
        <interval-trigger interval="5000"/>
    </poller>
</inbound-channel-adapter>
Channel adapters serve as the integration point to any external system, or in the previously described use case, to a persistent message store.
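To make the polling behavior concrete, the following is a minimal sketch in plain Java of what an inbound channel adapter does on each trigger: it asks a source method for a value and, if one is available, places it on a channel. It is not SI's implementation. The class name PollingInboundAdapter and the pollOnce method are hypothetical, and a BlockingQueue stands in for the message channel.

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

// Hypothetical stand-in for an inbound channel adapter (not SI's own class).
final class PollingInboundAdapter<T> {
    private final Supplier<T> source;        // e.g. businessRequestService::getNextRequest
    private final BlockingQueue<T> channel;  // stands in for businessRequestChannel

    PollingInboundAdapter(Supplier<T> source, BlockingQueue<T> channel) {
        this.source = source;
        this.channel = channel;
    }

    /** One poll cycle: returns true only if a value was read and sent on the channel. */
    boolean pollOnce() {
        T next = source.get();
        if (next == null) {
            return false; // nothing available on this cycle
        }
        return channel.offer(next);
    }
}
```

In this sketch, a five-second trigger could be approximated by handing adapter::pollOnce to a ScheduledExecutorService through scheduleWithFixedDelay with a delay of 5 seconds.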
To implement the persistent message channel, you need to create both an outbound channel adapter and an inbound channel adapter (in the same class). However, rather than talking to an external system, the adapters will convert the message to a serialized format and store the message in a database. This example uses a DAO, which is implemented using direct JDBC calls to HSQL, to abstract away the database details.
Figure 2. Persistent Message Implementation: This diagram shows the components and transaction boundaries for a persistent message implementation.
The key to a proper implementation is to understand the transaction boundaries of channels and message error handling. The components and transaction boundaries of the implementation are shown in Figure 2. Notice that the only custom classes in the implementation are the persistent message adapter and the message DAO. All the other components of the flow are built-in SI types. Each of the custom types will be discussed in more detail as the implementation is presented.
Comparing Figure 2 with the flow presented in Figure 1 reveals that the start and end of each flow are identical. This is one of the key flexibility points of using a message bus. The details of the message flow are a configuration matter handled by the SI XML configuration file; they have little impact on the application. In fact, given the flow in Figure 2, Business Service A could have been calling Business Service B directly before being switched to the message flow implementation, with little or no code changes.
Persistent Message Adapter
Spring Integration's channel adapter endpoints transport a message between an SI message channel and an external system. You can leverage this functionality to convert the message into a serialized format that can be persisted to a database. To do this, you implement your own adapter POJO that is referenced by the outbound-channel-adapter and inbound-channel-adapter elements to consume and produce messages, respectively.
Listing 2 shows the implementation of a message adapter that depends on the message DAO to handle the database details. The adapter implements a save method for outbound messages and a removeNext method for inbound messages. Both methods use generics to support messages containing any payload. The save method is straightforward; it simply passes the message to the DAO along with a configured queue name. The queue name increases the flexibility of the solution by allowing the same message DAO (using the same database table) to serve multiple message adapters with different queues.
The removeNext method requires only slightly more code. It reads a message from the DAO, again using the queue name, and if a message is available it immediately deletes the message from the queue using the DAO. Because the removeNext method executes in a transaction, the deletion of the message will be committed only if the entire transaction succeeds.
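The two adapter methods described above can be sketched as follows. This is an illustration under stated assumptions, not the article's Listing 2: the MessageDao interface and its method names are hypothetical, an in-memory DAO replaces the database so the sketch runs on its own, and a plain Serializable object stands in for an SI message. In the real flow, removeNext would run inside the transaction started by the poller.

```java
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical DAO contract; the names are assumptions for this sketch.
interface MessageDao {
    void insert(String queueName, Serializable message);
    Serializable findOldest(String queueName); // null when the queue is empty
    void delete(String queueName, Serializable message);
}

// In-memory stand-in so the sketch runs without a database.
final class InMemoryMessageDao implements MessageDao {
    private final Map<String, Queue<Serializable>> queues = new HashMap<>();

    public void insert(String queueName, Serializable message) {
        queues.computeIfAbsent(queueName, k -> new ArrayDeque<>()).add(message);
    }

    public Serializable findOldest(String queueName) {
        Queue<Serializable> q = queues.get(queueName);
        return q == null ? null : q.peek();
    }

    public void delete(String queueName, Serializable message) {
        Queue<Serializable> q = queues.get(queueName);
        if (q != null) {
            q.remove(message); // removes the first matching entry
        }
    }
}

final class PersistentMessageAdapter {
    private final MessageDao dao;
    private final String queueName; // lets several adapters share one DAO and table

    PersistentMessageAdapter(MessageDao dao, String queueName) {
        this.dao = dao;
        this.queueName = queueName;
    }

    /** Outbound side: hand the message to the DAO under this adapter's queue name. */
    <T extends Serializable> void save(T message) {
        dao.insert(queueName, message);
    }

    /** Inbound side: read the oldest message and, if one exists, delete it from the queue. */
    @SuppressWarnings("unchecked")
    <T extends Serializable> T removeNext() {
        Serializable next = dao.findOldest(queueName);
        if (next != null) {
            dao.delete(queueName, next);
        }
        return (T) next;
    }
}
```

Two adapters built on the same DAO with different queue names, for example "mail" and "audit", see only their own messages, which is the flexibility the queue name is meant to provide.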
Message Data Access Object
The message DAO abstracts the underlying database interaction. The implementation in Listing 3 uses Spring's JdbcOperations interface to talk directly to an HSQL database using JDBC. As part of a larger application, you would be better off using an object relational mapping (ORM) technology such as Hibernate or iBATIS. As with most Spring-based applications, you can replace the implementation of the DAO later with little or no application code changes.
The message itself is stored in the database as a binary large object (BLOB) using standard Java object serialization. The creation date and queue name are also inserted so the oldest message in a given queue can be found quickly when the adapter requests the next message.
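The storage scheme described above can be illustrated without a database. The sketch below is not the article's Listing 3: it serializes a message to the byte array that would go into the BLOB column, keeps hypothetical rows (queue name, creation time, blob) in a list instead of a table, and selects the oldest row for a queue. All class and field names are assumptions made for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

final class MessageSerializer {
    private MessageSerializer() {}

    /** Standard Java serialization to the bytes that would be stored as a BLOB. */
    static byte[] toBytes(Serializable message) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(message);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Reverses toBytes when a row is read back. */
    static Object fromBytes(byte[] blob) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(blob))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Message class not on the classpath", e);
        }
    }
}

// Hypothetical row layout: queue name, creation time, serialized message.
final class StoredMessage {
    final String queueName;
    final long createdMillis;
    final byte[] blob;

    StoredMessage(String queueName, long createdMillis, byte[] blob) {
        this.queueName = queueName;
        this.createdMillis = createdMillis;
        this.blob = blob;
    }
}

// A list stands in for the database table in this sketch.
final class MessageTableSketch {
    private final List<StoredMessage> rows = new ArrayList<>();

    void insert(String queueName, long createdMillis, Serializable message) {
        rows.add(new StoredMessage(queueName, createdMillis, MessageSerializer.toBytes(message)));
    }

    /** Oldest row in the given queue, if any; a real DAO would do this in SQL. */
    Optional<StoredMessage> oldest(String queueName) {
        return rows.stream()
                .filter(r -> r.queueName.equals(queueName))
                .min(Comparator.comparingLong((StoredMessage r) -> r.createdMillis));
    }
}
```

A JDBC-backed DAO would typically express the same lookup as a query filtered by queue name and ordered by creation date, so an index on those two columns keeps the lookup fast.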
Error handling is the final component to the solution. Listing 4 shows the error-handling method implemented to simply delete messages that cause an unhandled exception in the receiving endpoint. After rolling back the transaction started by the poller on the inbound channel adapter, SI redirects the failed messages to an error channel that works just like any other SI channel. You can configure the error channel in three ways:
- A global channel for all failed messages
- A channel for a single endpoint
- A channel per message based on a message header property
An SI header enricher is used to configure a specific error channel in the message header before the message is delivered to the final endpoint. The error channel is connected to the error-handling method using the familiar service activator configuration element. An example of this type of configuration is presented in the final SI configuration for the solution in the next section.
Rather than simply deleting the message as this implementation does, an application-specific implementation can perform any action you need, such as deleting dead messages, sending a notification email to an administrator, updating a retry count, or simply leaving the message in the message queue for redelivery.
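As one illustration of such an application-specific policy, the sketch below tracks a retry count per message and decides whether a failed message should be left for redelivery or discarded. It is not the article's Listing 4 and does not use SI types: the RetryingErrorHandler class, the Action enum, and the notion of a unique message ID are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical error-handling policy: retry a failed message a few times, then give up.
final class RetryingErrorHandler {
    enum Action { REDELIVER, DISCARD }

    private final int maxAttempts;
    private final Map<String, Integer> failures = new HashMap<>();

    RetryingErrorHandler(int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.maxAttempts = maxAttempts;
    }

    /**
     * Called once per failed delivery. messageId is assumed to be a unique key
     * for the stored message.
     */
    Action onFailure(String messageId) {
        int count = failures.merge(messageId, 1, Integer::sum);
        if (count >= maxAttempts) {
            failures.remove(messageId); // forget the message once it is discarded
            return Action.DISCARD;
        }
        return Action.REDELIVER;
    }
}
```

With maxAttempts set to 3, the first two failures of a message yield REDELIVER and the third yields DISCARD. A handler wired to the error channel could act on that result, for example by leaving the row in the queue table or deleting it and notifying an administrator.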
Message Flow Configuration
The final step for creating a complete message flow that supports persistent messaging is to link together the application business objects, the SI channels, the persistent message adapter, and the error handler. Listing 5 presents the complete XML configuration for a simple application that uses SI with persistent messaging to send email notifications asynchronously. In the configuration, the ProductOrderingService corresponds to Business Service A and the MailService corresponds to Business Service B from Figure 2. The ProductOrderingService works with an interface to the MailService, which is actually an instance of the dynamic proxy that serves as the messaging gateway. The message then travels through the saveMailChannel to the channel adapter, where it is saved to the database in the same transaction as the ordering service.
In a separate transaction, the message is read from the channel adapter and travels through the sendMailChannel, which is actually a chain that includes the error channel header enricher and the final service activator. Finally, the message payload is delivered to the actual MailService implementation and processed in the same transaction.
If the MailService fails to process the message and raises an exception, SI rolls back the transaction and redirects the message to the queueAdapterErrorMessageChannel identified in the message header (where it will be properly handled as a dead message). As you can see, the application is completely unaware of the path that the message travels and of the error-handling mechanism, which keeps the architecture flexible. If you decided that email messages were no longer a critical part of the application, you could replace the channel adapters with a simple in-memory queue channel configuration without needing to change any application code.
A sample application is available for download with this article. It provides the entire source code, a small HSQL database, and an Ant script that will allow you to run the examples. The included README file has directions for running the examples.