The
Pipeline pattern uses parallel tasks and concurrent queues to process a
sequence of input values. Each task implements a stage of the pipeline,
and the queues act as buffers that allow the stages of the pipeline to
execute concurrently, even though the values are processed in order.
You can think of software pipelines
as analogous to assembly lines in a factory, where each item in the
assembly line is constructed in stages. The partially assembled item is
passed from one assembly stage to another. The outputs of the assembly
line occur in the same order as the inputs.
Note: A pipeline is a series of parallel tasks that are connected by buffers.
A pipeline is composed of
a series of producer/consumer stages, each one depending on the output
of its predecessor. Pipelines allow you to use parallelism in cases
where there are too many dependencies to use a parallel loop.
There are many ways to use
pipelines. Pipelines are often useful when the data elements are
received from a real-time event stream, such as values on stock ticker
tapes, user-generated mouse click events, and packets that arrive over
the network. Pipelines are also used to process elements from a data
stream, as is done with compression and encryption, or processing
streams of video frames. In all of these cases, it’s important that the
data elements are processed in sequential order.
Note:
Don’t confuse pipelines and parallel loops. Pipelines are used in cases
where a parallel loop can’t be used. With the Pipeline pattern, the
data is processed in sequential order. The first input is transformed
into the first output, the second input into the second output, and so
on.
Pipelines are also useful for processor-intensive applications and for applications where the cost of I/O is significant.
The Basics
In the Microsoft® .NET Framework, the buffers that connect stages of a software pipeline are usually based on the BlockingCollection<T> class.
Figure 1
illustrates an example of a pipeline that has four stages. It reads
words and sentence fragments from a data source, it corrects the
punctuation and capitalization, it groups them into complete sentences,
and it writes the sentences to a disk file.
Each stage of the
pipeline reads from a dedicated input and writes to a particular
output. For example, the “Read Strings” task reads from a source and
writes to buffer 1. All the stages of the pipeline can execute at the
same time because concurrent queues buffer any shared inputs and
outputs. If there are four available cores, the stages can run in parallel.
As long as there is room in its output buffer, a stage of the pipeline
can add the value it produces to its output queue. If the output buffer
is full, the producer of the new value waits until space becomes
available. Stages can also wait (that is, block) on inputs. An input
wait is familiar from other programming contexts—if an enumeration or a
stream does not have a value, the consumer of that enumeration or
stream waits until a value is available or an “end of file” condition
occurs. Blocking collections work the same way. Using buffers that
hold more than one value at a time compensates for variability in the
time it takes to process each value.
Note: If the queue is full, the producer blocks. If the queue is empty, the consumer blocks.
The BlockingCollection<T> class lets you signal the “end of file” condition with the CompleteAdding
method. This method tells the consumer that it can end its processing
loop after all the data previously added to the collection is removed
or processed.
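These semantics can be seen in a minimal sketch. The names CompleteAddingDemo, Produce, and Consume below are invented for this illustration only; a producer task fills a bounded BlockingCollection<int>, and the consumer's loop ends only after CompleteAdding has been called and every previously added value has been taken:

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Sketch only: these names are invented for this illustration; they are
// not part of the chapter's example.
static class CompleteAddingDemo
{
    // Producer: adds values, then signals "end of file" with CompleteAdding.
    public static void Produce(BlockingCollection<int> buffer, int count)
    {
        try
        {
            for (int i = 0; i < count; i++)
                buffer.Add(i);           // blocks while the bounded buffer is full
        }
        finally
        {
            buffer.CompleteAdding();     // lets the consumer's loop terminate
        }
    }

    // Consumer: the foreach ends only after CompleteAdding has been called
    // and every previously added value has been removed.
    public static List<int> Consume(BlockingCollection<int> buffer)
    {
        var results = new List<int>();
        foreach (var item in buffer.GetConsumingEnumerable())
            results.Add(item);
        return results;
    }

    public static List<int> Run()
    {
        var buffer = new BlockingCollection<int>(2);   // bounded capacity of 2
        var producer = Task.Run(() => Produce(buffer, 5));
        var consumed = Consume(buffer);                // drains on this thread
        producer.Wait();
        return consumed;                               // 0..4, in order
    }
}
```

Because BlockingCollection<T> wraps a ConcurrentQueue<T> by default, the values come out in the order they were added, which is what preserves the pipeline's sequential ordering.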
The following code demonstrates how to implement a pipeline that uses the BlockingCollection class for the buffers and tasks for the stages of the pipeline.
int seed = ...
int BufferSize = ...
var buffer1 = new BlockingCollection<string>(BufferSize);
var buffer2 = new BlockingCollection<string>(BufferSize);
var buffer3 = new BlockingCollection<string>(BufferSize);
var f = new TaskFactory(TaskCreationOptions.LongRunning,
                        TaskContinuationOptions.None);
var stage1 = f.StartNew(() => ReadStrings(buffer1, ...));
var stage2 = f.StartNew(() => CorrectCase(buffer1, buffer2));
var stage3 = f.StartNew(() => CreateSentences(buffer2, buffer3));
var stage4 = f.StartNew(() => WriteSentences(buffer3));
Task.WaitAll(stage1, stage2, stage3, stage4);
The first stage produces the
input strings and places them in the first buffer. The second stage
transforms the strings. The third stage combines the strings and
produces sentences. The final stage writes the corrected sentences to a
file.
The buffers are instances of the BlockingCollection<string>
class. The argument to the constructor specifies the maximum number of
values that can be buffered at any one time. In this case, the value is
32 for each buffer.
Note: Don’t worry if you forget the LongRunning
option. If you use the default task scheduler, the .NET thread pool
will automatically add worker threads as needed, although only at a
rate of about two per second, so a pipeline with many stages may be
slow to start.
As this example shows, tasks in a pipeline are usually created with the LongRunning option. For more information, see the section, “Anti-Patterns,” later in this chapter.
The first stage of the pipeline includes a sequential loop that writes to an output buffer.
static void ReadStrings(BlockingCollection<string> output,
                        int seed)
{
    try
    {
        foreach (var phrase in PhraseSource(seed))
        {
            Stage1AdditionalWork();
            output.Add(phrase);
        }
    }
    finally
    {
        output.CompleteAdding();
    }
}
The sequential loop
populates the output buffer with values. The values come from an
external data source that’s accessed by the PhraseSource method, which returns an ordinary single-threaded instance of IEnumerable<string>. The producer places each value in the blocking collection with the Add
method. This method blocks if the queue is full, which throttles any
stage that executes faster than the stages around it.
Note: Use a sequential loop to process steps in a pipeline stage. Call the CompleteAdding method once a pipeline stage will produce no more values.
The call to the CompleteAdding method is usually inside of a finally block so that it will execute even if an exception occurs.
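This throttling effect can be observed directly. In the following sketch (ThrottleDemo is a name invented here), a deliberately slow consumer forces the producer's Add calls to block once the bounded buffer fills, so the producer can never get ahead by more than the buffer capacity plus the one item the consumer may currently be processing:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Sketch only: ThrottleDemo is an invented name for this illustration.
static class ThrottleDemo
{
    public static (int MaxLead, int Consumed) Run(int capacity, int itemCount)
    {
        var buffer = new BlockingCollection<int>(capacity);
        int consumed = 0;
        int maxLead = 0;

        var producer = Task.Run(() =>
        {
            try
            {
                for (int i = 1; i <= itemCount; i++)
                {
                    buffer.Add(i);   // blocks when the buffer holds 'capacity' items
                    int lead = i - Volatile.Read(ref consumed);
                    if (lead > maxLead) maxLead = lead;
                }
            }
            finally
            {
                buffer.CompleteAdding();
            }
        });

        foreach (var item in buffer.GetConsumingEnumerable())
        {
            Thread.Sleep(1);         // simulate a slow downstream stage
            Volatile.Write(ref consumed, consumed + 1);
        }
        producer.Wait();

        // MaxLead is bounded by the buffer capacity plus the one item
        // the consumer may be holding at that moment.
        return (maxLead, consumed);
    }
}
```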
Stages in the middle of
the pipeline consume values from an input buffer and also produce
values and place them into an output buffer. The following code shows
how the stages are structured.
static void DoStage<T>(BlockingCollection<T> input,
                       BlockingCollection<T> output)
{
    try
    {
        foreach (var item in input.GetConsumingEnumerable())
        {
            var result = ...
            output.Add(result);
        }
    }
    finally
    {
        output.CompleteAdding();
    }
}
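To make the pattern concrete, here is a self-contained, runnable variant of a middle stage. The generic DoStage<TIn, TOut> with a transform delegate is an elaboration invented for this sketch; it is not code from the online sample:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Sketch only: a generic middle stage parameterized by a transform.
static class StageHelpers
{
    public static void DoStage<TIn, TOut>(
        BlockingCollection<TIn> input,
        BlockingCollection<TOut> output,
        Func<TIn, TOut> transform)
    {
        try
        {
            // Take each value exactly once, transform it, pass it downstream.
            foreach (var item in input.GetConsumingEnumerable())
                output.Add(transform(item));
        }
        finally
        {
            // Guarantee "end of file" for the next stage, even if the
            // transform throws.
            output.CompleteAdding();
        }
    }
}
```

A middle stage built this way is started just like the ones in the earlier example, for instance f.StartNew(() => DoStage(buffer1, buffer2, s => s.ToUpper())).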
You can look at the online source code to see the implementations of the CorrectCase and CreateSentences
methods that make up stages 2 and 3 of this pipeline. They are
structured in a very similar way to this example. The important point
of this code is that the input blocking collection’s GetConsumingEnumerable
method returns an enumeration that a consumer can use to “take” the
values.
Although this example doesn’t show it, a blocking collection’s GetConsumingEnumerable
method can be called by more than one consumer. This allows values from
the producer to be divided among multiple recipients. If a recipient
gets a value from the blocking collection, no other consumer will also
get that value.
Note: A blocking collection can have more than one consumer.
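A minimal sketch of this fan-out behavior (FanOutDemo is a name invented here): two consumer tasks each iterate the same collection's GetConsumingEnumerable, and every value ends up with exactly one of them:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Sketch only: two consumers share one producer's buffer; each value
// is taken by exactly one consumer.
static class FanOutDemo
{
    public static (List<int> First, List<int> Second) Run(int itemCount)
    {
        var buffer = new BlockingCollection<int>();
        var first = new List<int>();
        var second = new List<int>();

        var c1 = Task.Run(() =>
        {
            foreach (var v in buffer.GetConsumingEnumerable()) first.Add(v);
        });
        var c2 = Task.Run(() =>
        {
            foreach (var v in buffer.GetConsumingEnumerable()) second.Add(v);
        });

        for (int i = 0; i < itemCount; i++)
            buffer.Add(i);
        buffer.CompleteAdding();
        Task.WaitAll(c1, c2);        // each list is written by only one task

        return (first, second);
    }
}
```

Note that dividing values among consumers gives up the single ordered output stream; a pipeline that must preserve order, like the one in this chapter, uses one consumer per buffer.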
The last stage of the
pipeline consumes values from a blocking collection but doesn’t produce
values. Instead, it writes values to a stream. Here’s the code.
static void WriteSentences(BlockingCollection<string> input)
{
    using (StreamWriter outfile =
                new StreamWriter(PathForPipelineResults))
    {
        // ...
        foreach (var sentence in input.GetConsumingEnumerable())
        {
            var printSentence = ...
            outfile.WriteLine(printSentence);
        }
    }
}
One of the things that makes a pipeline easy to write in .NET is that you can rely on familiar sequential techniques such as iteration by using the IEnumerable<T> class. There is some synchronization, but it’s hidden inside the implementation of the BlockingCollection<T> class.
(Some details of error handling, cancellation, and the collection of
performance data have been omitted from this example for clarity. To
see error handling and cancellation code, review the full
ImagePipeline sample that’s mentioned later in this chapter.)