Building Multithreaded Java Applications

ost server-side applications require the ability to process tasks concurrently, which improves performance and increases utilization of hardware resources. In early versions of Java (1.4 or earlier), developers needed to implement concurrent applications?including thread pool logic?themselves using low-level language constructs and the Java Thread API. The results were often poor. The nature of the Java Thread API often led unwitting programmers to develop code that introduced hard-to-debug programming errors.

In Java 5.0, Sun introduced the Java concurrency utilities (JSR-166) to address these issues and provide a standard set of APIs to create concurrent applications. This article explores some of the features provided by the Java concurrency package and demonstrates techniques for writing concurrent applications using these utilities.

Challenges of Concurrent Programming

Since its inception, Java has provided the Thread class and low-level language constructs such as synchronized and volatile for developing platform-independent concurrent applications. However, building concurrent applications with these features has never been easy. Developers faced the following challenges:

  • Incorrect programming can lead to deadlocks, where two or more threads waiting for each other are blocked forever.
  • No mechanism is available for writing wait-free, lock-free algorithms in the Java language. Developers must use native code.
  • Developers have to write their own complex thread pool logic, which can be tricky and prone to error.

Brief Overview of Java Concurrency Utilities

JSR-166 (Java concurrency utilities), which is part of Java 5.0, greatly simplifies the development of concurrent applications in Java by focusing on breadth and providing critical functionality that is useful across a wide range of concurrent programming styles.

Java concurrency utilities provides multiple features that developers can leverage for faster, more predictable development of concurrent applications. These features free developers from reinventing the wheel by writing custom code. Some of JSR-166’s most notable features are:

  1. Standard interfaces and frameworks for defining custom thread-like sub systems
  2. A mechanism for standardizing invocation, scheduling, execution, and control of asynchronous tasks, according to a set of execution policies in the Executor framework
  3. A mechanism for thread coordination through classes such as semaphore, mutexe, barrier, latche, and exchangers
  4. Non-blocking FIFO queue implementation (the ConcurrentLinkedQueue class) for scalable, efficient thread-safe operations
  5. Blocking queue implementation classes to cover the most common usage contexts for the producer/consumer approach, messaging, parallel tasking, and related concurrent designs
  6. A framework for locking and waiting for conditions that is distinct from the built-in synchronization and monitors
  7. Ready-to-use classes for lock-free, thread-safe programming on single variables

Developing a Concurrent Java Application

The sections to follow demonstrate how to use the Java concurrency utility API to develop a multithreaded e-commerce application that processes orders placed online. After the application validates and authorizes orders, it places them in the order-processing queue (java.util.concurrent.BlockingQueue). A pool of order-processor threads polls the order queue continuously and processes the orders when they become available.

Decoupling the application’s order-processing code provides the flexibility to increase or decrease the order-processing rate by changing the thread pool size. Putting order objects in a concurrent BlockingQueue ensures that a particular order is processed by only one processor, and it takes care of synchronization automatically.

The code snippets in the upcoming sections are taken from the application source code that accompanies this article.

Extending ThreadPoolExecutor

The Executor interface defines only one method and decouples task submission from how the task will be run. The ExecutorService sub-interface defines additional methods for submitting and tracking asynchronous tasks, as well as shutting down the thread pool. The ThreadPoolExecutor class is a concrete implementation of the ExecutorService interface, which should be sufficient for most of the order-processing application’s requirements.

ThreadPoolExecutor also provides useful hookup methods (e.g., beforeExecute), which can be overridden for customization purposes. The CustomThreadPoolExecutor class in Listing 1 extends the ThreadPoolExecutor class and overrides the beforeExecute, afterExecute, and terminated methods.

@Overridepublic void afterExecute(Runnable r, Throwable t) {  super.afterExecute(r, t);  Logger.log("After calling afterExecute() method for a thread "      + r);}@Overridepublic void terminated() {  super.terminated();  Logger.log("Threadpool terminated");}

The overridden methods simply log the messages, but in a real-world scenario, they can do useful things. For example, the terminated method can send an alert that all the threads in this pool are dead. To properly nest the multiple overrides, you should generally call the superclass’s overridden methods from the respective methods in the subclass.

The Java concurrency API also provides the ScheduledExecutorService interface, which extends ExecutorServiceInterface and can schedule the tasks to run after a particular delay or to execute periodically. ScheduledExecutorThreadPool is a concrete implementation of this interface.

Defining Asynchronous Task Implementation

