This section contains some common variations of the parallel aggregation pattern.
1. Using Parallel Loops for Aggregation
The Parallel.ForEach and Parallel.For methods have overloaded versions that can implement the parallel aggregation pattern. Here’s an example.
double[] sequence = ...
object lockObject = new object();
double sum = 0.0d;

Parallel.ForEach(
    // The values to be aggregated
    sequence,

    // The local initial partial result
    () => 0.0d,

    // The loop body
    (x, loopState, partialResult) =>
    {
        return Normalize(x) + partialResult;
    },

    // The final step of each local context
    (localPartialSum) =>
    {
        // Enforce serial access to single, shared result
        lock (lockObject)
        {
            sum += localPartialSum;
        }
    });
return sum;
Parallel.ForEach partitions the input based on the desired degree of parallelism and creates parallel tasks to perform the work. Each parallel task has local state that isn't shared with other tasks. The loop body updates only the task-local state. In other words, the loop body accumulates its value first into a subtotal and not directly into the final total. When each task finishes, it adds its subtotal into the final sum.
Here is the signature of the overloaded version of Parallel.ForEach that's used in this example.
Parallel.ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally);
There are four arguments. The first argument is the data values that will be aggregated. The second argument is a delegate that returns the initial value of each task-local accumulator. The third argument is a delegate that combines one of the input data values with a partial sum from a previous iteration.
The Task Parallel Library (TPL) implementation of Parallel.ForEach creates tasks in order to execute the parallel loop. When the loop creates multiple tasks, there will be one partial result value for each task. The number of worker tasks is determined by the parallel loop implementation, based on heuristics. Generally, there will be at least one worker task per available core, but there could be more. As the loop progresses, TPL will sometimes retire tasks and create new ones.
When the parallel loop is ready to finish, it must merge the partial results from all of its worker tasks together to produce the final result. The fourth argument to Parallel.ForEach is a delegate that performs the merge step. The delegate is invoked once for each of the worker tasks. The argument to the delegate is the partial result that was calculated by the task. This delegate locks the shared result variable and combines the partial sum with the result.
This example uses the C# lock keyword to enforce interleaved, sequential access to variables that are shared by concurrent threads. There are other synchronization techniques that could be used in this situation, but they are outside the scope of this book. Be aware that locking is cooperative; that is, unless all threads that access the shared variable use locking consistently and correctly, serial access to the variable is not guaranteed.
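To see why this matters, consider the following hypothetical sketch (it is not part of the running example): one task updates a shared total under the lock while a second task bypasses the lock, so the lock cannot guarantee serial access.
// Hypothetical sketch: locking is cooperative, so one non-cooperating
// thread is enough to break serialization of the shared variable.
object lockObject = new object();
double total = 0.0d;

Parallel.Invoke(
    () =>
    {
        for (int i = 0; i < 1000000; i++)
        {
            lock (lockObject) { total += 1.0d; } // cooperating update
        }
    },
    () =>
    {
        for (int i = 0; i < 1000000; i++)
        {
            total += 1.0d; // BUG: bypasses the lock; races with the other task
        }
    });
// total is now unpredictable and is very likely less than 2000000.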
The syntax for locking in C# is lock ( object ) { body }. The object uniquely identifies the lock. All cooperating threads must use the same synchronizing object, which must be a reference type such as Object and not a value type such as int or double. When you use lock with Parallel.For or Parallel.ForEach, you should create a dummy object and set it as the value of a captured local variable dedicated to this purpose. (A captured variable is a local variable from the enclosing scope that is referenced in the body of a lambda expression.) The lock's body is the region of code that will be protected by the lock. The body should take only a small amount of execution time. Which shared variables are protected by a given lock object varies by application; it's a convention that all programmers whose code accesses those variables must understand and observe consistently. In this example, the lock object ensures serial access to the sum variable.
Note:
You should document the variables that are protected by a lock with a comment in the code. It’s easy to make a mistake.
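For example, the declarations in the code above could carry a comment like this (the wording is just one possibility):
// lockObject protects: sum (the shared final result).
object lockObject = new object();
double sum = 0.0d;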
When a thread encounters a lock statement, it attempts to acquire the lock associated with the synchronizing object. Only one thread at a time can hold this lock. If another thread has already acquired the lock but has not yet released it, the current thread will block until the lock becomes available. When multiple threads compete for a lock, the order in which they acquire the lock is unpredictable; it's not necessarily in FIFO order. After the thread successfully acquires the lock, it executes the statements inside of the body. When the thread exits the lock body (whether normally or by throwing an exception), it releases the lock.
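The release-on-exception behavior follows from how lock is built on System.Threading.Monitor. The following sketch shows roughly how the merge step above expands (the AddPartialSum wrapper is hypothetical, and this is equivalent code, not literal compiler output):
// Roughly how "lock (lockObject) { sum += localPartialSum; }" expands.
void AddPartialSum(object lockObject, ref double sum, double localPartialSum)
{
    bool lockTaken = false;
    try
    {
        System.Threading.Monitor.Enter(lockObject, ref lockTaken);
        sum += localPartialSum;
    }
    finally
    {
        // Runs whether the body completed normally or threw an exception,
        // so the lock is always released.
        if (lockTaken)
        {
            System.Threading.Monitor.Exit(lockObject);
        }
    }
}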
The localFinally delegate (argument four) is executed once at the end of each task. Therefore, the number of locks taken will be equal to the number of tasks that are used to execute the parallel loop. You can't predict how many tasks will be used. For example, you shouldn't assume that the number of tasks will be less than or equal to the number that you provided with the ParallelOptions.MaxDegreeOfParallelism property (or its default value). The reason for this is that the parallel loop sometimes shuts down tasks during the execution of the loop and continues with new tasks. This cooperative yielding of control can in some cases reduce system latency by allowing other tasks to run. It also helps prevent an unintended increase in the number of thread pool worker threads. You shouldn't consider the state of the accumulator to be strictly "thread local." It's actually "task local." (The distinction between thread local and task local won't normally affect a program's logic because a task is guaranteed to execute from start to finish on only one thread.)
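You can observe this behavior directly by counting how many times the localFinally delegate runs. The sketch below is hypothetical instrumentation, not part of the original example; it reuses sequence and Normalize from above.
// Count how many worker tasks the loop actually used.
int localFinallyCount = 0;
object lockObject = new object();
double sum = 0.0d;

Parallel.ForEach(
    sequence,
    () => 0.0d,
    (x, loopState, partialResult) => Normalize(x) + partialResult,
    (localPartialSum) =>
    {
        // Each invocation corresponds to one task's final merge step.
        System.Threading.Interlocked.Increment(ref localFinallyCount);
        lock (lockObject)
        {
            sum += localPartialSum;
        }
    });
// localFinallyCount now holds the number of tasks; it can vary from
// run to run and may exceed the number of available cores.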
For comparison, here’s the PLINQ version of the example used in this variation.
double[] sequence = ...
return sequence.AsParallel().Select(Normalize).Sum();
2. Using a Range Partitioner for Aggregation
When you have a loop body with a very small amount of work to do, and there are many iterations to perform, it's possible that the overhead of Parallel.ForEach is large compared to the cost of executing the loop body.
In this case, it's sometimes more efficient to use a Partitioner object for the loop. A Partitioner object allows you to embed a sequential for loop inside of your Parallel.ForEach loop and reduce the number of iterations of the Parallel.ForEach loop. Generally, you should profile the application in order to decide whether to use a Partitioner object.
Here’s an example.
double[] sequence = ...
object lockObject = new object();
double sum = 0.0d;
var rangePartitioner = Partitioner.Create(0, sequence.Length);

Parallel.ForEach(
    // The input intervals
    rangePartitioner,

    // The local initial partial result
    () => 0.0,

    // The loop body for each interval
    (range, loopState, initialValue) =>
    {
        double partialSum = initialValue;
        for (int i = range.Item1; i < range.Item2; i++)
        {
            partialSum += Normalize(sequence[i]);
        }
        return partialSum;
    },

    // The final step of each local context
    (localPartialSum) =>
    {
        // Use lock to enforce serial access to shared result
        lock (lockObject)
        {
            sum += localPartialSum;
        }
    });
return sum;
The main difference is that the Parallel.ForEach loop uses a sequence of index intervals, which are generated by the Partitioner object, as its input instead of individual values. This can avoid some of the overhead involved in invoking delegate methods. You'll notice a benefit only when the amount of work in each step is small and there are many steps.
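Note that Partitioner.Create(0, sequence.Length) lets TPL choose the range size; an overload also accepts an explicit range size, which can be a useful knob when you profile. The value 4096 below is only an illustrative starting point, not a recommendation.
// Partition the index space [0, sequence.Length) into fixed-size ranges.
var rangePartitioner = Partitioner.Create(0, sequence.Length, 4096);
Larger ranges mean fewer delegate invocations but coarser load balancing; smaller ranges do the opposite.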
Here’s the signature of the overloaded version of the Parallel.ForEach method that was used.
Parallel.ForEach<TSource, TLocal>(
    Partitioner<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally);
3. Using PLINQ Aggregation with Range Selection
The PLINQ Aggregate extension method includes an overloaded version that allows a very general application of the parallel aggregation pattern. Here's an example from an application that does financial simulation. The method performs repeated simulation trials and aggregates results into a histogram. There are two dependencies that must be handled by this code. They are the accumulation of partial results into the result histogram and the use of instances of the Random class. (An instance of Random cannot be shared across multiple threads.)
int[] histogram = MakeEmptyHistogram();

return ParallelEnumerable.Range(0, count).Aggregate(
    // 1- Create an empty local accumulator object
    //    that includes all task-local state.
    () => new Tuple<int[], Random>(
        MakeEmptyHistogram(),
        new Random(SampleUtilities.MakeRandomSeed())),

    // 2- Run the simulation, adding result to local accumulator.
    (localAccumulator, i) =>
    {
        // With each iteration, get the next random value.
        var sample = localAccumulator.Item2.NextDouble();
        if (sample > 0.0 && sample < 1.0)
        {
            // Perform a simulation trial for the sample value.
            var simulationResult =
                DoSimulation(sample, mean, stdDev);

            // Add result to the histogram of the local accumulator.
            int histogramBucket =
                (int)Math.Floor(simulationResult / BucketSize);
            if (0 <= histogramBucket && histogramBucket < TableSize)
                localAccumulator.Item1[histogramBucket] += 1;
        }
        return localAccumulator;
    },

    // 3- Combine local results pair-wise.
    (localAccumulator1, localAccumulator2) =>
    {
        return new Tuple<int[], Random>(
            CombineHistograms(localAccumulator1.Item1,
                              localAccumulator2.Item1),
            null);
    },

    // 4- Extract answer from final combination.
    finalAccumulator => finalAccumulator.Item1
); // Aggregate
The data source in this example is a parallel query that’s created with the Range static method of the ParallelEnumerable class. Here’s the signature of the overloaded version of the Aggregate extension method that was used in this example.
Aggregate<TSource, TAccumulate, TResult>(
    this ParallelQuery<TSource> source,
    Func<TAccumulate> seedFactory,
    Func<TAccumulate, TSource, TAccumulate> updateAccumulatorFunc,
    Func<TAccumulate, TAccumulate, TAccumulate> combineAccumulatorsFunc,
    Func<TAccumulate, TResult> resultSelector);
There are four arguments. Each of the arguments is a delegate method.
The first argument is a delegate that establishes the local state of each worker task that the query creates. This is called once, at the beginning of the task. In this example, the delegate returns a tuple (unnamed record) instance that contains two fields. The first field is an empty histogram. This will accumulate the local partial results of the simulation in this task. The second field is an instance of the Random class. It's part of the local state to ensure that the simulation does not violate the requirements of the Random class by sharing instances across threads. Note that you can store virtually any kind of local state in the object you create.
The second argument is the loop body. This delegate is invoked once per data element in the partition. In this example, the loop body creates a new random sample and performs the simulation experiment. Then it classifies the result of the simulation into buckets used by the histogram and increments the appropriate bucket of the task's local histogram.
The third argument is invoked for pair-wise combinations of local partial results. The delegate merges the two input histograms (it adds their corresponding bucket values and returns a new histogram with the sums) and wraps the result in a new tuple. The null argument reflects the fact that the random number generator is no longer needed. The combination delegate is invoked as many times as necessary to consolidate all the local partial results.
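The CombineHistograms method isn't shown in the excerpt; a minimal sketch, assuming both histograms have the same number of buckets, could look like this:
// Merge two histograms by summing corresponding buckets.
static int[] CombineHistograms(int[] left, int[] right)
{
    var combined = new int[left.Length];
    for (int i = 0; i < left.Length; i++)
    {
        combined[i] = left[i] + right[i];
    }
    return combined;
}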
The fourth argument selects the result from the final, merged local state object.
This variation can be adapted to many situations that use the parallel aggregation pattern. Note that this implementation doesn't require locks; the sketch below shows the same idea applied to the earlier sum example.
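For instance, here's a sketch of the normalized-sum example from the first variation recast with the same four-delegate Aggregate overload; the pair-wise merge delegate replaces the lock-protected merge step.
// Sum of Normalize(x) over the sequence, with no shared mutable state.
return sequence.AsParallel().Aggregate(
    // 1- Create the task-local accumulator.
    () => 0.0d,
    // 2- Fold each element into the local accumulator.
    (partialSum, x) => partialSum + Normalize(x),
    // 3- Combine local results pair-wise.
    (sum1, sum2) => sum1 + sum2,
    // 4- Extract the answer from the final combination.
    finalSum => finalSum);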