public class Parallel
extends java.lang.Object
As a simple example, consider the following function that squares floats in one array and stores the results in a second array.
static void sqr(float[] a, float[] b) {
int n = a.length;
for (int i=0; i<n; ++i)
b[i] = a[i]*a[i];
}
A serial version of a similar function for 2D arrays is:
static void sqrSerial(float[][] a, float[][] b)
{
int n = a.length;
for (int i=0; i<n; ++i) {
sqr(a[i],b[i]);
}
Using this class, the parallel version for 2D arrays is:
static void sqrParallel(final float[][] a, final float[][] b) {
int n = a.length;
Parallel.loop(n,new Parallel.LoopInt() {
public void compute(int i) {
sqr(a[i],b[i]);
}
});
}
In the parallel version, the method compute
defined by the
interface LoopInt
will be called n times for different
indices i in the range [0,n-1]. The order of indices is both
indeterminant and irrelevant because the computation for each
index i is independent. The arrays a and b are declared final
as required for use in the implementation of LoopInt
.
Note: because the method loop
and interface LoopInt
are static members of this class, we can omit the class name prefix
Parallel
if we first import these names with
import static edu.mines.jtk.util.Parallel.*;
A similar method facilitates tasks that reduce a sequence of indexed
values to one or more values. For example, given the following method:
static float sum(float[] a) {
int n = a.length;
float s = 0.0f;
for (int i=0; i<n; ++i)
s += a[i];
return s;
}
serial and parallel versions for 2D arrays may be written as:
static float sumSerial(float[][] a) {
int n = a.length;
float s = 0.0f;
for (int i=0; i<n; ++i)
s += sum(a[i]);
return s;
}
and
static float sumParallel(final float[][] a) {
int n = a.length;
return Parallel.reduce(n,new Parallel.ReduceInt<Float>() {
public Float compute(int i) {
return sum(a[i]);
}
public Float combine(Float s1, Float s2) {
return s1+s2;
}
});
}
In the parallel version, we implement the interface ReduceInt
with two methods, one to compute
sums of array elements and
another to combine
two such sums together. The same pattern
works for other reduce operations. For example, with similar functions
we could compute minimum and maximum values (in a single reduce) for
any indexed sequence of values.
More general loops are supported, and are equivalent to the following serial code:
for (int i=begin; i<end; i+=step)
// some computation that depends on i
The methods loop and reduce require that begin is less than end and
that step is positive. The requirement that begin is less than end
ensures that reduce is always well-defined. The requirement that step
is positive ensures that the loop terminates.
Static methods loop and reduce submit tasks to a fork-join framework that maintains a pool of threads shared by all users of these methods. These methods recursively split tasks so that disjoint sets of indices are processed in parallel by different threads.
In addition to the three loop parameters begin, end, and step, a fourth parameter chunk may be specified. This chunk parameter is a threshold for splitting tasks so that they can be performed in parallel. If a range of indices to be processed is smaller than the chunk size, or if too many tasks have already been queued for processing, then the indices are processed serially. Otherwise, the range is split into two parts for processing by new tasks. If specified, the chunk size is a lower bound; the number of indices processed serially will never be lower, but may be higher, than a specified chunk size. The default chunk size is one.
The default chunk size is often sufficient, because the test for an excess number of queued tasks prevents tasks from being split needlessly. This test is especially useful when parallel loops are nested, as when looping over elements of multi-dimensional arrays.
For example, an implementation of the method sqrParallel
for
3D arrays could simply call the 2D version listed above. Tasks will
naturally tend to be split for outer loops, but not inner loops,
thereby reducing overhead, time spent splitting and queueing tasks.
Reference: A Java Fork/Join Framework, by Doug Lea, describes the framework used to implement this class. This framework will be part of JDK 7.
Modifier and Type | Class and Description |
---|---|
static interface |
Parallel.LoopInt
A loop body that computes something for an int index.
|
static interface |
Parallel.ReduceInt<V>
A loop body that computes and returns a value for an int index.
|
static class |
Parallel.Unsafe<T>
A wrapper for objects that are not thread-safe.
|
Constructor and Description |
---|
Parallel() |
Modifier and Type | Method and Description |
---|---|
static void |
loop(int begin,
int end,
int step,
int chunk,
Parallel.LoopInt body)
Performs a loop
for (int i=begin; i<end; i+=step) . |
static void |
loop(int begin,
int end,
int step,
Parallel.LoopInt body)
Performs a loop
for (int i=begin; i<end; i+=step) . |
static void |
loop(int begin,
int end,
Parallel.LoopInt body)
Performs a loop
for (int i=begin; i<end; ++i) . |
static void |
loop(int end,
Parallel.LoopInt body)
Performs a loop
for (int i=0; i<end; ++i) . |
static <V> V |
reduce(int begin,
int end,
int step,
int chunk,
Parallel.ReduceInt<V> body)
Performs a reduce
for (int i=begin; i<end; i+=step) . |
static <V> V |
reduce(int begin,
int end,
int step,
Parallel.ReduceInt<V> body)
Performs a reduce
for (int i=begin; i<end; i+=step) . |
static <V> V |
reduce(int begin,
int end,
Parallel.ReduceInt<V> body)
Performs a reduce
for (int i=begin; i<end; ++i) . |
static <V> V |
reduce(int end,
Parallel.ReduceInt<V> body)
Performs a reduce
for (int i=0; i<end; ++i) . |
static void |
setParallel(boolean parallel)
Enables or disables parallel processing by all methods of this class.
|
public static void loop(int end, Parallel.LoopInt body)
for (int i=0; i<end; ++i)
.end
- the end index (not included) for the loop.body
- the loop body.public static void loop(int begin, int end, Parallel.LoopInt body)
for (int i=begin; i<end; ++i)
.begin
- the begin index for the loop; must be less than end.end
- the end index (not included) for the loop.body
- the loop body.public static void loop(int begin, int end, int step, Parallel.LoopInt body)
for (int i=begin; i<end; i+=step)
.begin
- the begin index for the loop; must be less than end.end
- the end index (not included) for the loop.step
- the index increment; must be positive.body
- the loop body.public static void loop(int begin, int end, int step, int chunk, Parallel.LoopInt body)
for (int i=begin; i<end; i+=step)
.begin
- the begin index for the loop; must be less than end.end
- the end index (not included) for the loop.step
- the index increment; must be positive.chunk
- the chunk size; must be positive.body
- the loop body.public static <V> V reduce(int end, Parallel.ReduceInt<V> body)
for (int i=0; i<end; ++i)
.end
- the end index (not included) for the loop.body
- the loop body.public static <V> V reduce(int begin, int end, Parallel.ReduceInt<V> body)
for (int i=begin; i<end; ++i)
.begin
- the begin index for the loop; must be less than end.end
- the end index (not included) for the loop.body
- the loop body.public static <V> V reduce(int begin, int end, int step, Parallel.ReduceInt<V> body)
for (int i=begin; i<end; i+=step)
.begin
- the begin index for the loop; must be less than end.end
- the end index (not included) for the loop.step
- the index increment; must be positive.body
- the loop body.public static <V> V reduce(int begin, int end, int step, int chunk, Parallel.ReduceInt<V> body)
for (int i=begin; i<end; i+=step)
.begin
- the begin index for the loop; must be less than end.end
- the end index (not included) for the loop.step
- the index increment; must be positive.chunk
- the chunk size; must be positive.body
- the loop body.public static void setParallel(boolean parallel)
Setting this flag to false disables parallel processing for all users of this class. This method should therefore be used for testing and benchmarking only.
parallel
- true, for parallel processing; false, otherwise.