devxlogo

Extending Spring Integration Through Channel Adapters

Extending Spring Integration Through Channel Adapters

n the past few years, some major Java-related developments have occurred in enterprise integration and messaging solutions. Several open source Java Message Service (JMS) solutions such as OpenMQ and ActiveMQ have emerged, and enterprise service buses (ESBs) such as ServiceMix and Mule have been gaining popularity. Now SpringSource has thrown its hat into the ring with the recent release of Spring Integration (SI) version 1.0.0. The Spring Integration framework supports the construction of message-passing architectures and legacy application integration by implementing most of the patterns described in Enterprise Integration Patterns by Gregor Hohpe and Bobby Woolf, considered one of the seminal works on the subject.

As with all the SpringSource projects, SI uses many of the preexisting conventions and a simple POJO (plain old java object) configuration, which makes it quick and easy to use. However, SI does not provide the ability to persist messages in a database, a common solution that enables systems to couple message delivery to transactions, retry delivery, and recover after a server restart. This article demonstrates how to create a basic message-passing application with SI, as well as how to use a custom channel adapter and data access object (DAO) for persistent messaging. By writing a custom channel adapter, you can easily integrate database persistence into the overall SI framework.

Spring Integration Components

Spring Integration contains three main components: messages, message channels, and message endpoints.

  • Messages: Composed of a payload and headers, a message carries information from a sender to a message endpoint. The payload is the business domain object you want to send. Message headers contain message metadata such as a unique message ID, creation date, and other fields used by SI. Applications can put their own information in the message headers to extend the capabilities of the framework.
  • Message channels: The paths that messages take to arrive at the endpoint are called message channels. SI supports many different types of message channels, such as the Direct Channel for sending a message within the thread of the sender and the Queue Channel, which allows messages to be queued for asynchronous delivery.
  • Message endpoints: The destination of a message is called the message endpoint. Endpoints work in many use cases, including routing, splitting messages, combining messages, and invoking services.

The current SI release contains many different implementations of channels and endpoints. Review the Spring Integration manual for more information on these components.

Basic Message Flow Construction

To construct a basic workflow or message bus, you create and connect the core components using the SI domain-specific namespace in a bean configuration file (similar to any other Spring Framework configuration). Listing 1 presents a very basic message flow configuration using one message gateway, a channel, and a message endpoint. Figure 1 presents the same message flow visually.

Figure 1. Basic Message Flow Configuration: This diagram shows a basic message flow with a gateway, channel, and endpoint.

Using a message gateway, SI creates an anonymous implementation of your application-specific interface. This anonymous implementation hides the underlying message bus from the rest of the application. The service activator invokes a message on a normal application business object to deliver the message payload.

This pattern continues through the rest of the framework, allowing the details of the underlying message framework (such as the channel type, transformations, persistence, and routing) to be completely transparent and reconfigurable without any changes to application code. The use of the gateway and service activator removes any dependency on the framework from the application code.

Use Case for Custom Channel Adapter

While SI does bundle a number of useful endpoints and connectors, including JMS, files, remote method invocation, and HTTP invokers, it does not include an endpoint for message persistence in a database. One common solution when deploying an application server is to configure the message flows to use JMS for persistence. JMS provides guaranteed message delivery, error handling, and can participate in distributed transactions to tie message sending to transaction success. However, the configuration, deployment, and maintenance of JMS can be overkill for an application that simply uses message passing internally for asynchronous processing. In those cases, using JMS also can require distributed transactions, which lower scalability and performance.

Extending SI with a custom channel adapter allows you to skip JMS and distributed transactions all together while still meeting application requirements such as the following:

  1. Transaction coupling: Messages should not be delivered until the business transaction is committed. This is important to ensure that emails and orders are not processed if the initial operation fails.
  2. Retrying messages: If a message fails to process, you have the option to retry the message or drop it based on the message flow configuration. Some message channels are important, while others can be silently dropped.
  3. Persistent: The messages must survive a server shutdown or crash. By default, SI is mostly an in-memory messaging framework. Assuring that messages will not be lost if the application shuts down or crashes is a necessity.

Persistent Message Implementation

It is important to understand how Spring Integration uses channel adapters to interact with external systems. A channel adapter is a type of message endpoint that can connect a single message source or sink to a message channel. A channel adapter can be either an inbound channel adapter or an outbound channel adapter. The inbound channel adapter can read from a source and create messages to send on a channel. The outbound channel adapter can read messages from a channel and convert them to send out to some message sink such as JMS or HTTP.

