

Minding the Queue: Java 1.5 Adds a New Data Structure Interface: Page 3

The release of Java 1.5 finally provides built-in support for one of the most fundamental data structures in programming—the queue. This article explores the new Queue interface that's been added to the java.util package, demonstrating how to use this new support to streamline your data handling.





Blocking Queues
Queues are typically restricted to a given size. With the queue implementations you have seen so far, using the offer or add methods to enqueue (and the remove or poll methods to dequeue) assumes that if the queue cannot accommodate an enqueuing or dequeuing operation, the program does not wait: the call returns or fails immediately and execution continues. The java.util.concurrent.BlockingQueue interface covers the opposite case, where you do want to wait. It adds the methods put and take to the arsenal: put waits until space is available in the queue, and take waits until an element is available. An example might be useful.
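To make the difference concrete, here is a minimal sketch that runs the non-blocking and blocking calls against the same bounded queue. The class name OfferVersusPut, the method demo, and the capacity of 1 are assumptions chosen for illustration; they do not come from the article's sample code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class OfferVersusPut {

    // Returns what the calls report on a full queue and then on an empty one.
    static String demo() {
        // A capacity of 1 is an arbitrary choice so the queue fills immediately.
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
        try {
            queue.put("first");                       // space available: returns at once
            boolean accepted = queue.offer("second"); // queue full: returns false, no waiting

            String head = queue.take();               // element available: returns at once
            String next = queue.poll();               // queue empty: returns null, no waiting

            return accepted + "," + head + "," + next;
        } catch (InterruptedException e) {
            // put and take can be interrupted while waiting; restore the flag and give up.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "false,first,null"
    }
}
```

Had the second element been enqueued with put instead of offer, the call would have waited until another thread removed the first element. Likewise, a take on the empty queue would have waited for another thread to add one.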

Using the classic producer/consumer relationship, suppose your producer writes to a queue (more specifically, a BlockingQueue). You have a number of consumers reading from that queue, which you want to happen in an orderly fashion. Basically, each consumer needs to wait for the previous consumer before it is allowed to grab an item from the queue. To architect this setup programmatically, spawn one producer thread that writes to a queue and then spawn a number of consumer threads that read from the same queue. Note here that the threads will block one another until the current thread is done grabbing an item from the queue.

The code below shows the class Producer writing to a BlockingQueue. Notice that objects in the run method (which you're obligated to implement, since you extended Thread) are put onto the BlockingQueue after waiting a random amount of time (ranging from 100 to 500 milliseconds). The object put onto the queue is simply a String object consisting of the timestamp when the message was produced.

The actual work of enqueuing the object is done by the statement:

blockingQueue.put("Enqueued at: " + time)

The put method can throw an InterruptedException if the thread is interrupted while it waits. Accordingly, the put operation is surrounded by a try block with a catch block that handles the exception (see Listing 1).
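Listing 1 is not reproduced on this page, so the following is only a sketch of what such a Producer might look like, reconstructed from the description above. The constructor signature matches the way Producer is used in TestBlockingQueue. The field names, the stop-on-interrupt behavior, and the firstMessage helper are assumptions added for illustration.

```java
import java.io.PrintStream;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch only: reconstructed from the article's description, not the original Listing 1.
class Producer extends Thread {
    private final BlockingQueue<String> blockingQueue;
    private final PrintStream out;
    private final Random random = new Random();

    Producer(BlockingQueue<String> blockingQueue, PrintStream out) {
        this.blockingQueue = blockingQueue;
        this.out = out;
    }

    @Override
    public void run() {
        try {
            while (!isInterrupted()) {
                // Wait a random 100 to 500 milliseconds between messages.
                Thread.sleep(100 + random.nextInt(401));
                long time = System.currentTimeMillis();
                // put waits if a bounded queue is full and may throw InterruptedException.
                blockingQueue.put("Enqueued at: " + time);
                out.println("Produced message at: " + time);
            }
        } catch (InterruptedException e) {
            // Assumption: an interrupt is treated as a request to stop producing.
        }
    }

    // Hypothetical helper: start a producer, wait for its first message, then stop it.
    static String firstMessage() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        Producer producer = new Producer(queue, System.out);
        producer.start();
        try {
            // Timed poll so the helper cannot hang; 5 seconds is an arbitrary upper bound.
            return queue.poll(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            producer.interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstMessage());
    }
}
```

Because the queue in this sketch is unbounded, put never actually has to wait here. The try/catch is still required because put declares InterruptedException.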

Leeching messages from the producer is the Consumer class, which again extends Thread and is therefore obligated to implement the run method (see Listing 2).
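Listing 2 is not reproduced on this page either. As a rough sketch only, a Consumer along these lines would fit the way the class is constructed in TestBlockingQueue (a name, the shared queue, and an output stream). The field names, the output format, the stop-on-interrupt behavior, and the consumeOne helper are assumptions for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch only: reconstructed from the article's description, not the original Listing 2.
class Consumer extends Thread {
    private final String consumerName;
    private final BlockingQueue<String> blockingQueue;
    private final PrintStream out;

    Consumer(String consumerName, BlockingQueue<String> blockingQueue, PrintStream out) {
        this.consumerName = consumerName;
        this.blockingQueue = blockingQueue;
        this.out = out;
    }

    @Override
    public void run() {
        try {
            while (!isInterrupted()) {
                // take waits until an element is available on the queue.
                String message = blockingQueue.take();
                out.println(consumerName + " dequeued: " + message);
            }
        } catch (InterruptedException e) {
            // Assumption: an interrupt is treated as a request to stop consuming.
        }
    }

    // Hypothetical helper: one pre-loaded message, one consumer, output captured in memory.
    static String consumeOne() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        queue.offer("Enqueued at: 0");
        ByteArrayOutputStream captured = new ByteArrayOutputStream();
        Consumer consumer = new Consumer("ConsumerA", queue, new PrintStream(captured, true));
        consumer.start();
        try {
            // Wait (up to roughly 5 seconds) for the consumer to remove the message.
            for (int i = 0; i < 500 && !queue.isEmpty(); i++) {
                Thread.sleep(10);
            }
            consumer.interrupt();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return captured.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(consumeOne());
    }
}
```

After printing its one message, the consumer in consumeOne goes back to take and waits on the empty queue until it is interrupted.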

The Consumer class is very similar in design to the Producer class. Instead of putting messages on the BlockingQueue, the Consumer class uses the take method to pull (i.e., dequeue) messages off the queue. As stated before, take waits until something is actually on the queue. If the Producer thread stopped putting (i.e., enqueuing) objects onto the queue, then the Consumer threads would wait until a queue item became available. The class TestBlockingQueue, shown below, spawns four consumer threads that try to grab objects from the BlockingQueue:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TestBlockingQueue {
    public static void main(String args[]) {
        BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>();

        Producer producer = new Producer(blockingQueue, System.out);
        Consumer consumerA = new Consumer("ConsumerA", blockingQueue, System.out);
        Consumer consumerB = new Consumer("ConsumerB", blockingQueue, System.out);
        Consumer consumerC = new Consumer("ConsumerC", blockingQueue, System.out);
        Consumer consumerD = new Consumer("ConsumerD", blockingQueue, System.out);

        producer.start();
        consumerA.start();
        consumerB.start();
        consumerC.start();
        consumerD.start();
    }
}

Figure 1. Consumer Threads: These threads dequeue messages from the BlockingQueue in the order that you spawned them.

The following line creates the BlockingQueue:

BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>();

Notice it uses the LinkedBlockingQueue implementation of BlockingQueue. You can't instantiate BlockingQueue directly, since it is an interface. You could also have used the queue type ArrayBlockingQueue. ArrayBlockingQueue uses an array as its storage device, while LinkedBlockingQueue uses a linked list. The capacity of an ArrayBlockingQueue is fixed and must be given when it is constructed. For a LinkedBlockingQueue, the maximum size can be specified; by default the capacity is Integer.MAX_VALUE, which is unbounded for practical purposes. The sample code takes the unbounded approach.
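The capacity difference between the two implementations can be checked with remainingCapacity, a method declared on BlockingQueue. In this small sketch, the class name QueueCapacities and the capacities 2 and 3 are arbitrary choices for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueCapacities {

    // Reports how many more elements each queue can accept without blocking.
    static int[] remainingCapacities() {
        // ArrayBlockingQueue has no no-argument constructor: a capacity is mandatory.
        BlockingQueue<String> array = new ArrayBlockingQueue<String>(2);
        // LinkedBlockingQueue accepts an optional bound...
        BlockingQueue<String> linkedBounded = new LinkedBlockingQueue<String>(3);
        // ...and defaults to a capacity of Integer.MAX_VALUE without one.
        BlockingQueue<String> linkedDefault = new LinkedBlockingQueue<String>();

        array.offer("a"); // one of the two array slots is now used

        return new int[] {
            array.remainingCapacity(),
            linkedBounded.remainingCapacity(),
            linkedDefault.remainingCapacity()
        };
    }

    public static void main(String[] args) {
        int[] capacities = remainingCapacities();
        // Prints 1, 3, and 2147483647 (Integer.MAX_VALUE), one per line.
        for (int capacity : capacities) {
            System.out.println(capacity);
        }
    }
}
```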

During the execution of the class, the dequeuing of objects from the queue happens in sequential order (see Figure 1). In essence, one consumer thread blocks the others from accessing the BlockingQueue until it has dequeued an object from the queue.
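One consequence of this one-at-a-time access is that each queued object is handed to exactly one consumer. The sketch below checks that with four threads sharing a queue of 20 integers. The class name SharedQueueDemo, the item counts, and the use of anonymous Thread subclasses are assumptions for illustration, not part of the article's sample code. The sketch does not show which consumer receives which item, since in this sketch that depends on thread scheduling.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class SharedQueueDemo {

    // Four threads each take five items from one queue; returns the number of distinct items seen.
    static int distinctItemsTaken() {
        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
        final Set<Integer> seen = Collections.synchronizedSet(new HashSet<Integer>());

        Thread[] consumers = new Thread[4];
        for (int i = 0; i < consumers.length; i++) {
            consumers[i] = new Thread() {
                @Override
                public void run() {
                    try {
                        for (int n = 0; n < 5; n++) {
                            seen.add(queue.take()); // waits while the queue is empty
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            };
            consumers[i].start();
        }

        try {
            // 20 distinct items for 4 x 5 take calls, so every consumer can finish.
            for (int item = 0; item < 20; item++) {
                queue.put(item);
            }
            for (Thread consumer : consumers) {
                consumer.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // 20 take calls yielding 20 distinct values means no item was delivered twice.
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctItemsTaken()); // prints 20
    }
}
```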