The executor executes submitted asynchronous tasks, which perform the actual business logic. To submit a task to the executor, the ExecutorService interface provides overloaded submit methods, which accept Runnable or Callable object types.

The Runnable task type is useful in cases where:

  • The task does not need to return any result on completion.
  • There is no need to throw an application-specific checked exception if the run() method encounters an exception.
  • Porting the existing legacy classes that implement the Runnable interface is required.

The Callable task type provides more flexibility and provides the following advantages:

  • The task can return user-defined objects as results.
  • The task can throw user-defined checked exceptions.

You need to implement run() and call() methods for Runnable and Callable task types, respectively.

The OrderProcessorCallable class in Listing 2 implements the Callable interface and specifies an Integer as the result object. The constructor takes the task object name and the BlockingQueue for retrieving the orders to be processed. The call() method keeps on polling the BlockingQueue for order value objects and processes any it finds. If there is no order to process, the call() method sleeps for some time and polls it again.

The infinite loop in the call method is useful in this application scenario because there is no need to create and submit new tasks to the ThreadPoolExecutor again and again for each order object.

public Integer call() throws OrderProcessingException {	while (running) {	  // check if current Thread is interrupted	  checkInterruptStatus();		// poll for OrderVO from blocking queue and do            // order processing here		...  }	// return result	return processedCount;}

Notice that the asynchronous task implementation runs continuously for the application’s lifetime. In most applications, however, asynchronous task implementation performs its required operation and returns immediately.

Handling Thread Interrupts

The call method implementation uses the checkInterruptStatus method to perform frequent checks on executing thread interruptions. This is required because to force task cancellation, ThreadPoolExecutor sends an interrupt to the thread. Failing to check interrupt status could result in that particular thread never returning. The following checkInterruptStatus method checks the interrupt statuses of running threads and throws OrderProcessingException if a thread is interrupted:

private void checkInterruptStatus() throws    OrderProcessingException {  if (Thread.interrupted()) {    throw new OrderProcessingException("Thread was interrupted");  }}

For task implementations, it is generally a good practice to throw an exception and terminate task execution when a running thread is interrupted. However, based on the order-processing application’s requirements, ignoring the thread interrupt may be appropriate in this case.

Creating a Thread Pool Executor

Creating a ThreadPoolExecutor is straightforward. You just need to call the CustomThreadPoolExecutor class constructor and pass the appropriate configuration parameters. The following code snippet creates a fixed-size thread pool by defining the same value for the number of core threads and the maximum number of threads:

private ThreadPoolExecutor executor;public OrderProcessorMain() {  // create a thread pool with fixed number of threads  executor = new CustomThreadPoolExecutor(THREAD_POOL_SIZE,      THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS,      new LinkedBlockingQueue());  ..}  

The timeout value of 0L makes sure the worker thread should not time out due to inactivity. The last argument is the blocking queue object for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.

Alternatively, you could create a fixed thread pool using the java.uti.concurrent.Executors utility class.

ExecutorService executorService = Executors.newFixedThreadPool(2);

This method implementation returns the ThreadPoolExecutor object with a fixed-core and max-pool thread size.

Creating and Submitting Tasks

After creating ThreadPoolExecutor, you need to create tasks for processing orders. You create these tasks using the OrderProcessorCallable class created in the previous section. The OrderProcessorCallable constructor takes a task name and an order queue object for retrieving the OrderVO object.

// create Callable taskOrderProcessorCallable callable1 = new OrderProcessorCallable(      "OrderProcessor_1", orderVOQueue);

Remember that the call method of the OrderProcessorCallable class does not return until the running variable is true or the code throws an exception.

The next step is to store a reference to the callable object. This allows you can to call the setRunning method to update the running variable value and make the call method return gracefully. An advantage to this technique is you can call other methods to get object state information, such as the number of orders that have been processed at a certain point in time.

// store reference to callable object in collectioncallableMap.put(callable1.getThreadName(), callable1);

The above code is useful because the Future object returned by the ExecutorService.submit method cannot be used for getting reference to the Callable object or for calling any method on the Callable object.

To execute the OrderProcessorCallable task, you call the submit method and pass a task reference. This method returns an object of type Future, which you can use for the following purposes:

  • Checking task status
  • Getting a result object returned by the Callable.call() method
  • Canceling the task
  • // submit callable tasksFuture future;future = executor.submit(callable1);futurList.add(future);

The Future object is also stored in another collection for checking task status and retrieving the processing result. If you do not want to explicitly store Future objects in a collection, you can use the ExecutorCompletionService utility class. It provides a useful method for retrieving and removing completed tasks from its internal task queue.

Keeping Track of Task Progress

To check the number of orders processed by a task, you use the OrderProcessorCallable object reference stored in the collection. The following code prints the task status at intervals of 1000ms until the orderVOQueue is empty:

private void printProcessorStatus() throws InterruptedException {  // print processor status until all orders are processed  while (!orderVOQueue.isEmpty()) {    for (Map.Entry e : callableMap        .entrySet()) {      Logger.log(e.getKey() + " processed order count: "          + e.getValue().getProcessedCount());    }    Thread.sleep(1000);  }}

Shutting Down ThreadPoolExecutor and Tasks Gracefully

The ExecutorService.shutdown() method can be used for shutting down the executor. When you call shutdown(), the executor initiates an orderly shutdown of all previously submitted tasks, and you may no longer submit new tasks. The following code calls the shutdown() method so that no new tasks can be submitted. After that, it updates the running status of orderCallable to false, which will cause the call method to return.

// shutdown() method will mark the thread pool shutdown to trueexecutor.shutdown();  // mark order processor callable to return for (Map.Entry orderProcessor :   callableMap.entrySet()) {  orderProcessor.getValue().setRunning(false);}

Forcing the ThreadPoolExecutor and Tasks to Shut Down

You can call the ExecutorService.shutdownNow() method to force the executor to shut down. Again, after calling this method, you may no longer submit new tasks. It halts the processing of waiting tasks and attempts to stop actively running tasks by sending an interrupt. ExecutorService.shutdownNow() also returns the list of tasks that were awaiting execution. Note that if actively running tasks failed to respond to Thread.interrupt(), the call may run forever.

List notExecutedTasks = executor.shutdownNow();

You can cancel individual tasks by calling the Future.cancel(boolean mayInterruptIfRunning) method. The true value means an interrupt will be issued for an already executing task. If the task has not yet started, it will not be run. You can check a task’s cancellation status using Future.isCancelled(), which returns true if the task was cancelled before it completed normally.

Exception Handling and Task Result

You can retrieve the task result using either the Future.get() or the Future.get(long timeout, TimeUnit unit) method. The no-argument method blocks other tasks until the task completes by normal execution, by cancellation, or by throwing an exception. Use this method with caution, because it will wait indefinitely for a task to complete. The Future.get() method with a timeout parameter is more useful because it will cause a method to return after a specified period of time:

for (Future f : futurList) {  try {    Integer result = f.get(1000, TimeUnit.MILLISECONDS);    Logger.log(f + " result. Processed orders " + result);  } catch (InterruptedException e) {    Logger.error(e.getMessage(), e);  } catch (ExecutionException e) {    Logger.error(e.getCause().getMessage(), e);  } catch (TimeoutException e) {    Logger.error(e.getMessage(), e);  } catch (CancellationException e) {    Logger.error(e.getMessage(), e);  }  // to avoid printing completed tasks, you may want to remove  // the completed task from futureList here}

The Future.get() method throws different exceptions to give an exact reason for each task failure:

  • InterruptedException is thrown if a thread is interrupted while waiting for computation results.
  • TimeOutException is thrown if a result cannot be retrieved in the specified period of time.
  • CancellationException is thrown if computation is cancelled.
  • ExecutionException is thrown if a task computation fails by throwing any exception, including runtime exceptions. This ensures that the executor threads will not be terminated due to an exception in task execution. The side effect of this behavior is that in application scenarios where you want to propagate a task runtime exception, you need to retrieve the runtime exception from ExecutionException and throw it again.

Putting the Pieces Together

The OrderProcessorMain class (see Listing 3) uses all the concepts discussed in the previous sections. In particular, the main() method performs the following steps:

  1. Creates and configures the CustomThreadPoolExecutor object
  2. Creates and submits the OrderProcessorCallable tasks
  3. Populates the order-blocking queue with OrderVO objects
  4. Prints processor status at regular intervals until the order-blocking queue is empty
  5. Shuts down the thread pool executor
  6. Prints the task result after task completion

To see the sample application in action, just run the OrderProcessorMain.main(String[] args) method. For convenience, the source code download provided with this article can be imported as an Eclipse project.

Share the Post:
Share on facebook
Share on twitter
Share on linkedin

Overview

Recent Articles: