Reduction reduces a collection to a value. For example, you
could calculate the sum of a collection of values. Look at the
following loop. The dependency is the scalar variable, which is shared
between tasks. In particular, the scalar variable is shared across
threads, where each thread hosts one or more parallel tasks. The
problem is not necessarily between tasks but between threads.
int [] values=new int [] {1,2,3,4,5,6,7,8,9,10,
11,12,13,14,15,16,17,18,19,20};
int total = 0;
Parallel.ForEach(values, (item) =>
{
total += item;
});
The challenge is to make the preceding code thread-safe without sacrificing significant performance.
In the TPL, you perform reduction by using a private thread-local
variable that is shared by parallel tasks on the same thread. The
thread-local variable can be accessed serially by tasks running on the
same thread; therefore, it’s thread safe in that context. For this
reason, no overt synchronization is required within tasks on the same
thread. When the parallel loop completes, there are partial results on
each thread. A special function is then called to combine the partial
results into a final result. This is the only operation that would
access the global variable and require synchronization. For this to
work, the parallel operation must be both commutative and associative.
The following image shows how reduction is performed for a parallel
loop.
As mentioned, you should not parallelize operations that are neither commutative nor associative. The commutative
property means that the order of operations is not important: you might
remember the commutative property from Algebra 1, from the common
example a+b=b+a. The commutative property is most commonly associated with addition and multiplication—but not subtraction. The associative
property means that two or more operations will return the same
result—regardless of the sequence in which those operations are
performed. Basically, the textual order of operations is important but
not the sequencing. For example, (a+b)+c=a+(b+c).
Both Parallel.For and Parallel.ForEach have overloads with a couple of additional parameters for reduction: the localInit and localFinally parameters. You initialize the private thread-local variable by using the localInit function and perform the reduction in the localFinally function, which is an Action delegate. This localFinally function gets called after the parallel operation is complete. The parameters for localFinally
are the current array element, the loop state, and the thread-local
variable. In this function alone, you need to synchronize access to the
shared variable, which you can do by using a variety of techniques,
such as a Monitor class, an Interlocked class, or the lock statement.
Here’s the basic Parallel.For syntax for reduction.
public static ParallelLoopResult For<TLocal>(
int fromInclusive,
int toExclusive,
Func<TLocal> localInit,
Func<int, ParallelLoopState, TLocal, TLocal> body,
Action<TLocal> localFinally
)
This is the Parallel.ForEach syntax.
public static ParallelLoopResult ForEach<TSource, TLocal>(
IEnumerable<TSource> source,
Func<TLocal> localInit,
Func<TSource, ParallelLoopState, TLocal, TLocal> body,
Action<TLocal> localFinally
)
In this tutorial, you will count the number of values in an array
that are greater than 5. The reduction reduces the collection to a
single count.
Reduce a collection to a count
-
Create a new C# console project in Visual Studio 2010. Add a using statement for the System.Threading.Tasks
namespace to the source code. At class scope, define a static integer
array containing these values: 1, 10, 4, 3, 10, 20, 30, 5, and an
integer count variable.
static int[] intArray = new int [] { 1, 10, 4, 3, 10, 20, 30, 5 };
static int count=0;
-
In the Main method, iterate over the integer array by using a Parallel.For method. In the localInit method, initialize the thread local variable to 0. You also need to define the parameters for the localFinally delegate, the last parameter of the Parallel.For method.
Parallel.For(0, intArray.Length, ()=>0, (value, loopState, subtotal)
-
In the lambda expression for the loop operation, check the current
integer value. If it’s greater than 5, increment the counter. At the
end, display the thread identifier, index, current value, and partial
result. Return this partial result to be used by the next iteration on
the same thread.
if (intArray[value] > 5)
{
++subtotal;
Console.WriteLine("Thread {0}: Task {1}: Value {2}, Partial {3}",
Thread.CurrentThread.ManagedThreadId, index,
intArray[index], subtotal);
}
return subtotal;
-
After the parallel loop completes, the localFinally method is called. You can use the Interlocked.Add method to combine the partial results and calculate the total in a thread-safe manner.
Interlocked.Add(ref count, subtotal);
-
After the Parallel.For method, display the results.
Console.WriteLine("Count is {0}", count);
Your completed code should look similar to the following.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
namespace Count
{
class Program
{
static int[] intArray = new int [] { 1, 10, 4, 3, 10, 20, 30, 5 };
static int count=0;
static void Main(string[] args)
{
Parallel.For(0, intArray.Length, ()=>0, (index, loopState, subtotal) =>
{
if (intArray[index] > 5)
{
++subtotal;
Console.WriteLine("Thread {0}: Task {1}: Value {2}, Partial {3}",
Thread.CurrentThread.ManagedThreadId, index,
intArray[index], subtotal); }
return subtotal;
},
(subtotal)=>
{
Interlocked.Add(ref count, subtotal);
});
Console.WriteLine("Count is {0}\n", count);
Console.WriteLine("Press Enter to Continue");
Console.ReadLine();
}
}
}
The following figure shows my results in the console window.
The preceding example calculated each partial result on a different
thread. Therefore, you have four partial results reduced to a final
total.
In the next tutorial, you will use the Parallel.ForEach method to calculate factorials. A factorial is the summation of contiguous values. For example, 5 factorial (5!) is 5×4×3×2×1, or 120. As in the previous example, the shared variable is the final result.
Reduce a collection of integers to a series of factorials
-
Create a console project in Visual Studio 2010 for C#. Add a using statement for the System.Threading.Tasks namespace to the source code. Prior to the Main method, declare a static integer named total, initialized to 1. Also define a constant called EXCLUSIVE
and assign the value 1. You use this variable to adjust the loop
boundary to include the maximum value. Finally, define a generic object
that you will use as a lock later in the program.
static int total=1;
const int EXCLUSIVE = 1;
static object mylock = new object();
-
You will calculate 5 factorial (5!). Define a Parallel.ForEach statement that starts at 1 and finishes at (5+EXCLUSIVE). This is the range of the factorial. Initialize the subtotal to 1.
Parallel.For(1, 5+EXCLUSIVE, () => 1, (value, loopState, accumulator) =>
-
In the parallel operation, multiply the accumulator (partial result) with the input value, and return the result.
accumulator*=value;
return accumulator;
-
Define a lambda expression for the lastFinally
delegate with the accumulator as the only parameter. In the lambda
expression, define a lock that protects access to the shared variable.
In the block, calculate the product of the partial result and the
current total.
lock (mylock)
{
total *= accumulator;
}
-
Display the results.
Console.WriteLine("The result is {0}", total);
Here is the entire program.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Factorial
{
class Program
{
static int total=1;
const int EXCLUSIVE = 1;
static object mylock = new object();
static void Main(string[] args)
{
Parallel.For(1, 5+EXCLUSIVE, () => 1, (value, loopState, accumulator) =>
{
accumulator*=value;
return accumulator;
},
(accumulator) =>
{
lock (mylock)
{
total *= accumulator;
}
});
Console.WriteLine("The result is {0}", total);
Console.WriteLine("Press enter to <end>");
Console.ReadLine();
}
}
}