In a previous article I covered some of the basics of JMS using OpenJMS as the JMS provider. In this article, you’ll see some more advanced OpenJMS topics such as message selectors, durable subscriptions, and message persistence, along with code examples that show you how to implement these concepts programmatically.
Sometimes you don’t want message consumers to receive every message sent by a sender or publisher. Instead, you want to filter the messages they receive. The JMS API provides this facility through message selectors. The message consumer specifies which messages it is interested in, and the JMS provider then sends just those messages to the client. Note that the onus of filtering messages is on the JMS provider, not the consuming application. A selector can test header values and message properties but, unfortunately, it cannot examine message bodies. I’ll show you an example message selector in just a bit, but first here are a couple of topics (no JMS pun intended) to set the stage.
Message Header Fields
When a JMS provider sends a message it generates the following header fields:
- JMSMessageID: A unique message identifier.
- JMSDestination: The queue or topic to which the message is sent.
- JMSRedelivered: States whether the message has been resent.
The client sending application can set a number of header fields. Refer to the JMS API documentation for a full breakdown of what headers the client can set. Each header field has getter and setter methods. Some of the fields the client can set include:
- JMSPriority: You can set a message priority from 0 to 9 (9 being the highest priority and 4 being the default). However, note that there is no guarantee that higher-priority messages will be delivered before lower-priority ones.
- JMSType: A string identifying the contents of a message.
- JMSReplyTo: Where responses should be sent.
- JMSExpiration: An expiration time for the message.
While the preceding message header fields are predefined, clients can add more information to JMS messages through properties. These properties are name/value pairs that the client application defines. Property values can be of type boolean, byte, short, int, long, float, double, or String, and the Message interface has a corresponding setter for each type (setBooleanProperty, setIntProperty, setStringProperty, and so on). For example, to set a property named “Sport” to “Basketball” on a message, call message.setStringProperty("Sport", "Basketball").
Using Message Selectors
Here’s an example that incorporates message properties, a message priority, and message selectors. Here’s the scenario. A producer application publishes messages to a topic (see Figure 1). The messages have different JMS priorities and different message properties. One consumer application for this topic is finicky; it specifies to the JMS provider that it wants to filter the messages it will receive by using a message selector.
This example uses OpenJMS, an open source implementation of Sun Microsystems’ Java Message Service API 1.0.2 specification, as the JMS provider. You can learn more about the OpenJMS server at http://www.devx.com/Java/Article/20903.
The code fragment shown below (see the file PublishToTopic.java in the downloadable sample code for the full listing) is from the producer application.
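// message, hour12, min, and sec are declared earlier in the full listing
TextMessage messageOne = topicSession.createTextMessage();
message = "Message #1 - Sport: Basketball " + hour12 + ":" + min + ":" + sec;
messageOne.setText(message);
messageOne.setStringProperty("Sport", "Basketball");
// persistent delivery, priority 9, time to live of 999999 milliseconds
topicPublisher.publish(messageOne, DeliveryMode.PERSISTENT, 9, 999999);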
|Figure 1: The message producer publishes messages to the topic with different “Sport” property values.|
Note that you need to have the consumer (which we’ll cover in just a bit) running before you run PublishToTopic. You can mitigate this timing dependency with durable subscriptions, which I’ll cover later in this article.
The PublishToTopic application sends four different messages to the topic named “topic1.” The publisher sets the “Sport” property to different values for different messages and also sets different priorities for each message using the publish method.
void publish(Message message, int deliveryMode, int priority, long timeToLive)
Using the publish method, you can specify a JMSPriority (from 0 to 9, with 9 being the highest priority and 4 being the default). Note that the setPriority method of the MessageProducer interface sets the priority of all messages subsequently sent by that producer; if you never set a priority, the default of 4 applies. To set the priority of a particular message, use the publish(Message, int, int, long) method. The publish method also lets you specify a delivery mode and a “time to live” for the message. By default, a message never expires. If a message will become obsolete after a certain period of time, specify its time to live, in milliseconds, as the fourth argument of the publish method. The delivery mode can be either DeliveryMode.PERSISTENT or DeliveryMode.NON_PERSISTENT.
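To make the time-to-live argument concrete: under the JMS specification, the provider derives a message’s JMSExpiration header by adding the time to live to the time of sending, and a time to live of zero yields an expiration of zero, meaning the message never expires. The following is a minimal plain-Java sketch of that arithmetic only. The class and method names are hypothetical and are not part of the JMS API or OpenJMS.

```java
public class ExpirationSketch {
    // Hypothetical helper mirroring how JMSExpiration is derived:
    // send time plus time to live, or 0 when the time to live is 0.
    public static long expirationFor(long sendTimeMillis, long timeToLiveMillis) {
        return timeToLiveMillis == 0 ? 0 : sendTimeMillis + timeToLiveMillis;
    }

    // An expiration of 0 means "never expires".
    public static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0 && nowMillis >= expirationMillis;
    }

    public static void main(String[] args) {
        long sentAt = System.currentTimeMillis();
        // Same time to live as the publish call in PublishToTopic.
        long expiration = expirationFor(sentAt, 999999);
        System.out.println("Expires " + (expiration - sentAt) + " ms after sending");
        System.out.println("Expired one hour later? "
                + isExpired(expiration, sentAt + 3600000L));
    }
}
```

With the 999999 ms used in the example, a message that is still undelivered roughly 16 minutes and 40 seconds after it was sent becomes eligible to be discarded by the provider.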
The consumer application, ConsumerUsingFilters.java (a section of which is shown below), listens asynchronously to the topic. You can see that the subscriber filters its consumption of messages with a selector string that uses SQL92 conditional expression syntax: "Sport in ('Basketball','Football')"
// create a topic subscriber and associate
// a filtering criterion
String filterQuery = "Sport in ('Basketball','Football')";
topicSubscriber = topicSession.createSubscriber(topic, filterQuery, false);
The code shown above states that the client only wants messages where the property “Sport” is set to either “Basketball” or “Football.” Messages having the property set to “Hockey” or “Baseball” will not be consumed because the JMS provider filters them out before delivery. You can see this in Figure 2, which shows the consumer application running:
|Figure 2: The Consumer application selectively filters messages based on the value of the “Sport” property.|
You can also filter using message priority. For example, you can change the filter query to:
String filterQuery = "JMSPriority > 6";
topicSubscriber = topicSession.createSubscriber(topic, filterQuery, false);
Now when you run the client application, it consumes only messages with a JMSPriority higher than 6, as Figure 3 shows. Everything else is filtered out.
|Figure 3: The modified consumer application filters messages based on the JMSPriority.|
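The effect of both selectors can be modelled without a JMS provider. The sketch below is plain Java, not the JMS API: it represents each message as a small value object and expresses the two selectors as ordinary predicates, to show which messages a provider would deliver. The Msg type, the class name, and every priority other than the 9 used for message #1 are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class SelectorSketch {
    // Hypothetical stand-in for a JMS message: one property plus the priority header.
    public static final class Msg {
        final String sport;
        final int priority;
        public Msg(String sport, int priority) {
            this.sport = sport;
            this.priority = priority;
        }
    }

    // Plain-Java equivalent of the selector "Sport in ('Basketball','Football')".
    public static final Predicate<Msg> SPORT_FILTER =
            m -> Arrays.asList("Basketball", "Football").contains(m.sport);

    // Plain-Java equivalent of the selector "JMSPriority > 6".
    public static final Predicate<Msg> PRIORITY_FILTER = m -> m.priority > 6;

    // Models the provider: evaluate the selector, deliver only the matches.
    public static List<String> deliver(List<Msg> published, Predicate<Msg> selector) {
        List<String> delivered = new ArrayList<>();
        for (Msg m : published) {
            if (selector.test(m)) {
                delivered.add(m.sport);
            }
        }
        return delivered;
    }

    // Four messages, one per sport; priorities other than 9 are assumed values.
    public static List<Msg> sampleMessages() {
        return Arrays.asList(
                new Msg("Basketball", 9),
                new Msg("Football", 4),
                new Msg("Hockey", 7),
                new Msg("Baseball", 2));
    }

    public static void main(String[] args) {
        System.out.println(deliver(sampleMessages(), SPORT_FILTER));    // [Basketball, Football]
        System.out.println(deliver(sampleMessages(), PRIORITY_FILTER)); // [Basketball, Hockey]
    }
}
```

In a real consumer the predicate never runs in your code. You hand the selector string to createSubscriber and the provider does the equivalent test on its side.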
As stated earlier, the examples in this and the first article in this series require that the subscribing application be running when a publisher publishes; otherwise, the messages are lost. You can mitigate this message loss with durable subscriptions. In a durable subscription, the JMS server holds messages for a client subscriber once it has formally subscribed. Durable topic subscriptions receive messages published while the subscriber is not active. Subsequent subscriber objects that specify the identity of the durable subscription can resume the subscription in the state in which the previous subscriber left it. In OpenJMS, the topic “topic1” is a durable topic.
Note that a client must designate itself as a durable subscriber before the JMS server can start holding messages for the client. The DurableSubscriberExample class shows the subscriber that listens on topic1.
The line below from the example code shows how to create a durable topic subscriber:
// create a durable topic subscriber and associate it
// to the retrieved topic
durableTopicSubscriber = topicSession.createDurableSubscriber(topic, "DoNotDrop");
As shown above, you create the TopicSubscriber object with the createDurableSubscriber method of the TopicSession object. The method takes two arguments: the Topic object and a string that specifies the name of the subscription. In the snippet above, the name is “DoNotDrop.”
You can try things out by running DurableSubscriberExample to first register the durable subscriber with the JMS server. Then kill the application and run PublishToTopic. If you were working with a non-durable subscription, the messages would be lost. However, if you now run DurableSubscriberExample again, you’ll see that you are still able to receive the messages published while the subscriber application was not running.
To delete a durable subscription on the JMS provider, close the subscriber and then call the unsubscribe method of the TopicSession object, specifying the subscription name as an argument, for example topicSession.unsubscribe("DoNotDrop"). This deletes the state the JMS provider maintains for the subscriber.
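The whole durable subscription life cycle (register, go offline, receive the backlog on return, unsubscribe) can be modelled in a few lines of plain Java. This is a sketch of the semantics only, not of how OpenJMS implements them. The class and its methods are hypothetical, and delivery to subscribers that are currently connected is deliberately left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DurableSketch {
    // Messages retained per subscription name while its subscriber is away.
    private final Map<String, List<String>> backlog = new HashMap<>();
    // Subscription names that currently have a connected subscriber.
    private final Set<String> connected = new HashSet<>();

    // Plays the role of createDurableSubscriber: registers the name on first
    // use and hands over whatever was retained while the subscriber was away.
    public List<String> connect(String name) {
        List<String> pending = backlog.computeIfAbsent(name, k -> new ArrayList<>());
        connected.add(name);
        List<String> delivered = new ArrayList<>(pending);
        pending.clear();
        return delivered;
    }

    // Plays the role of closing the subscriber (or killing the application).
    public void close(String name) {
        connected.remove(name);
    }

    // Retains the message for every registered subscription that is offline.
    public void publish(String text) {
        for (Map.Entry<String, List<String>> entry : backlog.entrySet()) {
            if (!connected.contains(entry.getKey())) {
                entry.getValue().add(text);
            }
        }
    }

    // Plays the role of unsubscribe: only legal once the subscriber is closed.
    public void unsubscribe(String name) {
        if (connected.contains(name)) {
            throw new IllegalStateException("close the subscriber first");
        }
        backlog.remove(name);
    }

    public boolean isRegistered(String name) {
        return backlog.containsKey(name);
    }

    public static void main(String[] args) {
        DurableSketch provider = new DurableSketch();
        provider.connect("DoNotDrop");      // first run registers the subscription
        provider.close("DoNotDrop");        // subscriber application is killed
        provider.publish("Message #1");
        provider.publish("Message #2");
        System.out.println(provider.connect("DoNotDrop")); // [Message #1, Message #2]
        provider.close("DoNotDrop");
        provider.unsubscribe("DoNotDrop");  // provider forgets the subscription
        provider.publish("Message #3");
        System.out.println(provider.connect("DoNotDrop")); // []
    }
}
```

The last two lines of main show why unsubscribing matters: once the provider’s state is deleted, a subscriber that reuses the name starts a brand-new subscription and sees nothing published in the meantime.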
If a JMS provider should fail, you can use message persistence to ensure that messages are not lost. Using the persistent delivery mode (the default) instructs the JMS provider to persist messages to a stable store (a database) as part of the client’s send operation.
By default, OpenJMS writes to a file (openjms.db, found in the bin directory of the OpenJMS installation).
Configuring a database for message persistence requires the following steps:
- Add the JDBC driver to the CLASSPATH
- Edit the OpenJMS configuration file and specify connection properties
- Create the tables OpenJMS needs for persisting messages
Oracle8i, Sybase ASE, MySQL, and HSQLDB have been formally tested for message persistence. However, OpenJMS should work with any JDBC 2.0 compliant database.
I used IBM’s DB2 Universal Database to test things out by first modifying the setenv.bat batch file (located in the bin directory of the OpenJMS installation) to point the CLASSPATH entry to the DB2 JDBC driver:
set CLASSPATH=C:\Program Files\IBM\SQLLIB\java\db2java.zip
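Before moving on, it can be worth confirming that the driver really is visible on the CLASSPATH. The small check below is a sketch that only tries to locate a class by name, without initializing it. The default class name, COM.ibm.db2.jdbc.app.DB2Driver, is my assumption for the driver packaged in db2java.zip; pass the class name your driver documents as the first argument if it differs.

```java
public class DriverCheck {
    // Returns true if the named class can be located on the current classpath.
    // The class is not initialized, so no native libraries are loaded.
    public static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, DriverCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | NoClassDefFoundError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed default; override by passing a class name on the command line.
        String driver = args.length > 0 ? args[0] : "COM.ibm.db2.jdbc.app.DB2Driver";
        System.out.println(driver
                + (isOnClasspath(driver) ? " found" : " NOT found on the CLASSPATH"));
    }
}
```

Run it from a command prompt in which setenv.bat has already been executed, so that it sees the same CLASSPATH as the OpenJMS server.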
You’ll need to create the database before you can configure it. In DB2, issue the following command to the DB2 Command Line Processor:
db2 => create db openjms
Next, edit the OpenJMS configuration file (openjms.xml, located in the config directory of the OpenJMS installation) and modify its database configuration to specify the DB2 connection properties: the JDBC driver class, the database URL, and the user name and password.
Depending on the database you are using, you might be able to create the tables that OpenJMS needs by using the dbtool application, which ships with OpenJMS and is located in the installation’s bin directory. You run it with the command:
dbtool.bat -create -config %OPENJMS_HOME%\config\openjms.xml
To create the tables for DB2, I had to issue the following command from the DB2 Command Window after changing to the config\db directory of the OpenJMS installation and logging in to DB2 as the user specified in the openjms.xml file:
db2 -tvf create_db2.sql
The config\db directory contains scripts for most popular databases.
The figure below shows the DB2 Control Center peeking into the openjms database, in which the OpenJMS tables have been created.
|Figure 4: The OpenJMS tables created in DB2.|
Once you have configured your persistence database with the necessary tables, OpenJMS persists messages there, so that no message is lost if the OpenJMS provider fails.
This article has shown you some of the more advanced JMS topics, including message selectors, durable subscriptions, and message persistence. You can use message selectors to filter a client’s consumption of messages. Durable subscriptions eliminate the need for a subscriber to be running when a message is published. Message persistence ensures that messages are not lost in case the JMS provider in your setup fails.