Implement Parallel Processing in Your Java Applications

omputing today is moving toward multi-core systems. A multi-core CPU allows the containing computer to exhibit the same thread-level parallelism as a system with multiple CPUs (see Figure 1), which represents a huge benefit in overall system throughput for properly designed applications.

Click to enlarge

Figure 1. A Multi-Core CPU Architecture

With manufacturers such as HP, Sun, IBM, Dell, and Apple offering systems with more than one multi-core CPU?and Azul Systems even releasing a processor with 48 cores?parallel computing is now available for mid-sized to large-scale servers, as well as for everyday desktop computers. So how do you as a developer adapt to this trend and best utilize it in your applications? First, you need to learn and employ new programming models and algorithms, and then find the proper tools and frameworks to effectively build, debug, unit test, and deploy these applications.

However, before getting into the available algorithms, tools, and frameworks, let’s properly define parallel computing.

What is Parallel Computing?
Parallel computing is often confused with concurrency, where multiple individual tasks are executed simultaneously (e.g., the execution of more than one application (or multiple tasks) at the same time on a multi-processor system). Parallel computing, however, is more involved than simply creating multiple threads in your application. It is the execution of one task on multiple processors (or multiple-processor cores) at the same time, where both the processing and the results are highly coordinated.

Editor’s Note: The author, Eric Bruno, was commissioned to write this article by Pervasive Software, makers of DataRush. We have selected this article for publication because we believe it to have objective technical merit.

So while concurrency helps only with systems that perform many tasks simultaneously, such as web and application servers, parallel computing is ideal for applications that require you to break up the execution of a single task into pieces that can be executed in parallel and then combined, resulting in faster overall task processing.

The benefits of parallel computing include higher overall system throughput (an obvious one, since multiple tasks can execute without impacting one another), more efficient utilization of multi-core architectures, the ability to load balance tasks, and the use of other, more abstract, concepts such as pipeline parallelism and partitioning. Some applications that benefit most from parallel computing on multi-core SMP systems are:

  • RFID-based applications that need to read gigabytes of data quickly
  • Scientific and engineering applications that perform complex calculations on terabytes of data
  • Data warehouse applications
  • Distributed data-aggregation applications
  • XML-based applications that involve large transformations
  • Distributed real-time data-aggregation applications

Other benefits are not so obvious, and require OS and tool support. For instance, balancing task processing across processor cores based on individual core load can be beneficial, but it’s tricky. As is pipeline parallelism, a related concept that provides even more benefits to overall performance.

Pipeline Parallelism and Partitioning
A pipeline in computing terms is a series of related steps, where each step depends upon the others. In many cases, however, some of the steps in the pipeline can execute in parallel. The implementation of such an algorithm is called pipeline parallelism. With this implementation, even when one step in the pipeline depends on data from another, both can execute in parallel, at least partially, if the data is streamed between them while the first step is still generating it.

For instance, consider a process that reads data from a database, formats it, and then displays it. In a classic implementation, an application might sequentially submit a query to a database, format the results, and then make appropriate library calls to display the data. Unfortunately, the data can’t be formatted until all it is returned from the database, and it can’t be displayed until formatting is complete (see Figure 2). (The data dependency between subtasks is often called a dataflow, and a dataflow graph represents the interaction of all subtasks within a task.) As a result, the processor remains idle while system data I/O takes place, even on a single-processor system. This inefficiency is magnified on a multi-core system where multiple processor cores remain idle.

Click to enlarge

Figure 2. Inefficient Sequential Task Processing

However, as mentioned previously, pipeline parallelism enables the processing of later steps in a pipeline to occur before previous steps have completely finished. Applying this concept to the previous example, suppose pipeline Step A performs a query that returns many rows from a database, Step B requires this data to do some formatting, and Step C displays the results. Using parallelism, you can stream data to the formatting code as it is read from the database so that formatting can begin before all the data is received (see Figure 3). Further, the code that displays the formatted data can begin its work even before all the data is processed by streaming the formatted data to the display code as it is formatted.

This example ensures that all processors in a multi-core machine are used as much as possible. While one step in the pipeline is still working (or waiting for IO), other steps are actively processing the data received so far. As you increase the parallelism in the pipeline and the number of processor cores in the system, overall system performance and throughput increases.