SI comes with a number of built-in adapters. For example, a simple inbound channel adapter may be configured to read from a bean named “businessRequestService” using method “getNextRequest” and send the newly created messages on “businessRequestChannel” as shown below. The channel adapter is triggered every five seconds based on the configured poller.

< inbound-channel-adapter ref="businessRequestService" method="getNextRequest" channel="businessRequestChannel">  < poller>    < interval-trigger interval="5000"/>  < /poller>< /inbound-channel-adapter>

Channel adapters serve as the integration point to any external system, or in the previously described use case, to a persistent message store.

High-Level Design

To implement the persistent message channel, you need to create both an outbound channel adapter and an inbound channel adapter (in the same class). However, rather than talking to an external system, the adapters will convert the message to a serialized format and store the message in a database. This example uses a DAO, which is implemented using direct JDBC calls to HSQL, to abstract away the database details.

Figure 2. Persistent Message Implementation: This diagram shows the components and transaction boundaries for a persistent message implementation.

The key to a proper implementation is to understand the transaction boundaries of channels and message error handling. The components and transaction boundaries of the implementation are shown in Figure 2. Notice that the only custom classes in the implementation are the persistent message adapter and the message DAO. All the other components of the flow are built-in SI types. Each of the custom types will be discussed in more detail as the implementation is presented.

Comparing Figure 2 with the flow presented in Figure 1 reveals that the start and end of each flow are identical. This is one of the key flexibility points of using a message bus. The details of the message flow are a configuration matter handled by the SI XML configuration file; they have little impact on the application. In fact, given the flow in Figure 2, with little or no code changes Business Service A could have been using Business Service B directly before being switched to the message flow implementation.

Persistent Message Adapter

Spring Integration’s channel adapter endpoints transport a message from the SI message channel to an external system. You can leverage this functionality to convert the message into a serialized format that can be persisted to a database. To do this, you implement your own adapter POJO that is referenced by the output-channel-adapter and input-channel-adapter elements to consume and produce messages, respectively.

Listing 2 shows the implementation of a message adapter that depends on the message DAO to handle the database details. The adapter implements a save method for outbound messages and a removeNext method for inbound messages. Both methods use generics to support messages containing any payload. The save method is straightforward; it simply passes the message to the DAO along with a configured queue name. The queue name increases the flexibility of the solution by allowing the same message DAO (using the same database table) to serve multiple message adapters with different queues.

The removeNext method requires only slightly more code. It reads a message from the DAO, again using the queue name, and if a message is available it immediately deletes the message from the queue using the DAO. Because the removeNext method executes in a transaction, the deletion of the message will be committed only if the entire transaction succeeds.

Message Data Access Object

The message DAO abstracts the underlying database interaction. The implementation in Listing 3 uses Spring’s JdbcOperations class to talk directly to an HSQL database using JDBC. As part of a larger application, you would be better off using an object relational mapping (ORM) technology such as Hibernate or iBATIS. As with most Spring-based applications, you can replace the implementation of the DAO later with little or no application code changes.

The message itself is stored in the database as a binary large object (BLOB) using standard Java object serialization. The creation date and queue name are also inserted so the oldest message in a given queue can be found quickly when the adapter requests the next message.

Error Handling

Error handling is the final component to the solution. Listing 4 shows the error-handling method implemented to simply delete messages that cause an unhandled exception in the receiving endpoint. After rolling back the transaction started by the poller on the inbound channel adapter, SI redirects the failed messages to an error channel that works just like any other SI channel. You can configure the error channel in three ways:

  1. A global channel for all failed messages
  2. A channel for a single endpoint
  3. A channel per message based on a message header property

An SI header enricher is used to configure a specific error channel in the message header before the message is delivered to the final endpoint. The error channel is connected to the error-handling method using the familiar service activator configuration element. An example of this type of configuration is presented in the final SI configuration for the solution in the next section.

Rather than simply deleting the message as this implementation does, an application-specific implementation can perform any action you need, such as deleting dead messages, sending a notification email to an administrator, updating a retry count, or simply leaving the message in the message queue for redelivery.

Message Flow Configuration

The final step for creating a complete message flow that supports persistent messaging is to link together the application business objects, the SI channels, the persistence message adapter, and the error handler. Listing 5 presents the complete XML configuration for a simple application that uses SI with persistent messaging to send email notifications asynchronously. In the configuration, the ProductOrderingService corresponds to Business Service A and the MailService corresponds to Business Service B from Figure 2. The ProductOrderingService works with an interface to the MailService, which is actually an instance of the dynamic proxy that serves as the messaging gateway. The message then travels through the saveMailChannel to the channel adapter where is it saved to the database in the same transaction as the ordering service.

