Leveraging EJB Timers for J2EE Concurrency : Page 3

Improve the performance of your J2EE applications by using a concurrency approach that takes advantage of the inherent thread pool management of EJB timers.




Framework Implementation Details
If you mainly want to use the EJB Timer-based concurrency framework and don't need the implementation details, skip to the next section. The code download provides the necessary EAR files.

The framework uses the EJB Timer Service to create a timer that triggers immediately (zero expiration). Once the timeout handler is called, the framework executes the task and updates its status. As described earlier, the SSB EJBTimerConcurrencyManager implements the ConcurrencyManager interface:

    @Stateless
    @Local
    public class EJBTimerConcurrencyManager implements ConcurrencyManager {
        private @Resource TimerService timerService;
        ....

The Timer Service is injected into the bean using the EJB3 @Resource annotation. To schedule a task, you access timerService and create a timer with the task wrapper as a parameter. You submit a task wrapper instead of the task itself to perform functions such as updating the task status and waiting on task completion (covered later in the blocking discussion):

    public void execute(Collection<Task> tasks) {
        if ((tasks == null) || (tasks.size() == 0)) return;
        for (Task task : tasks) {
            TaskWrapper tw = new TaskWrapper(task);
            timerService.createTimer(0, tw);
        }
    }

The timeout handler retrieves the TaskWrapper and calls its run method:

    @Timeout
    public void timeoutHandler(Timer t) {
        TaskWrapper tw = (TaskWrapper) t.getInfo();
        tw.run();
    }
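The schedule-then-callback flow can be tried outside a container with a small stand-in. The sketch below is only an analogy built on java.util.concurrent: MiniTimerService, FlagJob, and MiniTimerDemo are hypothetical names invented for illustration and are not part of the EJB API or of the framework's code. It shows a payload being handed to a "timer" with zero delay and a handler unpacking and running it, much as timeoutHandler does with getInfo.

```java
import java.io.Serializable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for a timer service: it delivers a Serializable
// payload to a handler after a delay. This is NOT the javax.ejb API.
class MiniTimerService {
    private final ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(2);
    private final Consumer<Serializable> timeoutHandler;

    MiniTimerService(Consumer<Serializable> timeoutHandler) {
        this.timeoutHandler = timeoutHandler;
    }

    // A delay of 0 asks for the handler to run as soon as a thread is free.
    void createTimer(long delayMillis, Serializable info) {
        scheduler.schedule(() -> timeoutHandler.accept(info),
                delayMillis, TimeUnit.MILLISECONDS);
    }

    // Stops accepting timers and waits for pending ones to finish.
    boolean shutdownAndWait(long millis) {
        scheduler.shutdown();
        try {
            return scheduler.awaitTermination(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

// Payload in the role of the task wrapper: it carries the work to run.
class FlagJob implements Serializable, Runnable {
    volatile boolean ran;   // set by run() so callers can observe it

    @Override
    public void run() {
        ran = true;
    }
}

class MiniTimerDemo {
    static boolean runTwoJobs() {
        FlagJob first = new FlagJob();
        FlagJob second = new FlagJob();
        // The handler plays the role of the @Timeout method: unpack and run.
        MiniTimerService timers =
                new MiniTimerService(info -> ((Runnable) info).run());
        timers.createTimer(0, first);
        timers.createTimer(0, second);
        return timers.shutdownAndWait(5000) && first.ran && second.ran;
    }

    public static void main(String[] args) {
        System.out.println("both jobs ran: " + runTwoJobs());
    }
}
```

In a real container the thread pool belongs to the Timer Service rather than to application code, which is the reason the framework uses timers instead of an executor like this one.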

The TaskWrapper in turn calls the task's run method and updates its status:

    public void run() {
        try {
            task.setStatus(Task.Status.Started);
            task.run();
        } finally {
            task.setStatus(Task.Status.Completed);
        }
    }

Notice that runtime exceptions are not caught in the TaskWrapper's run method. This allows the container to handle exceptions by rolling back the transaction. It is assumed that the Task's run method will handle any exceptions not intended to be processed by the container.
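To make the try/finally behavior concrete, here is a minimal, container-free sketch. SketchTask, SketchWrapper, and WrapperExceptionDemo are stand-ins assumed for illustration (the article's real Task type is defined elsewhere); only the shape of the run method mirrors the wrapper above. It shows that a runtime exception thrown by a task still reaches the caller, while the status is set to Completed on the way out.

```java
// Stand-in for the article's Task type; all names here are assumptions.
abstract class SketchTask {
    enum Status { Created, Started, Completed }

    private volatile Status status = Status.Created;

    Status getStatus() { return status; }
    void setStatus(Status s) { status = s; }

    abstract void run();
}

// Mirrors the wrapper's run method: no catch block, only finally.
class SketchWrapper {
    private final SketchTask task;

    SketchWrapper(SketchTask task) { this.task = task; }

    void run() {
        try {
            task.setStatus(SketchTask.Status.Started);
            task.run();
        } finally {
            // Runs whether task.run() returned normally or threw.
            task.setStatus(SketchTask.Status.Completed);
        }
    }
}

class WrapperExceptionDemo {
    // Returns true if a runtime exception from the task reached the caller.
    static boolean propagates(SketchTask task) {
        try {
            new SketchWrapper(task).run();
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        SketchTask failing = new SketchTask() {
            @Override
            void run() { throw new IllegalStateException("boom"); }
        };
        System.out.println("propagated: " + propagates(failing));
        System.out.println("status: " + failing.getStatus());
    }
}
```

One consequence worth keeping in mind: with this pattern, Completed means "finished running", not "finished successfully", so a task that needs to report failure has to record it itself.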

Now let's examine the implementation of executeAndWait, the variant that blocks until all tasks are completed or the timeout expires. On return, the method must hand back the set of completed tasks, which may be a subset of all tasks if the timeout expires first. To satisfy this requirement, you create a TaskList class to hold the set of completed tasks. This class also acts as the synchronization monitor that coordinates the wait for task completion with the completion notifications:

    public class TaskList extends ArrayList<Task> implements TaskListener {
        public void taskCompleted(Task task) {
            // Modify the list while holding the monitor; ArrayList is not thread-safe.
            synchronized (this) {
                add(task);
                notifyAll();
            }
        }
        public void waitForCompletion(int numTasks, long timeout) {
            long endTime = System.currentTimeMillis() + timeout;
            synchronized (this) {
                long remaining = timeout;
                // Check under the lock, and never call wait(0), which waits indefinitely.
                while ((numTasks > size()) && (remaining > 0)) {
                    try {
                        wait(remaining);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    remaining = endTime - System.currentTimeMillis();
                }
            }
        }
        ...

The taskCompleted method is called whenever a task is completed. It adds the task to the list and notifies the waiting thread. The thread blocked in waitForCompletion wakes up each time a task completes and returns as soon as all tasks are completed or the timeout expires, whichever comes first. Because the TaskWrapper handles task execution, you pass it the TaskList during initialization (via the TaskListener interface) so that it can send the completion notification:
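The wait-and-notify handshake described above can be exercised with plain threads. The following is a sketch under stated assumptions, not the framework's code: CompletionList and CompletionListDemo are hypothetical names, ordinary threads take the place of timer callbacks, and the completed items are just the delays each worker slept for. It shows the caller getting every item when the workers finish in time and only a subset when the timeout expires first.

```java
import java.util.ArrayList;
import java.util.List;

// Container-free model of the monitor pattern: a list that worker
// threads append to and that a caller can wait on with a deadline.
class CompletionList<T> {
    private final List<T> done = new ArrayList<>();

    // Called by a worker when its item is finished.
    synchronized void completed(T item) {
        done.add(item);
        notifyAll();
    }

    // Blocks until 'expected' items arrived or 'timeoutMillis' elapsed,
    // then returns a snapshot of what has completed so far.
    synchronized List<T> await(int expected, long timeoutMillis) {
        long end = System.currentTimeMillis() + timeoutMillis;
        long remaining = timeoutMillis;
        while (done.size() < expected && remaining > 0) {
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            remaining = end - System.currentTimeMillis();
        }
        return new ArrayList<>(done);
    }
}

class CompletionListDemo {
    // Starts one thread per delay; each reports its delay when finished.
    static List<Long> runWithDelays(long timeoutMillis, long... delays) {
        CompletionList<Long> list = new CompletionList<>();
        for (long d : delays) {
            Thread t = new Thread(() -> {
                try {
                    Thread.sleep(d);
                } catch (InterruptedException e) {
                    return;
                }
                list.completed(d);
            });
            t.setDaemon(true);   // slow workers must not keep the JVM alive
            t.start();
        }
        return list.await(delays.length, timeoutMillis);
    }

    public static void main(String[] args) {
        // Both workers finish well inside the timeout.
        System.out.println(runWithDelays(2000, 50, 100).size());
        // The second worker is far slower than the timeout.
        System.out.println(runWithDelays(300, 50, 5000).size());
    }
}
```

Recomputing the remaining time after every wake-up keeps the overall deadline fixed even when several notifications arrive before the last task completes.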

    public class TaskWrapper implements Serializable {
        private Task task;
        private TaskListener listener;
        public TaskWrapper(Task task, TaskListener listener) {
            this.task = task;
            this.listener = listener;
        }
        public void run() {
            try {
                task.setStatus(Task.Status.Started);
                task.run();
            } finally {
                task.setStatus(Task.Status.Completed);
                if (listener != null) listener.taskCompleted(task);
            }
        }
    }

You can now implement the concurrency manager's executeAndWait method by scheduling tasks using the Timer Service (as done previously in execute) and then waiting for task completion using TaskList.waitForCompletion.

One issue that you must address is transaction demarcation. Placing the timer scheduling and waiting in the same transaction may cause the method to hang. This potential deadlock situation is caused by the timers/tasks not being scheduled until the transaction commits, while the transaction is not committed until the wait for task execution is complete. To get around this, you refactor the scheduling logic into a separate transactional method, while making executeAndWait non-transactional:
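The hang is easier to see in a toy model. In the sketch below, ToyTransaction is a hypothetical class that simply holds back scheduled work until commit is called; it loosely imitates transactional timer creation and is not a JTA or EJB API. DeferredSchedulingDemo compares waiting before the commit, where nothing can run and the wait can only time out, with committing first and waiting afterward.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model of a transaction that holds back scheduled work until commit.
// Hypothetical class for illustration only, not a real transaction API.
class ToyTransaction {
    private final List<Runnable> pending = new ArrayList<>();
    private final ExecutorService pool;

    ToyTransaction(ExecutorService pool) { this.pool = pool; }

    // Work is recorded but not started.
    void schedule(Runnable r) { pending.add(r); }

    // Work is handed to the pool only here.
    void commit() {
        for (Runnable r : pending) pool.execute(r);
        pending.clear();
    }
}

class DeferredSchedulingDemo {
    // Returns true if all tasks finished within the timeout.
    static boolean run(boolean waitBeforeCommit, int tasks, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            ToyTransaction tx = new ToyTransaction(pool);
            CountDownLatch latch = new CountDownLatch(tasks);
            for (int i = 0; i < tasks; i++) tx.schedule(latch::countDown);

            if (waitBeforeCommit) {
                // Scheduling and waiting share one "transaction": nothing
                // can run yet, so this wait can only end by timing out.
                boolean finished = await(latch, timeoutMillis);
                tx.commit();
                return finished;
            }
            tx.commit();   // the scheduling "transaction" ends first
            return await(latch, timeoutMillis);
        } finally {
            pool.shutdown();
        }
    }

    private static boolean await(CountDownLatch latch, long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("wait inside tx:    " + run(true, 3, 200));
        System.out.println("wait after commit: " + run(false, 3, 2000));
    }
}
```

The model only times out because the wait has a deadline; with an unbounded wait, the first variant would block forever, which is the hang described above.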

    @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
    public Collection<Task> executeAndWait(Collection<Task> tasks, long timeout) {
        if ((tasks == null) || (tasks.size() == 0)) return null;
        // The original created the exception without throwing it.
        if (timeout < 0) throw new IllegalArgumentException("timeout cannot be negative");
        TaskList completedTasks = scheduleTasks(tasks);
        completedTasks.waitForCompletion(tasks.size(), timeout);
        return completedTasks;
    }

    private TaskList scheduleTasks(Collection<Task> tasks) {
        TaskList completedTasks = new TaskList();
        for (Task task : tasks) {
            TaskWrapper tw = new TaskWrapper(task, completedTasks);
            timerService.createTimer(0, tw);
        }
        return completedTasks;
    }

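The scheduleTasks method is meant to run with the default REQUIRED transaction attribute, while the completedTasks.waitForCompletion call is non-transactional due to the NOT_SUPPORTED attribute of the executeAndWait method. Be aware that container-managed transaction attributes take effect only on calls that pass through the container. Because scheduleTasks is shown here as a private method invoked directly, it executes in the non-transactional context of executeAndWait unless it is exposed as a business method and called through the bean's business interface. Either way, timer creation is no longer held back by a transaction that also spans the wait. Figure 1 illustrates the interaction between the framework classes.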

Figure 1. Interaction Between EJB Timer Concurrency Framework Classes

The timeoutHandler and waitForCompletion methods are executed concurrently.