Click to enlarge

Figure 3. Pipeline Parallelism Breaks a Single Task into Multiple Work Units

Combining pipeline parallelism with data partitioning allows data in multiple tables (and in columns within those rows) to be read or written to simultaneously. This combination further increases system efficiency and performance, and its benefits aren’t limited to database processing. Data from file processing, web service calls, and calls to legacy systems can be parallel processed using the same patterns.

The performance gains from parallel computing aren’t without their challenges, however. Designing and developing software to take advantage of pipeline parallelism and horizontal and vertical partitioning require new algorithms and tools to overcome those challenges. The remainder of the article examines some of the challenges and introduces a tool that addresses them, DataRush from Pervasive Software.

Parallel Processing Challenges
Parallel programming is a discipline that deals with the proper development of parallel-processing applications. Some existing common algorithms and design patterns take into account parallel overhead, task management, resource synchronization, system speedup, scalability, and overall efficiency. The challenges associated with them include:

  • Choosing among the algorithms to implement (core load balancing, pipeline parallelism, vertical and horizontal partitioning, and so on)
  • Balancing the factors involved in system efficiency
  • Understanding the overhead of parallel computing
  • Mastering multi-threaded programming, an advanced and error-prone development chore (For instance, you need to manually synchronize thread access to shared resources, while guarding against thread deadlocks.)
  • Dynamically scaling the amount of parallelism employed on different target machines (based upon the number of cores available, and other resources)
  • Dynamically managing streaming dataflow between tasks based on thread and processor workloads (This usually entails managing inter-task work queues.)
  • Applying Amdahl’s Law to estimate the amount of speedup from parallel processing, using design-time and runtime factors (such as algorithms used and the number of cores)

Additionally, you need to wrestle with these challenges for each parallel-programming problem you face. Pervasive DataRush is a framework that can help.

Introducing Pervasive DataRush
Pervasive DataRush is a Java framework with tools support that allows you to build parallel applications quickly. How? Think of a dataflow, in which data-processing operators (small units of business logic) are linked together through ports to form an application. Operators read data from input ports, perform their processing on that data, and write the results to output ports. DataRush links the operator ports with queues to allow the operators to work in parallel. Pipeline parallelism, as implemented in DataRush, is the ability to stitch independent operators together in an overall dataflow graph where each component is run in its own thread (or set of threads). Being pipelined, the overall graph keeps multiple cores busy as data flows through the queues between each operator. This pipelined nature also allows complex applications to be built from simple parts in a flexible way. Through this implementation, DataRush handles all the challenges listed in the previous section.

The dataflow-based architecture on which DataRush is built combines the three algorithms of horizontal partitioning, vertical partitioning, and pipeline parallelism. So DataRush supports both implicit parallelism (using the best algorithms for the problem being solved) and explicit parallelism (controling which aspects of a task can be made parallel).

Click to enlarge

Figure 4. DataRush Component Diagram

DataRush applications contain three main component types (see the component diagram in Figure 4):

  1. Process: this basic unit of dataflow programming in DataRush is a single-threaded scalar operator that implements some business logic. A process is analogous to a subtask as described in the previous pipeline example.
  2. Assembly: itself an operator, an assembly is a multi-threaded composite of other operators and customizers.
  3. Customizer: this configuration property or subcomponent dynamically controls or extends an assembly.

The relationship between these components in a DataRush application forms a tree where a node is an assembly and a leaf (end-node) is either a process or a customizer (see Figure 5).

Click to enlarge

Figure 5. DataRush Component Tree
Click to enlarge

Figure 6. Ports for DataRush Operators

Operators define ports for both data input and output, and communicate with one another through links between ports. For instance, the output port of Operator A can be linked to the input port of Operator B (see Figure 6). There are two types of ports. The first, a scalar port, is equivalent to a column of data where all the data is of one type. The second, a composite port, is an aggregate of scalar ports (subports), analogous to a row containing columns of data.

Click to enlarge

Figure 7. A Directed Acyclic Graph

For efficiency?and to maintain data ordering?DataRush creates one queue per output port, with only one writer and zero to many readers. An operator port can also be linked to an external data source or target, such as a relational database or a file system. The linking of two operators through their ports defines a simple dataflow. Dataflows get more complex as more operators are linked to one another. The resulting dataflow forms a directed acyclic graph (see Figure 7).

