Parallel Programming with Microsoft .NET : Parallel Aggregation - Variations

2/17/2011 9:24:19 AM
This section contains some common variations of the parallel aggregation pattern.

1. Using Parallel Loops for Aggregation

The Parallel.ForEach and Parallel.For methods have overloaded versions that can implement the parallel aggregation pattern. Here’s an example.

using System.Threading.Tasks;

double[] sequence = ...
object lockObject = new object();
double sum = 0.0d;

Parallel.ForEach(
    // The values to be aggregated
    sequence,
    // The local initial partial result
    () => 0.0d,
    // The loop body
    (x, loopState, partialResult) =>
    {
        return Normalize(x) + partialResult;
    },
    // The final step of each local context
    (localPartialSum) =>
    {
        // Enforce serial access to single, shared result
        lock (lockObject)
        {
            sum += localPartialSum;
        }
    });
return sum;

Parallel.ForEach partitions the input based on the desired degree of parallelism and creates parallel tasks to perform the work. Each parallel task has local state that isn’t shared with other tasks. The loop body updates only the task-local state. In other words, the loop body accumulates its value first into a subtotal and not directly into the final total. When each task finishes, it adds its subtotal into the final sum.
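The fragment above depends on a sequence array and a Normalize method from the book's sample code. The following sketch makes it runnable under a few assumptions: Normalize is a hypothetical stand-in that halves its input, the input is the integers 1 through 100,000, and the program uses C# top-level statements (C# 9 or later). The aggregation itself keeps the example's structure: a zero subtotal per task, a lock-free loop body, and a locked merge in the final step.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for the book's Normalize method, which this section does not show.
static double Normalize(double x) => x * 0.5;

// The aggregation from the example, wrapped in a local function.
static double ParallelSum(double[] sequence)
{
    object lockObject = new object(); // protects: sum
    double sum = 0.0d;

    Parallel.ForEach(
        sequence,
        // Each task starts its own subtotal at zero.
        () => 0.0d,
        // Accumulate into the task-local subtotal; no lock is needed here.
        (x, loopState, partialResult) => Normalize(x) + partialResult,
        // When a task finishes, merge its subtotal into the shared total.
        localPartialSum =>
        {
            lock (lockObject)
            {
                sum += localPartialSum;
            }
        });

    return sum;
}

// Assumed input: the integers 1 through 100,000 as doubles.
double[] data = Enumerable.Range(1, 100_000).Select(i => (double)i).ToArray();
double parallelSum = ParallelSum(data);
double sequentialSum = data.Select(Normalize).Sum();
Console.WriteLine($"parallel = {parallelSum}, sequential = {sequentialSum}");
```

Both sums come out to 2,500,025,000. Every value and partial sum in this input is a multiple of 0.5 far below 2^53, so the result does not depend on the order in which tasks merge their subtotals; with arbitrary floating-point data, parallel and sequential results can differ in the last few bits.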

Here is the signature of the overloaded version of Parallel.ForEach that’s used in this example.

Parallel.ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally);

There are four arguments. The first is the collection of values to be aggregated. The second is a delegate that returns the initial value of each task’s partial result. The third is a delegate that combines one of the input values with the partial result from a previous iteration and returns the updated partial result.

The Task Parallel Library (TPL) implementation of Parallel.ForEach creates tasks in order to execute the parallel loop. When the loop creates multiple tasks, there will be one partial result value for each task. The number of worker tasks is determined by the parallel loop implementation, based on heuristics. Generally, there will be at least one worker task per available core, but there could be more. As the loop progresses, TPL will sometimes retire tasks and create new ones.

When the parallel loop is ready to finish, it must merge the partial results from all of its worker tasks together to produce the final result. The fourth argument to Parallel.ForEach is a delegate that performs the merge step. The delegate is invoked once for each of the worker tasks. The argument to the delegate is the partial result that was calculated by the task. This delegate locks the shared result variable and combines the partial sum with the result.

This example uses the C# lock keyword to enforce interleaved, sequential access to variables that are shared by concurrent threads. There are other synchronization techniques that could be used in this situation, but they are outside the scope of this book. Be aware that locking is cooperative; that is, unless all threads that access the shared variable use locking consistently and correctly, serial access to the variable is not guaranteed.

The syntax for locking in C# is lock (object) { body }. The object uniquely identifies the lock. All cooperating threads must use the same synchronizing object, which must be a reference type such as Object, not a value type such as int or double. When you use lock with Parallel.For or Parallel.ForEach, you should create a dummy object and assign it to a captured local variable dedicated to this purpose. (A captured variable is a local variable from the enclosing scope that is referenced in the body of a lambda expression.) The lock’s body is the region of code that the lock protects, and it should take only a small amount of execution time. Which shared variables a given lock object protects is a convention of each application, and every programmer whose code accesses those variables must follow it. In this example, the lock object ensures serial access to the sum variable.


You should document the variables that are protected by a lock with a comment in the code. It’s easy to make a mistake.

When a thread encounters a lock statement, it attempts to acquire the lock associated with the synchronizing object. Only one thread at a time can hold this lock. If another thread has already acquired the lock but has not yet released it, the current thread will block until the lock becomes available. When multiple threads compete for a lock, the order in which they acquire the lock is unpredictable; it’s not necessarily in FIFO order. After the thread successfully acquires the lock, it executes the statements inside of the body. When the thread exits the lock body (whether normally or by throwing an exception), it releases the lock.
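The release-on-exception behavior follows from how the C# compiler expands a lock statement on an ordinary object: roughly, it calls Monitor.Enter inside a try block and Monitor.Exit in the matching finally block. The sketch below writes that expansion out by hand in a hypothetical helper, WithLock, so that both halves are visible: many parallel increments that lose no updates, and a body that throws yet leaves the lock free. It assumes C# top-level statements.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

object lockObject = new object(); // protects: counter
int counter = 0;

// A hand-written approximation of what lock (lockObject) { body(); } expands to.
// WithLock is a hypothetical helper used only for this illustration.
void WithLock(Action body)
{
    bool lockTaken = false;
    try
    {
        Monitor.Enter(lockObject, ref lockTaken); // blocks while another thread holds the lock
        body();
    }
    finally
    {
        if (lockTaken)
        {
            Monitor.Exit(lockObject); // runs on normal exit and when body throws
        }
    }
}

// 10,000 concurrent read-modify-write updates; none are lost because each runs under the lock.
Parallel.For(0, 10_000, i => WithLock(() => counter++));

// An exception thrown inside the protected region still releases the lock.
bool threw = false;
try
{
    WithLock(() => throw new InvalidOperationException("failure inside the lock body"));
}
catch (InvalidOperationException)
{
    threw = true;
}
bool stillHeld = Monitor.IsEntered(lockObject); // true only if this thread still owns the lock

Console.WriteLine($"counter = {counter}, threw = {threw}, stillHeld = {stillHeld}");
```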

The localFinally delegate (argument four) is executed once at the end of each task. Therefore, the number of locks taken will be equal to the number of tasks that are used to execute the parallel loop. You can’t predict how many tasks will be used. For example, you shouldn’t assume that the number of tasks will be less than or equal to the number that you provided with the parallel option MaxDegreeOfParallelism (or its default value). The reason for this is that the parallel loop sometimes shuts down tasks during the execution of the loop and continues with new tasks. This cooperative yielding of control can in some cases reduce system latency by allowing other tasks to run. It also helps prevent an unintended increase in the number of thread pool worker threads. You shouldn’t consider the state of the accumulator to be strictly “thread local.” It’s actually “task local.” (The distinction between thread local and task local won’t normally affect a program’s logic because a task is guaranteed to execute from start to finish on only one thread.)
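You can observe this by counting. In the sketch below, the input (one million copies of 1.0), the MaxDegreeOfParallelism value of 2, and the two counters are assumptions added only for instrumentation; the counters are updated under the same lock as sum. The counts may change from run to run and from machine to machine, which is why you shouldn’t predict them; what should hold on every run is that each localInit call is paired with exactly one localFinally call.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Assumed input: one million copies of 1.0, so the expected total is 1,000,000.
double[] sequence = Enumerable.Repeat(1.0, 1_000_000).ToArray();

object lockObject = new object(); // protects: sum, initCalls, finallyCalls
double sum = 0.0d;
int initCalls = 0;
int finallyCalls = 0;

Parallel.ForEach(
    sequence,
    new ParallelOptions { MaxDegreeOfParallelism = 2 },
    // localInit runs once per task; count it (instrumentation only).
    () =>
    {
        lock (lockObject) { initCalls++; }
        return 0.0d;
    },
    (x, loopState, partialResult) => x + partialResult,
    // localFinally also runs once per task, so each task takes this lock once.
    localPartialSum =>
    {
        lock (lockObject)
        {
            finallyCalls++;
            sum += localPartialSum;
        }
    });

// The task count is chosen at run time and is not guaranteed to stay within MaxDegreeOfParallelism.
Console.WriteLine($"sum = {sum}, localInit calls = {initCalls}, localFinally calls = {finallyCalls}");
```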

For comparison, here’s the PLINQ version of the example used in this variation.

double[] sequence = ...
return sequence.AsParallel().Select(Normalize).Sum();
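A runnable version of the PLINQ line follows, again assuming a hypothetical Normalize that halves its input and C# top-level statements. For comparison, it also spells out the same subtotal-then-combine structure with the ParallelEnumerable.Aggregate overload that takes a seed value, an update function, a combine function, and a result selector. In both forms PLINQ manages the partial results internally, so no lock appears in user code.

```csharp
using System;
using System.Linq;

// Hypothetical stand-in for the book's Normalize method.
static double Normalize(double x) => x * 0.5;

// Assumed input: the integers 1 through 100,000 as doubles.
double[] sequence = Enumerable.Range(1, 100_000).Select(i => (double)i).ToArray();

// The one-line PLINQ version from the text.
double plinqSum = sequence.AsParallel().Select(Normalize).Sum();

// The same computation with the partial-result steps written out.
double aggregateSum = sequence.AsParallel().Aggregate(
    0.0d,                                     // starting value of each partition's subtotal
    (subtotal, x) => subtotal + Normalize(x), // fold one element into a subtotal
    (left, right) => left + right,            // combine two subtotals
    result => result);                        // final projection (identity here)

Console.WriteLine($"Sum = {plinqSum}, Aggregate = {aggregateSum}");
```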

2. Using a Range Partitioner for Aggregation

When you have a loop body with a very small amount of work to do, and there are many iterations to perform, it’s possible that the overhead of Parallel.ForEach is large compared to the cost of executing the loop body.

In this case, it’s sometimes more efficient to use a Partitioner object for the loop. A Partitioner object allows you to embed a sequential for loop inside of your Parallel.ForEach loop and reduce the number of iterations of the Parallel.ForEach loop. Generally, you should profile the application in order to decide whether to use a Partitioner object.
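To see what a range partitioner hands to the loop, you can enumerate its ranges directly. This sketch passes an explicit range size of 3 (an assumption for readability; the example that follows omits it and lets the partitioner choose) and uses GetDynamicPartitions to list the chunks, which should be [0, 3), [3, 6), [6, 9), and [9, 10). It assumes C# top-level statements.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Split the index range [0, 10) into chunks of at most 3 indices.
var partitioner = Partitioner.Create(0, 10, 3);

// Each chunk is a Tuple<int, int>: (inclusive start, exclusive end).
// Enumerating one dynamic partition on a single thread visits every chunk.
List<Tuple<int, int>> ranges = partitioner.GetDynamicPartitions().ToList();

foreach (var range in ranges)
{
    // A Parallel.ForEach body would run an ordinary sequential loop over each chunk:
    // for (int i = range.Item1; i < range.Item2; i++) { ... }
    Console.WriteLine($"[{range.Item1}, {range.Item2})");
}
```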

Here’s an example.

using System.Collections.Concurrent;
using System.Threading.Tasks;

double[] sequence = ...
object lockObject = new object();
double sum = 0.0d;
var rangePartitioner = Partitioner.Create(0, sequence.Length);

Parallel.ForEach(
    // The input intervals
    rangePartitioner,
    // The local initial partial result
    () => 0.0,
    // The loop body for each interval
    (range, loopState, initialValue) =>
    {
        double partialSum = initialValue;
        for (int i = range.Item1; i < range.Item2; i++)
        {
            partialSum += Normalize(sequence[i]);
        }
        return partialSum;
    },
    // The final step of each local context
    (localPartialSum) =>
    {
        // Use lock to enforce serial access to shared result
        lock (lockObject)
        {
            sum += localPartialSum;
        }
    });
return sum;

The main difference is that the Parallel.ForEach loop takes as its input a sequence of index intervals, generated by the Partitioner object, instead of individual values. Because the body delegate is invoked once per interval rather than once per element, this avoids much of the overhead of invoking delegate methods. You’ll notice a benefit only when the amount of work in each step is small and there are many steps.
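The sketch below is a runnable form of the range-partitioner example under stated assumptions: a hypothetical Normalize that halves its input, one million consecutive integers as input, and C# top-level statements. It also counts how many ranges a partitioner created with the same arguments yields, to show how few body invocations replace one invocation per element.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for the book's Normalize method.
static double Normalize(double x) => x * 0.5;

// Assumed input: the integers 1 through 1,000,000 as doubles.
double[] sequence = Enumerable.Range(1, 1_000_000).Select(n => (double)n).ToArray();
object lockObject = new object(); // protects: sum
double sum = 0.0d;

var rangePartitioner = Partitioner.Create(0, sequence.Length);

Parallel.ForEach(
    rangePartitioner,
    () => 0.0d,
    // One delegate call per index range; a plain for loop does the per-element work.
    (range, loopState, initialValue) =>
    {
        double partialSum = initialValue;
        for (int i = range.Item1; i < range.Item2; i++)
        {
            partialSum += Normalize(sequence[i]);
        }
        return partialSum;
    },
    localPartialSum =>
    {
        lock (lockObject)
        {
            sum += localPartialSum;
        }
    });

// How many ranges (and therefore body invocations) the same arguments produce.
int rangeCount = Partitioner.Create(0, sequence.Length).GetDynamicPartitions().Count();
Console.WriteLine($"sum = {sum}, ranges = {rangeCount}, elements = {sequence.Length}");
```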

Here’s the signature of the overloaded version of the Parallel.ForEach method that was used.

Parallel.ForEach<TSource, TLocal>(
    Partitioner<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally);

3. Using PLINQ Aggregation with Range Selection

The PLINQ Aggregate extension method includes an overloaded version that allows a very general application of the parallel aggregation pattern. Here’s an example from an application that does financial simulation. The method performs repeated simulation trials and aggregates the results into a histogram. The code must handle two dependencies: the accumulation of partial results into the result histogram, and the use of instances of the Random class. (Random is not thread-safe, so an instance cannot be safely shared across multiple threads.)

using System;
using System.Linq;

int[] histogram = MakeEmptyHistogram();

return ParallelEnumerable.Range(0, count).Aggregate(
    // 1- Create an empty local accumulator object
    //    that includes all task-local state.
    () => new Tuple<int[], Random>(
        MakeEmptyHistogram(),
        new Random(SampleUtilities.MakeRandomSeed())),

    // 2- Run the simulation, adding result to local accumulator.
    (localAccumulator, i) =>
    {
        // With each iteration, get the next random value.
        var sample = localAccumulator.Item2.NextDouble();
        if (sample > 0.0 && sample < 1.0)
        {
            // Perform a simulation trial for the sample value.
            var simulationResult =
                DoSimulation(sample, mean, stdDev);

            // Add result to the histogram of the local accumulator.
            int histogramBucket =
                (int)Math.Floor(simulationResult / BucketSize);
            if (0 <= histogramBucket && histogramBucket < TableSize)
                localAccumulator.Item1[histogramBucket] += 1;
        }
        return localAccumulator;
    },

    // 3- Combine local results pair-wise.
    (localAccumulator1, localAccumulator2) =>
    {
        // Merge the histograms by adding corresponding buckets;
        // the Random instance is no longer needed, so pass null.
        return new Tuple<int[], Random>(
            localAccumulator1.Item1
                .Zip(localAccumulator2.Item1, (a, b) => a + b)
                .ToArray(),
            null);
    },

    // 4- Extract answer from final combination.
    finalAccumulator => finalAccumulator.Item1
); // Aggregate

The data source in this example is a parallel query that’s created with the Range static method of the ParallelEnumerable class. Here’s the signature of the overloaded version of the Aggregate extension method that was used in this example.

Aggregate<TSource, TAccumulate, TResult>(
    this ParallelQuery<TSource> source,
    Func<TAccumulate> seedFactory,
    Func<TAccumulate, TSource, TAccumulate> updateAccumulatorFunc,
    Func<TAccumulate, TAccumulate, TAccumulate> combineAccumulatorsFunc,
    Func<TAccumulate, TResult> resultSelector);

There are four arguments. Each of the arguments is a delegate method.

The first argument is a delegate that establishes the local state of each worker task that the query creates. This is called once, at the beginning of the task. In this example, the delegate returns a tuple (unnamed record) instance that contains two fields. The first field is an empty histogram. This will accumulate the local partial results of the simulation in this task. The second field is an instance of the Random class. It’s part of the local state to ensure that the simulation does not violate the requirements of the Random class by sharing instances across threads. Note that you can store virtually any kind of local state in the object you create.

The second argument is the loop body. This delegate is invoked once per data element in the partition. In this example, the loop body creates a new random sample and performs the simulation experiment. Then it classifies the result of the simulation into buckets used by the histogram and increments the appropriate bucket of the task’s local histogram.

The third argument is a delegate that combines local partial results pair-wise. It merges the two input histograms (it adds their corresponding bucket values and returns a new histogram with the sums) and returns a new tuple whose second field is null, because the random number generator is no longer needed. The combination delegate is invoked as many times as necessary to consolidate all the local partial results.

The fourth argument selects the result from the final, merged local state object.

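This variation can be adapted to many situations that use the parallel aggregation pattern. Note that this implementation doesn’t require locks, because each task updates only its own accumulator and the partial results are merged by the combination delegate.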
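The following self-contained sketch runs the same four-delegate Aggregate call end to end. Everything the section leaves out is replaced by an assumption: TableSize, BucketSize, and count get arbitrary values, DoSimulation is a hypothetical one-argument stand-in that returns its sample unchanged, and each task’s Random is seeded from Guid.NewGuid().GetHashCode() instead of the book’s SampleUtilities.MakeRandomSeed. Because the stand-in maps every sample in (0, 1) into some bucket, the bucket counts should add up to the number of trials, apart from the vanishingly rare sample of exactly 0.0 that the filter rejects. It assumes C# top-level statements.

```csharp
using System;
using System.Linq;

// Assumed values standing in for the book's constants, which this section does not show.
const int TableSize = 10;
const double BucketSize = 0.1;
const int count = 100_000;

// Hypothetical stand-in for the book's DoSimulation(sample, mean, stdDev):
// here a trial's result is simply the sample itself.
static double DoSimulation(double sample) => sample;

int[] histogram = ParallelEnumerable.Range(0, count).Aggregate(
    // 1- Task-local state: an empty histogram plus a private Random instance.
    //    Seeding from Guid.NewGuid() is an assumption replacing SampleUtilities.MakeRandomSeed.
    () => new Tuple<int[], Random>(
        new int[TableSize],
        new Random(Guid.NewGuid().GetHashCode())),
    // 2- One simulation trial per element, recorded in the task-local histogram.
    (localAccumulator, i) =>
    {
        double sample = localAccumulator.Item2.NextDouble();
        if (sample > 0.0 && sample < 1.0)
        {
            int bucket = (int)Math.Floor(DoSimulation(sample) / BucketSize);
            if (0 <= bucket && bucket < TableSize)
            {
                localAccumulator.Item1[bucket] += 1;
            }
        }
        return localAccumulator;
    },
    // 3- Merge two local results by adding corresponding buckets; drop the Random.
    (left, right) => new Tuple<int[], Random>(
        left.Item1.Zip(right.Item1, (a, b) => a + b).ToArray(),
        null),
    // 4- Keep only the merged histogram.
    finalAccumulator => finalAccumulator.Item1);

Console.WriteLine(string.Join(", ", histogram));
```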