In a separate transaction, the message is read from the channel adapter and travels through the sendMailChannel, which is actually a chain that includes the error channel header enricher and the final service activator. Finally, the message payload is delivered to the actual MailService implementation and processed in the same transaction.

If the MailService fails to process the message and raises an exception, SI rolls back the transaction and redirects the message to the queueAdapterErrorMessageChannel identified in the message header (where it will be properly handled as a dead message). As you can see, the application is completely unaware of the path that the message travels and of the error-handling mechanism making the architecture flexible. If you decided that email messages were no longer a critical part of the application, you could replace the channel adapters with a simple in-memory queue channel configuration without needing to change any application code.

A sample application is available for download with this article. It provides the entire source code, a small HSQL database, and an Ant script that will allow you to run the examples. The included README file has directions for running the examples.

Limitations of Persistent Channel Implementation

While the persistent channel implementation combined with Spring Integration goes a long way toward a complete messaging solution, it has a few limitations. First, it does not support multiple consumers per queue because messages are selected from the database and not deleted until the transaction commits. This means that a single message would be processed by all the consumers at the same time, which you probably don’t want. The next section discusses a solution for this.

The implementation’s shutdown process presents another limitation. Spring Integration supports cleanly shutting down when the Spring Framework application context is shut down (normally by a servlet shutdown). However, the implementation’s shutdown process doesn’t properly flush the channels or allow active tasks to complete, which can result in a ThreadDeath exception from the JVM. Discussions in the SpringSource forums have documented some workarounds, and the issue is being tracked in the bug tracker.

Another limitation is related to transaction management and message redelivery. One of the original application requirements was to support message redelivery, which is achieved with a transaction that the inbound channel adapter poller starts. Without careful error handling, an unexpected runtime exception causes the transaction to roll back and the message will be redelivered indefinitely. This behavior is undesirable in most cases because the runtime exception may continually be raised, consuming valuable resources. You can prevent indefinite redelivery by using the error handler previously discussed or by implementing an extension to perform retry counting (more about this shortly).

The final limitation is related to round-trip messages. SI supports round-trip messages that can simulate a method call over the message bus with parameters and a return value. This feature is contrary to the initial requirement linking the business transaction with the delivery of the outbound message. Receiving a response message would be impossible. If the business service is blocked waiting for a response while the transaction is still active, the request message would never be sent. It may be possible to implement this feature if the transaction requirements are relaxed.

Recommended Extensions

One extension you may want to implement is the ability to have multiple consumers on a single persistent channel adapter. One way to accomplish this is by modifying the message DAO to use Oracle row locking. Each request for a new message performs a “select for update” condition, which locks the row returned. At the same time, a “skip locked rows” condition is used to skip any rows already locked. This allows each consumer to locate, lock, and process the first row in the message table that is not currently locked by another consumer. While this relies on an Oracle-specific feature, a similar approach could be implemented using a shared lock table or in-memory list of locked message identifiers.

Using the multiple-consumer locking approach, you can also implement fault tolerance by having parallel deployments of the application share a locking mechanism. This allows each deployment to process messages from a single message table while not processing the same message at the same time in each instance. The Oracle row-locking approach makes this simple. If a deployment fails, the messages will continue to be processed in the other instance immediately.

One final extension that would be useful is to implement message redelivery counting. In the current implementation, SI supports a per-message error channel that you can use to your advantage. While logging the error and deleting the message (as the current error handler does) works fine, in some cases you may want to re-queue the message for a future retry attempt. You could do this by adding a counter to the persisted message and updating the retry count in the error handler. Once the count exceeds a configured maximum, simply drop the message. You can use the same approach for sending an email to an administrator, sending an IM, posting a Twitter message, or simply leaving the message in the queue for redelivery.

Make SI What Your Application Needs It to Be

While SI is young and has a few shortcomings, it does have a bright future. The extensibility of the framework opens it up for a number of interesting integration scenarios, including communications with proprietary systems. As demonstrated in this article, with a persistent message adapter implementation in place, SI can provide a reliable alternative to JMS for an application that needs simple, in-process message passing and asynchronous processing. By loosely coupling your application’s components in a message-passing architecture, you create a more flexible and future-proof system.

SI also offers the benefit of simple POJO bean configuration, testability, and a simple XML configuration format, just like the core Spring Framework.

devxblackblue

About Our Editorial Process

At DevX, we’re dedicated to tech entrepreneurship. Our team closely follows industry shifts, new products, AI breakthroughs, technology trends, and funding announcements. Articles undergo thorough editing to ensure accuracy and clarity, reflecting DevX’s style and supporting entrepreneurs in the tech sphere.

See our full editorial policy.

About Our Journalist