The proper use of ports helps DataRush to achieve further parallelism because it allows columns of data to be delivered independently, even if the data is retrieved from different sources. Contrast this with a database that returns entire rows of data, or an object that must populate all its member variables before you can use it. In these examples, your code must wait until all the data is retrieved, either as a row or an object, before it can process any of the data. DataRush, however, allows ports to deliver their data in parallel to other ports so that your code can process data as it becomes available, without waiting or blocking.

Component Development
Operators are components (written in Java or XML) that implement business logic and process data. Assemblies, which define and link operators, are defined with Dataflow XML (DFXML), also called an assembly specification. The combination of assembly definitions, properties, constraints, and operators linked through ports forms the basis of a DataRush parallel-processing application. The DFXML includes sections with XML tags such as , , , , and others (see Listing 1).

To create a complete DataRush application, you compile the appropriate operator Java files and assemble the DFXML files with the DataRush Assembler (dfa). The application is launched with the DataRush Engine (dfe), which takes the dataflow binaries and Java class files and begins the data processing execution. Figure 8 summarizes the entire development, build, and execution process.

Click to enlarge

Figure 8. The Development Cycle for a DataRush Application

An assembly specification (a df.xml file) becomes an assembly when it’s compiled with the DataRush Assembler tool, dfa. The artifact is a binary .df file (analogous to a Java .class file), which defines an assembly with operators, which can be executed. You can set properties that were defined in the assembly specification either with default values at compile time or through properties file (a text file with name-value pairs)at runtime. You specify a properties file to use with the optional dfe command line argument, -pf .

Just as properties allow you to adjust the runtime characteristics of an operator, DataRush customizers allow you to modify and refine an assembly, even after the dfa tool has compiled its specification. For instance, you can alter an application’s behavior based on its environment, such as the number of fields or columns in the input data (vertical partitioning), dynamic Java typing based on port data type, and stricter typing based on output port type at runtime. The last item is similar to Java Generics, where an operator port type is defined to be generic, but at runtime a more specific type is declared and enforced.

A customizer class implements the com.pervasive.dataflow.dev.DataflowCustomizer interface, which contains a single method named customize. Since customizers are specific to the assembly they customize, they tend not to be reusable. The sample customizer class in Listing 2, SelectRowsCompositionCustomizer, is named after the assembly it customizes.

In this case, the input port specification is changed from “generic” to “scalar” or “composite” based upon the type of the output port to which it is linked. The DataFlow Assembler calls the customize method to modify the assembly specification before it’s compiled.

If you want to migrate to DataRush from existing code, rewriting the entire application to conform to the DataRush specifications may not be feasible. For this reason, DataRush offers two ways to integrate it into existing applications:

  1. Define and build a separate DataRush application, and then invoke the DataRush Engine command line tool from an external application.
  2. Embed the actual DataRush APIs into an external application, and then write Java code to call portions of it to invoke assemblies directly.

Although the first method is less intrusive, and it removes DataRush dependencies from your Java code, it’s also not as robust. For instance, the second method allows you to write all your code in Java, it gives you better control over dataflows and logic control flow, and it provides access to the reporting, logging, and error handling of the DataRush assembly execution.

You also can integrate DataRush applications with other enterprise systems such as messaging servers or enterprise service bus (ESB) applications. For instance, service components running in an ESB can offload the processing of large volumes of data to a DataRush application on a separate, multi-core server to achieve the parallel processing required. The DataRush application can then signal the ESB service (via a message) when complete, and provide it with the results. As a result, ESB message traffic is reduced (the data to be processed is not propagated across it), DataRush performs the parallel computation of the data for maximum efficiency, and the asynchronous nature of the ESB allows other work to continue in the system, thereby creating further parallelism.

The DataRush Libraries and Tool Support
DataRush (the libraries, tools, APIs, and the engine itself) is available as a set of JAR files. In fact, this is how DataRush supports application embedding. It also allows you to easily package and deploy DataRush applications to servers in your production environment. Supported environments include Windows, Solaris (x86 and SPARC), Linux, HP-UX, and IBM AIX. (Author Note: Although it’s not officially supported, DataRush installed and ran perfectly well on my Intel-based Mac as well.)

