Question:
I am new to Java and am currently trying to
associate idioms and techniques from other
languages/environments with like capabilities in
Java. Specifically, I am trying to determine how
to perform inter-thread and inter-process
communication. How does one pass data, in the
form of an Object or Object reference, between
two threads?
Answer:
Assume that the Object(data) being
communicated is private to the sending class/package
and not within the scope of the receiving class/package,
although the definition of the Object being passed
is known to the receiving class/package.
Can this type of communication be performed
using Piped Streams?
One advantage of threads over processes is
that because threads share an address space, shared
memory communication is fairly easy to set up, provided
you synchronize access to shared data.
Java also provides the classes PipedInputStream and
PipedOutputStream specifically for message passing
style communication between threads. In the example
below, I create a pipeline of four threads:
generator ->
filter -> amplifier -> accumulator
Each arrow (->) represents a
PipedOutputStream to
the thread on the left, and a
PipedInputStream to
the thread on the right.
Generator fills its
output pipe with integers from 1 to 100.
Filter
reads from its input stream, and places only even
inputs into its output stream.
Amplifier reads
numbers from its input stream, and places their
squares in its output stream. Finally,
accumulator
accumulates the sum of all inputs from its input
stream.
*/
import java.io.*;
// Generators fill output pipe with ints from first to last
class Generator extends Thread {
private DataOutputStream dout;
private int first, last;
public Generator(PipedOutputStream out, int start, int end) {
// filter pipes into data streams
dout = new DataOutputStream(out);
first = start;
last = end;
}
public void run() {
for(int i = first; i <= last; i++) {
try {
dout.writeInt(i);
sleep(5); // don't be selfish
}
catch(IOException e) {}
catch(InterruptedException e){}
}
}
} // Generator
// Accumulators sum all ints in input pipe
class Accumulator extends Thread {
private DataInputStream din;
private int accum = 0;
public Accumulator(PipedInputStream in) {
// filter pipes into data streams
din = new DataInputStream(in);
}
public void run() {
boolean done = false;
while(!done) {
try {
accum += din.readInt();
sleep(5);
}
catch(IOException e) { done = true; }
catch(InterruptedException e) {}
}
System.out.println("Result = " + accum); // for now
}
} // Accumulator
// Filters put even ints from input stream into
// output stream (i.e. they filter out odds)
class Filter extends Thread {
private DataInputStream din;
private DataOutputStream dout;
private int next;
public Filter(PipedInputStream in, PipedOutputStream out) {
// filter pipes into data streams
din = new DataInputStream(in);
dout = new DataOutputStream(out);
}
public void run() {
boolean done = false;
while(!done) {
try {
next = din.readInt();
if (next % 2 == 0)
dout.writeInt(next);
sleep(5);
}
catch(IOException e) { done = true; }
catch(InterruptedException e) {}
}
}
} // Filter
// Amplifiers put squares of ints from input stream
// into output stream
class Amplifier extends Thread {
private DataInputStream din;
private DataOutputStream dout;
private int next;
public Amplifier(PipedInputStream in, PipedOutputStream out) {
din = new DataInputStream(in);
dout = new DataOutputStream(out);
}
public void run() {
boolean done = false;
while(!done) {
try {
next = din.readInt();
dout.writeInt(next * next);
sleep(5);
}
catch(IOException e) { done = true; }
catch(InterruptedException e) {}
}
}
} // Amplifier
// computes sum of even squares of ints from 1 to 100
// = 2^2 + 4^2 + ... + 98^2 + 100^2
public class SumOfEvenSquares {
public static void main(String args[]) {
try {
// make some pipes
PipedOutputStream pipe1 = new PipedOutputStream();
PipedInputStream pipe2 = new PipedInputStream(pipe1);
//try { pipe1.connect(pipe2); } catch(IOException e) {}
PipedOutputStream pipe3 = new PipedOutputStream();
PipedInputStream pipe4 = new PipedInputStream(pipe3);
//try { pipe3.connect(pipe4); } catch(IOException e) {}
PipedOutputStream pipe5 = new PipedOutputStream();
PipedInputStream pipe6 = new PipedInputStream(pipe5);
//try { pipe5.connect(pipe6); } catch(IOException e) {}
// make some threads
Generator g = new Generator(pipe1, 1, 10);
Filter f = new Filter(pipe2, pipe3);
Amplifier a = new Amplifier(pipe4, pipe5);
Accumulator ac = new Accumulator(pipe6);
// start threads
g.start();
f.start();
a.start();
ac.start();
// close pipes
pipe1.close();
pipe2.close();
pipe3.close();
pipe4.close();
pipe5.close();
pipe6.close();
}
catch(IOException e) {}
}
} // SumOfEvenSquares