There are several variations of the pipeline pattern.
1. Canceling a Pipeline
Pipeline tasks work together to perform their work; they must also work together when they respond to a cancellation.
In the standard cancellation scenario, your task is passed a CancellationToken
value from a higher layer of the application, such as the UI. In the
case of a pipeline stage, you need to observe this cancellation token
in two places. This is shown in the following code.
void DoStage(BlockingCollection<T> input,
             BlockingCollection<T> output,
             CancellationToken token)
{
    try
    {
        foreach (var item in input.GetConsumingEnumerable())
        {
            if (token.IsCancellationRequested) break;
            var result = ...
            output.Add(result, token);
        }
    }
    catch (OperationCanceledException) { }
    finally
    {
        output.CompleteAdding();
    }
}
A natural place to check for
cancellation is at the beginning of the loop that processes items from
the blocking collection. At this point, you only need to break from the loop, as shown in the following code.
if (token.IsCancellationRequested) break;
The second place is less obvious. You use an overloaded version of the BlockingCollection<T> class’s Add method that accepts a cancellation token as an argument. The following code shows how to do this.
output.Add(result, token);
If you give the Add
method a cancellation token, the blocking collection will be able to
detect the cancellation request while it’s waiting to add a new value.
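To see this behavior in isolation, the following minimal sketch (a hypothetical demo, not part of the ImagePipeline sample) fills a bounded collection so that the next Add call blocks, then cancels from another task; the blocked Add observes the request and throws OperationCanceledException.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class AddCancellationDemo
{
    static void Main()
    {
        // A bounded collection with capacity 1 that is already full,
        // so the next Add call will block waiting for space.
        var queue = new BlockingCollection<int>(1);
        queue.Add(1);

        using (var cts = new CancellationTokenSource())
        {
            // Request cancellation shortly after Add starts waiting.
            Task.Factory.StartNew(() => { Thread.Sleep(100); cts.Cancel(); });
            try
            {
                queue.Add(2, cts.Token); // blocks, then observes cancellation
                Console.WriteLine("Add completed");
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Add canceled");
            }
        }
    }
}
```

Without the token argument, the second Add call would block indefinitely, which is exactly the deadlock scenario described below.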
To understand why it’s
necessary to add such detailed cancellation logic, recall that it’s
possible that a pipeline stage that’s creating values can be blocked
when it calls the Add method of its output queue. (If the output queue is full, the call to Add
waits until space is available before returning.) If a cancellation
request happened in this situation and you didn’t check for it, it
would be possible for your program to experience deadlock. Deadlock
could happen because stages of the pipeline that consume values
produced by this stage can also be canceled; therefore, it’s very
possible that later stages in the pipeline would terminate without
draining the queue. The producer, which is blocked while waiting for
space to become available, would have no way to proceed.
Note: Improperly canceling a pipeline can cause your program to experience deadlock. Follow the guidelines in this section carefully.
The solution, as shown in the example, is to use the overloaded version of the blocking collection’s Add
method that accepts a cancellation token as an argument. If a
cancellation is requested while the producer waits for space to become
available, the blocking collection creates and throws an OperationCanceledException instance.
Note: Check for cancellation at the beginning of a stage’s main loop and also when adding values to a blocking collection.
Whether the loop exits normally, via the break keyword, or because of a cancellation exception, the finally
clause guarantees that the output buffer will be marked as completed.
This unblocks any consumers that might be waiting for input, and
they can then process the cancellation request.
Although it’s also
possible to check for a cancellation token while waiting for input from
a blocking collection (there’s an overloaded version of the GetConsumingEnumerable method that accepts a cancellation token), you don’t need to do this if you use the techniques described in this section.
Be aware that if the type T implements the IDisposable interface, under .NET coding conventions you must also call the Dispose method on cancellation. You need to dispose of the current iteration’s object as well as any instances of T stored in the blocking collections. The online source code of the ImagePipeline example shows how to do this.
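One way such cleanup might look is sketched below; DrainAndDispose is a hypothetical helper for illustration, not the sample's actual code. After cancellation, a stage would dispose its current item and then drain and dispose anything still buffered in its queues.

```csharp
using System;
using System.Collections.Concurrent;

static class PipelineCleanup
{
    // Hypothetical helper: after cancellation, remove any items still
    // buffered in a blocking collection and dispose each one.
    public static void DrainAndDispose<T>(BlockingCollection<T> queue)
        where T : IDisposable
    {
        T item;
        while (queue.TryTake(out item))
        {
            item.Dispose();
        }
    }
}
```

TryTake is used rather than Take so that the helper returns immediately once the queue is empty instead of blocking.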
2. Handling Pipeline Exceptions
Exceptions are
similar to cancellations. The difference between the two is that when
an unhandled exception occurs within one of the pipeline stages, the
tasks that execute the other stages don’t by default receive
notification that an exception has occurred elsewhere. Without such
notification, there are several ways for the application to deadlock.
Note:
When there is an unhandled exception in one pipeline stage, you should
cancel the other stages. If you don’t do this, deadlock can occur.
Follow the guidelines in this section carefully.
Use a special instantiation of the CancellationTokenSource
class to allow your application to coordinate the shutdown of all the
pipeline stages when an exception occurs in one of them. Here’s an
example.
static void DoPipeline(CancellationToken token)
{
    using (CancellationTokenSource cts =
        CancellationTokenSource.CreateLinkedTokenSource(token))
    {
        var f = new TaskFactory(TaskCreationOptions.LongRunning,
                                TaskContinuationOptions.None);
        var stage1 = f.StartNew(() => DoStage1(..., cts));
        var stage2 = f.StartNew(() => DoStage2(..., cts));
        var stage3 = f.StartNew(() => DoStage3(..., cts));
        var stage4 = f.StartNew(() => DoStage4(..., cts));
        Task.WaitAll(stage1, stage2, stage3, stage4);
    }
}
The CreateLinkedTokenSource method of the CancellationTokenSource
class creates a handle that allows you to respond to an external
cancellation request and also to initiate (and recognize) an internal
cancellation request of your own. You pass the linked cancellation
token source as an argument to the methods that execute your pipeline
stages, so each stage can use the token source to initiate cancellation.
Here’s an example.
void DoStage(BlockingCollection<T> input,
             BlockingCollection<T> output,
             CancellationTokenSource cts)
{
    try
    {
        var token = cts.Token;
        foreach (var item in input.GetConsumingEnumerable())
        {
            if (token.IsCancellationRequested) break;
            var result = ...
            output.Add(result, token);
        }
    }
    catch (Exception e)
    {
        // If an exception occurs, notify all other pipeline stages.
        cts.Cancel();
        if (!(e is OperationCanceledException))
            throw;
    }
    finally
    {
        output.CompleteAdding();
    }
}
This code is similar
to the cancellation variation described earlier, except that when an
unhandled exception occurs, the exception is intercepted by the catch
block, which also signals cancellation for all of the other pipeline
stages. Consequently, each pipeline stage will begin an orderly
shutdown.
After all pipeline tasks stop, the original exception, wrapped as an inner exception of an AggregateException instance, is thrown by the Task.WaitAll method. You should be sure to include a catch or finally block to do any cleanup, such as releasing handles to unmanaged resources.
You might ask why you can’t just pass a CancellationTokenSource object to the DoPipeline method instead of a CancellationToken value. By convention, only the cancellation token is passed as an argument. Passing a CancellationToken
value allows lower-level library code to respond to an external
cancellation but prevents lower-level libraries from initiating the
cancellation of higher-level components. In other words, responding to
a cancellation request requires less privilege than initiating such a
request.
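A short sketch (hypothetical, for illustration) makes the asymmetry concrete: canceling the linked source signals only the linked token, while canceling the external source propagates to the linked token as well.

```csharp
using System;
using System.Threading;

class LinkedTokenDemo
{
    static void Main()
    {
        var external = new CancellationTokenSource();
        using (var linked =
            CancellationTokenSource.CreateLinkedTokenSource(external.Token))
        {
            // Internal cancellation: only the linked token is signaled;
            // the external token is unaffected.
            linked.Cancel();
            Console.WriteLine(external.IsCancellationRequested); // False
            Console.WriteLine(linked.IsCancellationRequested);   // True
        }
    }
}
```

Code that holds only the CancellationToken can observe either kind of cancellation but can initiate neither, which is the reduced privilege the text describes.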
3. Load Balancing Using Multiple Producers
The BlockingCollection<T> class allows you to read values from more than one producer. This feature is provided by the TakeFromAny static method and its variants. You can use TakeFromAny
to implement load balancing strategies for some pipeline scenarios (but
not all). This variation is sometimes known as a nonlinear pipeline.
The image pipeline example
described earlier in this chapter requires that the slideshow of
thumbnail images be performed in the same order as the input files.
This is a constraint that’s common to many pipeline scenarios, such as
processing a series of video frames. However, in the case of the image
pipeline example, the filter operations on successive images are
independent of each other. In this case, you can insert an additional
pipeline task. This is shown in Figure 1.
Note: It is sometimes possible to implement load balancing by increasing the number of tasks used for a particular pipeline stage.
Figure 1
shows what happens when you add an additional filter task. Both of the
filter tasks take images produced by the previous stage of the
pipeline. The order in which they consume these images is not fully
determined, although from a filter’s local point of view, no input
image ever arrives out of order.
Each of the filter stages
has its own blocking collection for queuing the elements that
it produces. The consumer of these queues is a component known as a
multiplexer, which combines the inputs from all of the producers. The
multiplexer allows its consumer, which in this case is the display
stage of the pipeline, to receive the images in the correct sequential
order. The images don’t need to be sorted or reordered. Instead, the
fact that each producer queue is locally ordered allows the multiplexer
to look for the next value in the sequence by simultaneously monitoring
the heads of all of the producer queues. This is where the blocking
collection’s TakeFromAny method comes into play. The method allows the multiplexer to block until any of the producer queues has a value to read.
Here’s an example to make
this more concrete. Suppose that each image has a unique sequence
number that’s available as a property. The image numbers start with 1
and increase sequentially. As Figure 1
shows, the first filter might process images that are numbered 1, 4,
and 5, while the second filter processes images with sequence numbers
2, 3, 6, and 7. Each load-balanced
filter stage collects its output images into two queues. The two output
queues are correctly ordered (that is, no higher numbered image comes
before a lower numbered image), but there are gaps in the sequence of
numbers. For example, if you take values from the first filter’s output
queue, you get image 1, followed by image 4, followed by image 5.
Images 2 and 3 are missing because they’re found in the second filter’s
output queue.
The gaps are a problem. The
next stage of the pipeline, the Display Image stage, needs to show
images in order and without gaps in the sequence. This is where the
multiplexer comes in. Using the TakeFromAny method,
the multiplexer waits for input from both of the filter stage producer
queues. When an image arrives, the multiplexer looks to see if the
image’s sequence number is the next in the expected sequence. If it is,
the multiplexer passes it to the Display Image stage. If the image is
not the next in the sequence, the multiplexer holds the value in an
internal look-ahead buffer and repeats the take operation for the input
queue that does not have a look-ahead value. This algorithm allows the
multiplexer to put together the inputs from the incoming producer
queues in a way that ensures sequential order without sorting the
values.
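The multiplexing loop can be sketched as follows, using integer sequence numbers in place of images; the Multiplex method and its parameters are hypothetical simplifications, not the sample's code. An image version would buffer each out-of-order image keyed by its sequence-number property.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

static class Multiplexer
{
    // Merges locally ordered producer queues into a single strictly
    // sequential stream, without sorting. 'first' is the first expected
    // sequence number and 'count' the total number of items expected.
    public static IEnumerable<int> Multiplex(
        BlockingCollection<int>[] producers, int first, int count)
    {
        int next = first;
        var lookAhead = new Dictionary<int, int>(); // out-of-order values
        while (next < first + count)
        {
            int value;
            if (lookAhead.ContainsKey(next))
            {
                value = lookAhead[next];
                lookAhead.Remove(next);
            }
            else
            {
                // Block until any producer queue has a value to read.
                BlockingCollection<int>.TakeFromAny(producers, out value);
                if (value != next)
                {
                    lookAhead[value] = value; // hold it until its turn
                    continue;
                }
            }
            yield return value;
            next++;
        }
    }
}
```

Feeding this sketch the two queues from the example above (1, 4, 5 in one and 2, 3, 6, 7 in the other) yields the images in order, 1 through 7, regardless of which queue TakeFromAny happens to read first.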
Figure 2
shows the performance benefit of doubling the number of filter stages
when the filter operation is twice as expensive as the other pipeline
stages.
If all pipeline stages,
except the filter stage, take T units of time to process an image, and
the filter stage takes 2 x T units of time, then using two filter stages and
two producer queues to load balance the pipeline results in an overall
throughput of approximately one image every T units of time as the number of
images grows. If you run the ImagePipeline sample and select the Load Balanced
radio button, you’ll see this effect. The speed of the pipeline (after
a suitable number of images are processed) will converge on the average
time of the slowest single-instance stage or on one-half of the average
filter time, whichever is greater.
The queue wait time of Queue
3, which is displayed on the ImagePipeline sample’s UI, indicates the
overhead that’s introduced by waiting on multiple producer queues. This
is an example of how adding overhead to a parallel computation can
actually increase the overall speed if the change also allows more
efficient use of the available cores.
4. Pipelines and Streams
You may have noticed that blocking collections and
streams have some similarities. It’s sometimes useful to treat a
blocking collection as a stream, and vice versa. For example, you may
want to use a Pipeline pattern with library methods
that read and write to streams. Suppose that you want to compress a
file and then encrypt it. Both compression and encryption are supported
by the .NET Framework, but the methods expect streams, not blocking
collections, as input.
It’s possible to create a
stream whose underlying implementation relies on tasks and a blocking
collection.
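For example, a write-only stream adapter might forward each buffer it receives into a blocking collection, so that stream-based library code such as a compression stage can act as a pipeline producer. The ProducerStream class below is a hypothetical sketch of that idea, not a Framework type.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;

// Hypothetical adapter: a write-only Stream whose Write method posts each
// buffer into a BlockingCollection<byte[]>, allowing stream-based library
// code to feed a downstream pipeline stage.
class ProducerStream : Stream
{
    private readonly BlockingCollection<byte[]> output;

    public ProducerStream(BlockingCollection<byte[]> output)
    {
        this.output = output;
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        // Copy the caller's buffer; the caller may reuse it after Write.
        var chunk = new byte[count];
        Array.Copy(buffer, offset, chunk, 0, count);
        output.Add(chunk); // blocks if the pipeline's buffer is full
    }

    public override bool CanRead { get { return false; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return true; } }
    public override void Flush() { }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override int Read(byte[] buffer, int offset, int count)
    { throw new NotSupportedException(); }
    public override long Seek(long offset, SeekOrigin origin)
    { throw new NotSupportedException(); }
    public override void SetLength(long value)
    { throw new NotSupportedException(); }
}
```

A compression method that writes to this stream would then, without knowing it, be publishing chunks into the next pipeline stage's input queue.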
5. Asynchronous Pipelines
The pipelines that
have been described so far are synchronous. Producers and consumers are
long-running tasks that block on inputs and outputs. You could also
have an asynchronous pipeline, where tasks aren’t created until data
becomes available. For that, you could use the AsyncCall
class that’s found in the ParallelExtensionsExtras sample project. The AsyncCall
class is a queue that a producer puts data into; if there’s currently
no task processing the queue when data arrives, a new task is spun up
to process the queue, and it is active as long as there’s incoming data
to process. If it ever finds that there is no more data, the task goes
away. If more data arrives, a new task starts.
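The pattern that AsyncCall implements can be sketched as follows; OnDemandProcessor is a hypothetical illustration of the idea, not the actual ParallelExtensionsExtras code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch of the on-demand consumer pattern: producers post
// items into a queue; a processing task is started only when one isn't
// already running, and it exits once the queue is empty.
class OnDemandProcessor<T>
{
    private readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
    private readonly Action<T> handler;
    private int active; // 0 = no processing task, 1 = task running

    public OnDemandProcessor(Action<T> handler)
    {
        this.handler = handler;
    }

    public void Post(T item)
    {
        queue.Enqueue(item);
        // Spin up a task only if no task is currently processing the queue.
        if (Interlocked.CompareExchange(ref active, 1, 0) == 0)
        {
            Task.Factory.StartNew(ProcessQueue);
        }
    }

    private void ProcessQueue()
    {
        do
        {
            T item;
            while (queue.TryDequeue(out item))
            {
                handler(item);
            }
            Interlocked.Exchange(ref active, 0);
            // Re-check: a Post may have raced with the shutdown above and
            // seen 'active' still set, in which case we must resume.
        }
        while (!queue.IsEmpty &&
               Interlocked.CompareExchange(ref active, 1, 0) == 0);
    }
}
```

The do-while re-check at the end closes the race in which a producer posts an item just as the task is shutting down; without it, that item could sit in the queue with no task alive to process it.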