DataRush includes a library of precompiled operators that you can use when creating your own operators and assemblies. Operators exist to perform data reads and writes on flat files, XML, and relational databases, along with generic logic processing. These operators serve as building blocks for you to reuse, and they reduce the need to implement these common tasks.

Click to enlarge

Figure 9. DataRush Integrates with Eclipse

In terms of application development, DataRush comes with an Eclipse plug-in that works with the Eclipse IDE as well as the Eclipse Graphical Editing Framework (GFE). The end result is support for DataRush-specific projects with visual modeling and editing of parallel processing tasks (see Figure 9).

You can build, run, and test outside of Eclipse as well, as DataRush integrates with command-line tools such as Ant and unit-test frameworks such as JUnit. DataRush-specific Ant tasks to build and test custom DataRush applications are included in a JAR file. This allows you to automate all aspects of your application builds, including the execution of DataRush test suites.

A Sample DataRush Application
DataRush comes with multiple sample applications to get you started. This section discusses the “New Fields” sample application, as it contains both XML and Java operators. The application loads data from an input file (specified as a property) and reads three fields that make up a simulated sale: a date, a dollar amount, and a product ID. This portion of the processing is completely specified in the DFXML assemblies. The Java portion is an operator that computes the day of the week based upon a date as input. The output of this operator is combined with the sales data, along with a new record identifier generated at processing time, and then written to an output file.

Listing 3 contains the complete assembly definition DFXML file for this application. At the top of the file, properties are defined that control overall processing. Some of the important properties are:

  • inputFileName: the name and path to read the incomplete sales data
  • outputFileName: the name and path to write the completed sales data (with fields added)
  • startRowID: a starting identifier for new rows written to the output file
  • fieldSeparator: the delimiter character or string used as a separator in the input file

The next section in the assembly specification describes the individual assembly operators and one process for the Java object. Some of these are:

  • read (operator): uses the ReadDelimitedText operator in the DataRush operator library to read the input file
  • genRowID (operator): uses the GeneratedArithmeticSequence operator to generate unique output row identifiers
  • dayOfWeek (process and operator): defines a custom operator, implemented as a Java class, that specifies a DATE as its input and an integer as its output, which is linked to the read operator’s input

The remaining sections of the specification link operator output and input ports, thereby defining a complete application dataflow.

Build the Sample Application
Building the application is a two-step process, but you can combine both steps via an Ant script or an Eclipse project. First, from within the DataRush samples directory, compile the Java code with the following command:

> javac -d build/classes -classpath ../dfre/lib/dfreapi.jar src/example/newfields/DayOfWeekProcess.java

Next, run the DataRush Assembler on the assembly specification:

> dfa -d build/classes -sp src src/example/newfields/NewFieldsTextFile.df.xml

Although the specification is split across two .df.xml files, both are assembled into binaries because one references the other.

Run the Sample Application
After a successful application build, you can run the application with the DataRush Engine. You must include a properties file that specifies the path to the input and output files (included with the sample). This is done with the following command:

> dfe -cp build/classes -pf newfields.properties example.newfields.NewFieldsTextFile

When executed, the sample application will write its data to the output file, NewFieldsSampleOutput.txt (see Listing 4).

The Results
To experience firsthand the benefits of using DataRush and the pipeline parallelism it employs, I ran this sample on both a 1.83 GHz Core Duo processor (dual-core) system and a single core (but faster) 3GHz Pentium 4 system. The application completed in 2.011 seconds on the dual-core machine, while it took 5.1 seconds on the Pentium 4. I was impressed by how even a simple application gained much higher throughput on a multi-core machine.

Parallel Computing Comes to the Development Process
Just as multi-core computing has brought affordable parallel-processing computers to the fore, Pervasive DataRush has brought parallel computing to the Java community in a comprehensive, easy-to-use package. DataRush solves the complex problems associated with developing applications that utilize multi-CPU, symmetrical multi-processing systems, and it comes in a reusable form that you can leverage in all your applications without rewriting all your code.

Share the Post:
Share on facebook
Share on twitter
Share on linkedin

Overview

Recent Articles: