Leveraging EJB Timers for J2EE Concurrency

Concurrency is a well-known technique for improving the performance of many J2EE applications. For example, Web 2.0 mashup applications must aggregate data from multiple sources. Performing the aggregation in a single thread can result in high latency that degrades the end user experience. Concurrency can significantly improve performance by parallelizing the aggregation tasks. Similarly, concurrency can optimize computationally intensive operations and address performance bottlenecks in blocking I/O and network operations.

A number of approaches are available for concurrency in J2EE applications, but each has its shortcomings:

  • Explicit allocation of threads is discouraged by the J2EE specifications since containers handle thread management.
  • Using the Work Manager specification results in non-portable solutions that are limited to certain application servers, such as WebLogic and WebSphere, because it is not part of the J2EE specifications (including Java EE 5).
  • Encapsulating the work units in JMS queue messages that are then processed by Message Driven Beans (MDBs) in a concurrent fashion is appropriate for certain scenarios. However, this approach is relatively complex and resource-intensive for most applications.

This article introduces an alternative approach for J2EE concurrency that takes advantage of the inherent thread pool management of EJB timers. This approach is portable across application servers and requires neither a messaging provider nor resources such as queues and connection factories. Furthermore, since EJB timers are transactional and recoverable, this approach enables robust and reliable concurrency solutions.

The discussion begins with an overview of the EJB Timer-based concurrency API and goes on to present an example that demonstrates how to use the API. Then it delves into the details of the concurrency framework implementation and highlights factors you must consider when using this approach. The code presented is based on JDK 1.5 and EJB 3, and it was tested on JBoss 4.0.4 GA (TimerConcurrencyEAR.ear). An EJB 2.1-based implementation (TimerConcurrencyEAR-EJB2_1.ear) is also provided in the code download.

EJB Timer-Based Concurrency API
To schedule tasks concurrently, you work with two main framework interfaces: Task and ConcurrencyManager. You need to implement the Task interface that follows to represent the work you plan to do:

public interface Task extends Runnable, Serializable {
    enum Status { Inactive, Started, Completed };

    public void setStatus(Status status);
    public Status getStatus();
}

Task extends Runnable, which encapsulates the work to be done in its run method. It also extends java.io.Serializable to satisfy the recoverability requirement of EJB timers. The framework uses the status accessors to update the state of the task as it progresses through the execution lifecycle:

  1. Inactive, the initial state, indicates that execution has not yet begun.
  2. Started denotes that execution has started but not yet completed.
  3. Completed signifies that the task completed either successfully or unsuccessfully.

The class AbstractTask, provided for convenience, implements the status field accessors. Most of the time, you’ll extend it instead of implementing Task directly. Its implementation is as follows:

public abstract class AbstractTask implements Task {
    private Status status = Status.Inactive;

    public void setStatus(Status status)
    {
        this.status = status;
    }

    public Status getStatus() {
        return status;
    }
}

To schedule tasks for concurrent execution, you use the ConcurrencyManager interface like this:

public interface ConcurrencyManager {
    public void execute(Collection<Task> tasks);
    public void executeAndWait(Collection<Task> tasks);
    public Collection<Task> executeAndWait(Collection<Task> tasks, long timeout);
}

The ConcurrencyManager provides three ways of scheduling tasks:

  • void execute(Collection<Task> tasks): This non-blocking method schedules the tasks for concurrent execution and returns immediately without waiting for completion.
  • void executeAndWait(Collection<Task> tasks): This blocking method schedules the tasks for concurrent execution and waits until all tasks are completed.
  • Collection<Task> executeAndWait(Collection<Task> tasks, long timeout): This method schedules the tasks for concurrent execution and waits until all tasks are completed or the timeout is exceeded, whichever comes first. It returns the set of completed tasks.

The various styles of concurrent execution offer flexibility in dealing with different application scenarios. The blocking style of scheduling is useful in applications where tasks have a relatively short time span and the client needs the results immediately. The non-blocking style is geared towards long-running tasks where the results are not needed immediately. In this case, tasks will often write results to the database to be later retrieved by another request.

The EJBTimerConcurrencyManager stateless session bean (SSB) provides the ConcurrencyManager implementation. An upcoming section covers it in detail.

EJB Concurrency Example: Fibonacci
The example concurrency application computes Fibonacci numbers, a well-known sequence in which each element is the sum of the previous two. You can formulate it as follows (a direct recursive implementation of this definition takes time that grows exponentially with n; more efficient formulations exist, but this one suffices for the example):

f(n) = 0, if n = 0
f(n) = 1, if n = 1
f(n) = f(n-1) + f(n-2), if n > 1

This produces a sequence that looks like this:

0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144 ...
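To make the remark about more efficient formulations concrete, here is a minimal sketch that places the recursive definition next to an iterative version running in time linear in n. The class and method names (FibonacciSketch, recursive, iterative) are hypothetical and not part of the article's code download, and n is assumed to be non-negative.

```java
// Illustrative sketch only; FibonacciSketch is a hypothetical name, not framework code.
final class FibonacciSketch {

    // Direct transcription of the recurrence; running time grows exponentially with n.
    static long recursive(int n) {
        return (n > 1) ? recursive(n - 1) + recursive(n - 2) : n;
    }

    // Keeps only the last two values, so the running time is linear in n.
    // Assumes n >= 0.
    static long iterative(int n) {
        long previous = 0; // f(0)
        long current = 1;  // f(1)
        if (n == 0) {
            return previous;
        }
        for (int i = 2; i <= n; i++) {
            long next = previous + current;
            previous = current;
            current = next;
        }
        return current;
    }

    public static void main(String[] args) {
        // Prints the first thirteen elements of the sequence.
        for (int n = 0; n <= 12; n++) {
            System.out.print(iterative(n) + (n < 12 ? ", " : "\n"));
        }
    }
}
```

The example application deliberately keeps the slow recursive form, since its long running time for larger n is what makes the benefit of parallel execution visible.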

Suppose your requirement is to implement an EJB Fibonacci calculator that can compute, in parallel, the Fibonacci results for a set of numbers. You first must implement a task that calculates the Fibonacci result for a given number. Here’s one possible implementation of your FibonacciTask:

public class FibonacciTask extends AbstractTask {
    private int n;
    private long result;

    public FibonacciTask(int n) {
        this.n = n;
    }

    public void run()
    {
        result = fibonacci(n);
    }

    // Direct recursive implementation of the Fibonacci definition
    private long fibonacci(int n) {
        long ret = n;
        if (n > 1)
            ret = fibonacci(n-1) + fibonacci(n-2);
        return ret;
    }

    public int getN() {
        return n;
    }

    public long getResult()
    {
        return result;
    }
}

The run() method simply calls the fibonacci(n) method and stores the result in the class field. The fibonacci(n) method is a recursive implementation of the formula described earlier.

The next step is to implement the calculateFibonacci() method in FibonacciCalculatorEJB. This method instantiates the appropriate FibonacciTask(s) and submits them to the EJBTimerConcurrencyManager for execution. To inject the EJBTimerConcurrencyManager into the ConcurrencyManager variable, you can use the EJB3 @EJB annotation as follows:

@Stateless
public class FibonacciCalculatorEJB implements FibonacciCalculator
{
    @EJB
    private ConcurrencyManager concurrencyManager;
    ...

The calculateFibonacci method takes an array of numbers whose Fibonacci values are to be computed. This example initially schedules the tasks (one for each number) using the blocking scheduling method (executeAndWait with no timeout), which returns only when all Fibonacci results are calculated. The elapsed time is printed along with the results:

public void calculateFibonacci(int[] numbers)
{
    // One task per requested Fibonacci number
    ArrayList<Task> tasks = new ArrayList<Task>();
    for (int i = 0; i < numbers.length; i++)
        tasks.add(new FibonacciTask(numbers[i]));
    long initialTime = System.currentTimeMillis();
    concurrencyManager.executeAndWait(tasks);
    long elapsedTime = System.currentTimeMillis() - initialTime;
    printFibonacciResults(tasks, elapsedTime);
}

The following is the printFibonacciResults method implementation:

private void printFibonacciResults(Collection<Task> tasks, long elapsedTime)
{
    logger.info("** Completed Fibonacci Computations in " + elapsedTime + "ms **");
    for (Task task : tasks) {
        FibonacciTask ft = (FibonacciTask) task;
        logger.info("Fibonacci(" + ft.getN() + ") = " + ft.getResult());
    }
}

Calling the calculateFibonacci method with the numbers 1, 7, 20, 31, and 35 produces the following (the elapsed time will vary depending on your hardware configuration):

16:42:41,759 INFO  [FibonacciCalculatorEJB] ** Completed Fibonacci Computations in 381ms **
16:42:41,759 INFO  [FibonacciCalculatorEJB] Fibonacci(1) = 1
16:42:41,759 INFO  [FibonacciCalculatorEJB] Fibonacci(7) = 13
16:42:41,759 INFO  [FibonacciCalculatorEJB] Fibonacci(20) = 6765
16:42:41,759 INFO  [FibonacciCalculatorEJB] Fibonacci(31) = 1346269
16:42:41,759 INFO  [FibonacciCalculatorEJB] Fibonacci(35) = 9227465

As you can see, parallelizing the tasks using the concurrency framework is relatively straightforward. One factor to consider in this example is the exponential growth of Fibonacci computation time as n increases. You can provide a timeout parameter to the concurrency manager’s executeAndWait method to specify the maximum time you wish to wait. The executeAndWait method returns the set of completed tasks at timeout:

public void calculateFibonacci(int[] numbers, long timeout)
{
    ArrayList<Task> tasks = new ArrayList<Task>();
    for (int i = 0; i < numbers.length; i++)
        tasks.add(new FibonacciTask(numbers[i]));
    long initialTime = System.currentTimeMillis();
    // Returns only the tasks that completed before the timeout expired
    Collection<Task> completedTasks = concurrencyManager.executeAndWait(tasks, timeout);
    long elapsedTime = System.currentTimeMillis() - initialTime;
    printFibonacciResults(completedTasks, elapsedTime);
}

Calling the method with the numbers 1, 4, 9, 24, and 40 and a timeout of 300ms produces the following:

16:42:42,089 INFO  [FibonacciCalculatorEJB] ** Completed Fibonacci Computations in 310ms **
16:42:42,089 INFO  [FibonacciCalculatorEJB] Fibonacci(1) = 1
16:42:42,089 INFO  [FibonacciCalculatorEJB] Fibonacci(4) = 3
16:42:42,089 INFO  [FibonacciCalculatorEJB] Fibonacci(9) = 34
16:42:42,089 INFO  [FibonacciCalculatorEJB] Fibonacci(24) = 46368

The output indicates that executeAndWait returned after 310ms (the extra 10ms is due to overhead) without waiting for Fibonacci(40) to complete.
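After a timed call like this one, a caller may want to know which tasks did not finish. The following sketch shows one possible way to do that using only the status accessors of the Task interface. The Task interface is repeated here as a stand-in so the sketch compiles on its own; SimpleTask and UnfinishedTasks are hypothetical names that are not part of the framework, and the approach assumes the caller's task objects are the same instances the framework updates.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Stand-in copy of the article's Task interface so the sketch compiles on its own.
interface Task extends Runnable, Serializable {
    enum Status { Inactive, Started, Completed }
    void setStatus(Status status);
    Status getStatus();
}

// Hypothetical do-nothing task used only to exercise the helper below.
class SimpleTask implements Task {
    private Status status = Status.Inactive;
    public void setStatus(Status status) { this.status = status; }
    public Status getStatus() { return status; }
    public void run() { }
}

// Hypothetical helper, not part of the framework.
final class UnfinishedTasks {

    // Returns, in submission order, every task whose status is not yet Completed.
    static List<Task> find(Collection<? extends Task> submitted) {
        List<Task> unfinished = new ArrayList<Task>();
        for (Task task : submitted) {
            if (task.getStatus() != Task.Status.Completed) {
                unfinished.add(task);
            }
        }
        return unfinished;
    }

    public static void main(String[] args) {
        SimpleTask done = new SimpleTask();
        done.setStatus(Task.Status.Completed);
        SimpleTask running = new SimpleTask();
        running.setStatus(Task.Status.Started);

        List<Task> submitted = new ArrayList<Task>();
        submitted.add(done);
        submitted.add(running);
        System.out.println(find(submitted).size() + " task(s) still unfinished");
    }
}
```

A task reported as Started here may still finish later on its timer thread, so the result is only a snapshot taken at the time of the call.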

The following section discusses the implementation details of the EJB Timer-based concurrency framework. It covers the implementation of the non-blocking execute method first, and then discusses the blocking style of execution.

Framework Implementation Details
If you are mainly interested in using the EJB Timer-based concurrency framework and don’t want to delve into the implementation details, skip to the next section. The code download provides the necessary EAR files.

The framework uses the EJB Timer Service to create a timer that triggers immediately (zero expiration). Once the timeout handler is called, the framework executes the task and updates its status. As described earlier, the SSB EJBTimerConcurrencyManager implements the ConcurrencyManager interface:

@Stateless
public class EJBTimerConcurrencyManager implements ConcurrencyManager
{
    private @Resource TimerService timerService;
    ....

The Timer Service is injected into the bean using the EJB3 @Resource annotation. To schedule a task, you access timerService and create a timer with the task wrapper as a parameter. You submit a task wrapper instead of the task itself to perform functions such as updating the task status and waiting on task completion (covered later in the blocking discussion):

public void execute(Collection<Task> tasks)
{
    if ((tasks == null) || (tasks.size() == 0))
        return;
    for (Task task : tasks)
    {
        // A zero duration makes the timer expire immediately
        TaskWrapper tw = new TaskWrapper(task);
        timerService.createTimer(0, tw);
    }
}

The timeout handler retrieves the TaskWrapper and calls its run method:

@Timeout
public void timeoutHandler(Timer t)
{
    TaskWrapper tw = (TaskWrapper) t.getInfo();
    tw.run();
}

The TaskWrapper in turn calls the task’s run method and updates its status:

public void run()
{
    try {
        task.setStatus(Task.Status.Started);
        task.run();
    }
    finally {
        task.setStatus(Task.Status.Completed);
    }
}

Notice that runtime exceptions are not caught in the TaskWrapper’s run method. This allows the container to handle exceptions by rolling back the transaction. It is assumed that the Task’s run method will handle any exceptions not intended to be processed by the container.
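The effect of the try/finally block can be checked outside a container. The sketch below repeats the Task interface as a stand-in and uses hypothetical names (FailingTask, WrapperSketch) to show that the status still reaches Completed when the task throws, while the runtime exception continues to propagate to the caller, which in the framework is the container.

```java
import java.io.Serializable;

// Stand-in copy of the article's Task interface so the sketch compiles on its own.
interface Task extends Runnable, Serializable {
    enum Status { Inactive, Started, Completed }
    void setStatus(Status status);
    Status getStatus();
}

// Hypothetical task that always fails with an unchecked exception.
class FailingTask implements Task {
    private Status status = Status.Inactive;
    public void setStatus(Status status) { this.status = status; }
    public Status getStatus() { return status; }
    public void run() {
        throw new IllegalStateException("simulated failure");
    }
}

// Hypothetical harness, not part of the framework.
final class WrapperSketch {

    // Same try/finally shape as the TaskWrapper run method: no catch clause.
    static void runWrapped(Task task) {
        try {
            task.setStatus(Task.Status.Started);
            task.run();
        } finally {
            task.setStatus(Task.Status.Completed);
        }
    }

    // Plays the role of the caller: returns the exception that escaped, or null.
    static RuntimeException runAndCapture(Task task) {
        try {
            runWrapped(task);
            return null;
        } catch (RuntimeException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        FailingTask task = new FailingTask();
        RuntimeException escaped = runAndCapture(task);
        System.out.println("status=" + task.getStatus() + ", escaped=" + escaped);
    }
}
```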

Now let’s examine the implementation of the executeAndWait version that blocks until all tasks are completed or the timeout expires. Upon completion, the method must return the set of completed tasks, which may be a subset of all tasks if the timeout expires first. To satisfy this requirement, you create a TaskList class to hold the set of completed tasks. This class also acts as the synchronization monitor that coordinates the wait for task(s) completion and the completion notification:

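public class TaskList extends ArrayList<Task> implements TaskListener
{
    // Called from a timer thread: records the task and wakes the waiting thread.
    // Synchronized so that the list update and the size check below cannot race.
    public synchronized void taskCompleted(Task task)
    {
        add(task);
        notifyAll();
    }

    // Blocks until numTasks tasks have completed or timeout milliseconds elapse.
    public synchronized void waitForCompletion(int numTasks, long timeout)
    {
        long endTime = System.currentTimeMillis() + timeout;
        long remaining = timeout;
        while ((numTasks > size()) && (remaining > 0))
        {
            try
            {
                // remaining is always positive here; wait(0) would block indefinitely
                wait(remaining);
            } catch (InterruptedException e)
            {
                throw new RuntimeException(e);
            }
            remaining = endTime - System.currentTimeMillis();
        }
    }
    ...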

The taskCompleted method is called whenever a task is completed. It is responsible for adding the task to the list and notifying the waiting thread. The thread blocked in waitForCompletion wakes up every time a task is completed and returns as soon as all tasks are completed or the timeout expires, whichever comes first. Since the TaskWrapper handles task execution, you provide it with the TaskList during initialization (via the TaskListener interface) to allow it to perform task completion notification:

public class TaskWrapper implements Serializable {
    private Task task;
    private TaskListener listener;

    public TaskWrapper(Task task, TaskListener listener) {
        this.task = task;
        this.listener = listener;
    }

    public void run() {
        try {
            task.setStatus(Task.Status.Started);
            task.run();
        }
        finally {
            task.setStatus(Task.Status.Completed);
            // Notify the waiting thread, if any, that this task is done
            if (listener != null)
                listener.taskCompleted(task);
        }
    }
}

You can now implement the concurrency manager’s executeAndWait method by scheduling tasks using the Timer Service (as done previously in execute) and then waiting for task completion using TaskList.waitForCompletion.
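The schedule-then-wait pattern can be tried outside an application server. The sketch below is not the framework code: it swaps in a java.util.concurrent.ExecutorService as a stand-in for the EJB Timer Service, and CompletionList, ExecuteAndWaitSketch, and sleeper are hypothetical names. It only illustrates how a list that doubles as a monitor lets the caller return with the subset of tasks that finished in time.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for TaskList: collects finished work and acts as the monitor.
final class CompletionList {
    private final List<Runnable> completed = new ArrayList<Runnable>();

    synchronized void taskCompleted(Runnable task) {
        completed.add(task);
        notifyAll();
    }

    // Waits until 'expected' tasks finish or timeoutMillis elapses; returns a snapshot.
    synchronized List<Runnable> waitForCompletion(int expected, long timeoutMillis) {
        long endTime = System.currentTimeMillis() + timeoutMillis;
        long remaining = timeoutMillis;
        while (completed.size() < expected && remaining > 0) {
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            remaining = endTime - System.currentTimeMillis();
        }
        return new ArrayList<Runnable>(completed);
    }
}

final class ExecuteAndWaitSketch {

    // Submits each task to a pool (in place of createTimer) and blocks on the monitor.
    static List<Runnable> executeAndWait(Collection<Runnable> tasks, long timeoutMillis) {
        final CompletionList done = new CompletionList();
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            for (final Runnable task : tasks) {
                pool.execute(new Runnable() {
                    public void run() {
                        try {
                            task.run();
                        } finally {
                            done.taskCompleted(task);
                        }
                    }
                });
            }
            return done.waitForCompletion(tasks.size(), timeoutMillis);
        } finally {
            pool.shutdownNow(); // sketch only: interrupts work still running after the wait
        }
    }

    // Helper that builds a task sleeping for the given number of milliseconds.
    static Runnable sleeper(final long millis) {
        return new Runnable() {
            public void run() {
                try {
                    Thread.sleep(millis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
    }

    public static void main(String[] args) {
        List<Runnable> tasks = new ArrayList<Runnable>();
        tasks.add(sleeper(10));
        tasks.add(sleeper(20));
        tasks.add(sleeper(5000));
        List<Runnable> finished = executeAndWait(tasks, 500);
        System.out.println(finished.size() + " of " + tasks.size() + " tasks completed");
    }
}
```

Unlike this stand-in, the timer-based framework does not cancel tasks that are still running when the timeout expires; they continue on their timer threads.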

One issue that you must address is transaction demarcation. Placing the timer scheduling and waiting in the same transaction may cause the method to hang. This potential deadlock situation is caused by the timers/tasks not being scheduled until the transaction commits, while the transaction is not committed until the wait for task execution is complete. To get around this, you refactor the scheduling logic into a separate transactional method, while making executeAndWait non-transactional:

@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public Collection<Task> executeAndWait(Collection<Task> tasks, long timeout)
{
    if ((tasks == null) || (tasks.size() == 0))
        return null;
    if (timeout < 0)
        throw new IllegalArgumentException("timeout cannot be negative");
    TaskList completedTasks = scheduleTasks(tasks);
    completedTasks.waitForCompletion(tasks.size(), timeout);
    return completedTasks;
}

private TaskList scheduleTasks(Collection<Task> tasks)
{
    TaskList completedTasks = new TaskList();
    for (Task task : tasks)
    {
        // The TaskList is the listener that each wrapper notifies on completion
        TaskWrapper tw = new TaskWrapper(task, completedTasks);
        timerService.createTimer(0, tw);
    }
    return completedTasks;
}

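The intent is for scheduleTasks to run under the default REQUIRED transaction attribute, while the completedTasks.waitForCompletion call is non-transactional because executeAndWait carries the NOT_SUPPORTED attribute. Keep in mind that container-managed transaction attributes are applied when a method is invoked through the bean's business interface, so confirm how your container treats the direct call to the private scheduleTasks method. Figure 1 illustrates the interaction between the framework classes.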


Figure 1. Interaction Between EJB Timer Concurrency Framework Classes

The timeoutHandler and waitForCompletion methods are executed concurrently.

Factors to Consider When Using This Approach
The framework implementation section showed how the scheduling of tasks is done in a transactional, atomic, all-or-nothing manner. Be aware, however, that each task execution is performed in a separate transaction. So while you are guaranteed that task scheduling is atomic, the execution is a different matter. This, of course, is the price of executing the tasks in an asynchronous, concurrent manner. Therefore, when you use the blocking style, be sure to pass an appropriate timeout to executeAndWait so that the call cannot hang if tasks fail.

As in typical EJB applications, you must exercise proper judgment with regard to exception handling. You must handle the exceptions that the application needs to respond to by placing a try/catch block in the Task’s run method. In the try/catch block, you’ll decide whether to roll back or commit the transaction depending on your application’s requirements. Exceptions that are not handled are propagated to the container, resulting in an automatic transaction rollback.
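As a rough illustration of this division of labor, the sketch below shows a task that catches a failure it knows how to respond to and records it, while leaving unchecked exceptions uncaught so that they would reach the container. FetchTask, its source parameter, and the fetch helper are hypothetical and exist only for this example; the actual commit or rollback decision depends on the container and is not shown.

```java
import java.io.IOException;
import java.io.Serializable;

// Hypothetical task for illustration; not part of the framework or the code download.
final class FetchTask implements Runnable, Serializable {
    private final String source;
    private String result;
    private String failure; // non-null when a handled failure occurred

    FetchTask(String source) {
        this.source = source;
    }

    public void run() {
        try {
            result = fetch(source);
        } catch (IOException e) {
            // Expected failure the application wants to respond to: record it and
            // return normally, so no exception reaches the caller.
            failure = e.getMessage();
        }
        // Unchecked exceptions are deliberately not caught here; in the framework
        // they would propagate to the container and trigger a rollback.
    }

    // Simulated data source: fails in two different ways depending on the input.
    private static String fetch(String source) throws IOException {
        if (source == null) {
            throw new NullPointerException("source"); // programming error
        }
        if (source.length() == 0) {
            throw new IOException("empty source");    // recoverable condition
        }
        return "data from " + source;
    }

    String getResult() { return result; }
    String getFailure() { return failure; }

    public static void main(String[] args) {
        FetchTask ok = new FetchTask("feed");
        ok.run();
        FetchTask handled = new FetchTask("");
        handled.run();
        System.out.println(ok.getResult() + " / handled failure: " + handled.getFailure());
    }
}
```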

Since EJB timers are recoverable, tasks must be serialized in case of application server failure. This means that you need to make all relevant task attributes serializable. Conversely, you can make attributes that represent intermediate results transient since they are recalculated when the task is run again.
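The following sketch uses plain Java serialization to show what this means in practice: the input field survives a serialize/deserialize round trip, while a transient intermediate field comes back at its default value and is rebuilt when the task runs again. SquareSumTask and SerializationSketch are hypothetical names, and the in-memory round trip only approximates what a container does when it persists and recovers a timer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical task: sums the squares 1^2 + ... + n^2.
final class SquareSumTask implements Runnable, Serializable {
    private static final long serialVersionUID = 1L;

    private final int n;             // input: must survive serialization
    private transient long scratch;  // intermediate value: recomputed on every run

    SquareSumTask(int n) {
        this.n = n;
    }

    public void run() {
        scratch = 0;
        for (int i = 1; i <= n; i++) {
            scratch += (long) i * i;
        }
    }

    int getN() { return n; }
    long getScratch() { return scratch; }
}

final class SerializationSketch {

    // Serializes the task to a byte array and reads it back as a new object.
    static SquareSumTask roundTrip(SquareSumTask task) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeObject(task);
            out.close();
            ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            try {
                return (SquareSumTask) in.readObject();
            } finally {
                in.close();
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SquareSumTask task = new SquareSumTask(3);
        task.run();
        SquareSumTask recovered = roundTrip(task);
        System.out.println("n=" + recovered.getN() + ", scratch=" + recovered.getScratch());
        recovered.run();
        System.out.println("after re-run, scratch=" + recovered.getScratch());
    }
}
```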

EJB timers typically invoke the timeout handler in the same VM as the scheduler. Therefore, you should also expect the tasks to be parallelized in the same VM. This is appropriate for the majority of application scenarios. However, in some cases, you may have a very large number of tasks that you prefer to distribute across application cluster members. You can address this in two ways:

  1. Redefine the scheduling so that a smaller number of tasks are scheduled more often. This results in better distribution across the cluster since scheduling calls are essentially SSB invocations that application servers are capable of distributing.
  2. Use the queue-based concurrency approach to parallelize tasks across the cluster. This, however, is a more complex and resource-intensive approach to the J2EE concurrency problem.
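For the first option, one possible starting point is a small helper that splits a large task list into batches, each of which would then be passed to its own scheduling call (for example, a separate ConcurrencyManager.execute invocation). The helper below is a sketch under that assumption; TaskBatcher and partition are hypothetical names, and how batches are spread across cluster members depends on your application server.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the framework.
final class TaskBatcher {

    // Splits items into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<List<T>>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the view so that each batch is an independent list.
            batches.add(new ArrayList<T>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<Integer>();
        for (int i = 0; i < 10; i++) {
            items.add(i);
        }
        for (List<Integer> batch : partition(items, 4)) {
            System.out.println("batch of " + batch.size() + ": " + batch);
        }
    }
}
```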

Most application servers allow you to tune the EJB Timer’s thread pool size. You should configure the size to be sufficiently large relative to the number of tasks to be scheduled. Otherwise, multiple tasks may be scheduled on a single thread, resulting in serial behavior.
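The effect of an undersized pool can be observed with a small experiment outside the container. The sketch below uses a fixed-size java.util.concurrent thread pool as an analogy for the timer thread pool (it is not the EJB Timer Service) and reports the highest number of tasks that were running at the same moment. PoolSizeSketch and peakConcurrency are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical experiment; a plain thread pool stands in for the timer thread pool.
final class PoolSizeSketch {

    // Runs taskCount short tasks on poolSize threads and returns the peak number
    // of tasks observed running simultaneously.
    static int peakConcurrency(int poolSize, int taskCount) {
        final AtomicInteger running = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    int now = running.incrementAndGet();
                    // Raise the recorded peak if this task pushed it higher.
                    int seen = peak.get();
                    while (now > seen && !peak.compareAndSet(seen, now)) {
                        seen = peak.get();
                    }
                    try {
                        Thread.sleep(50); // simulated work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    running.decrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("pool of 1: peak " + peakConcurrency(1, 4));
        System.out.println("pool of 4: peak " + peakConcurrency(4, 4));
    }
}
```

With a single thread the peak can never exceed one, which is the serial behavior described above; with as many threads as tasks, the tasks are able to overlap.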
