Parallel Programming with Microsoft .NET : Pipelines - The Basics

10/12/2010 9:25:29 AM
The Pipeline pattern uses parallel tasks and concurrent queues to process a sequence of input values. Each task implements a stage of the pipeline, and the queues act as buffers that allow the stages of the pipeline to execute concurrently, even though the values are processed in order. You can think of software pipelines as analogous to assembly lines in a factory, where each item in the assembly line is constructed in stages. The partially assembled item is passed from one assembly stage to another. The outputs of the assembly line occur in the same order as that of the inputs.
Note: A pipeline is a series of parallel tasks that are connected by buffers.

A pipeline is composed of a series of producer/consumer stages, each one depending on the output of its predecessor. Pipelines allow you to use parallelism in cases where there are too many dependencies to use a parallel loop.

There are many ways to use pipelines. Pipelines are often useful when the data elements are received from a real-time event stream, such as values on stock ticker tapes, user-generated mouse click events, and packets that arrive over the network. Pipelines are also used to process elements from a data stream, as is done with compression and encryption, or processing streams of video frames. In all of these cases, it’s important that the data elements are processed in sequential order.

Note: Don’t confuse pipelines and parallel loops. Pipelines are used in cases where a parallel loop can’t be used. With the Pipeline pattern, the data is processed in sequential order. The first input is transformed into the first output, the second input into the second output, and so on.

Pipelines are also useful for processor-intensive applications and for applications where the cost of I/O is significant.

The Basics

In the Microsoft® .NET Framework, the buffers that connect stages of a software pipeline are usually based on the BlockingCollection<T> class.

Figure 1 illustrates an example of a pipeline that has four stages. It reads words and sentence fragments from a data source, it corrects the punctuation and capitalization, it groups them into complete sentences, and it writes the sentences to a disk file.

Figure 1. Sample pipeline

Each stage of the pipeline reads from a dedicated input and writes to a particular output. For example, the “Read Strings” task reads from a source and writes to buffer 1. All the stages of the pipeline can execute at the same time because concurrent queues buffer any shared inputs and outputs. If there are four available cores, the stages can run in parallel. As long as there is room in its output buffer, a stage of the pipeline can add the value it produces to its output queue. If the output buffer is full, the producer of the new value waits until space becomes available. Stages can also wait (that is, block) on inputs. An input wait is familiar from other programming contexts: if an enumeration or a stream does not have a value, the consumer of that enumeration or stream waits until a value is available or an “end of file” condition occurs. A blocking collection works the same way. Using buffers that hold more than one value at a time compensates for variability in the time it takes to process each value.

Note: If the queue is full, the producer blocks. If the queue is empty, the consumer blocks.
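The blocking rule in this note can be observed without actually blocking a thread. The following is a minimal sketch, not part of the original sample: the class name BoundedBufferDemo and the capacity of 2 are assumptions chosen for illustration. It uses the non-blocking TryAdd and TryTake methods, which return false in exactly the situations where Add and Take would wait.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical demo class; not part of the book's sample code.
static class BoundedBufferDemo
{
    // Returns (addedWhenFull, tookWhenEmpty) for a buffer of capacity 2.
    public static Tuple<bool, bool> Probe()
    {
        using (var buffer = new BlockingCollection<int>(2)) // bounded capacity = 2
        {
            buffer.Add(1);
            buffer.Add(2);                                   // the buffer is now full
            bool addedWhenFull = buffer.TryAdd(3);           // false: Add(3) would block here

            int value;
            buffer.TryTake(out value);                       // removes 1
            buffer.TryTake(out value);                       // removes 2; the buffer is now empty
            bool tookWhenEmpty = buffer.TryTake(out value);  // false: Take() would block here

            return Tuple.Create(addedWhenFull, tookWhenEmpty);
        }
    }

    static void Main()
    {
        var result = Probe();
        Console.WriteLine("TryAdd on full buffer: " + result.Item1);
        Console.WriteLine("TryTake on empty buffer: " + result.Item2);
    }
}
```

Both lines print False. In a real pipeline stage you would normally call Add and let the stage wait, because that waiting is what keeps a fast producer from running far ahead of a slow consumer.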

The BlockingCollection<T> class lets you signal the “end of file” condition with the CompleteAdding method. This method tells the consumer that it can end its processing loop after all the data previously added to the collection is removed or processed.
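As a rough illustration of the “end of file” signal, the sketch below runs one producer task and consumes on the calling thread. The class name CompleteAddingDemo, the capacity of 4, and the three sample words are assumptions made for this example only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical demo class; not part of the book's sample code.
static class CompleteAddingDemo
{
    public static List<string> Run()
    {
        var received = new List<string>();
        using (var buffer = new BlockingCollection<string>(4))
        {
            var producer = Task.Factory.StartNew(() =>
            {
                foreach (var word in new[] { "one", "two", "three" })
                    buffer.Add(word);
                buffer.CompleteAdding();   // the "end of file" signal
            });

            // This loop ends only after CompleteAdding has been called
            // and every value already in the buffer has been taken.
            foreach (var word in buffer.GetConsumingEnumerable())
                received.Add(word);

            producer.Wait();
        }
        return received;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));   // prints: one, two, three
    }
}
```

Without the call to CompleteAdding, the consuming loop would wait indefinitely for a fourth value that never arrives.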

The following code demonstrates how to implement a pipeline that uses the BlockingCollection class for the buffers and tasks for the stages of the pipeline.

int seed = ...
int BufferSize = ...

// One bounded buffer between each pair of adjacent stages.
var buffer1 = new BlockingCollection<string>(BufferSize);
var buffer2 = new BlockingCollection<string>(BufferSize);
var buffer3 = new BlockingCollection<string>(BufferSize);

// Each stage runs as a long-running task.
var f = new TaskFactory(TaskCreationOptions.LongRunning,
                        TaskContinuationOptions.None);

var stage1 = f.StartNew(() => ReadStrings(buffer1, ...));
var stage2 = f.StartNew(() => CorrectCase(buffer1, buffer2));
var stage3 = f.StartNew(() => CreateSentences(buffer2, buffer3));
var stage4 = f.StartNew(() => WriteSentences(buffer3));

// Wait until every stage has finished.
Task.WaitAll(stage1, stage2, stage3, stage4);

The first stage produces the input strings and places them in the first buffer. The second stage transforms the strings. The third stage combines the strings and produces sentences. The final stage writes the corrected sentences to a file.

The buffers are instances of the BlockingCollection<string> class. The argument to the constructor specifies the maximum number of values that can be buffered at any one time. In the sample, the value of BufferSize is 32 for each buffer.

Note: Don’t worry if you forget the LongRunning option. If you use the default task scheduler, the .NET thread pool will automatically add worker threads as needed, at a rate of about two per second.

As this example shows, tasks in a pipeline are usually created with the LongRunning option. For more information, see the section, “Anti-Patterns,” later in this chapter.

The first stage of the pipeline includes a sequential loop that writes to an output buffer.

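static void ReadStrings(BlockingCollection<string> output, int seed)
{
    // Add blocks while the output buffer is full.
    try { foreach (var phrase in PhraseSource(seed)) output.Add(phrase); }
    finally { output.CompleteAdding(); }   // runs even if an exception occurs
}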

The sequential loop populates the output buffer with values. The values come from an external data source that’s accessed by the PhraseSource method, which returns an ordinary single-threaded instance of IEnumerable<string>. The producer places a value in the blocking collection with the blocking collection’s Add method. This method can potentially block if the queue is full. This is a way to limit stages of the pipeline that are executing faster than other stages.

Note: Use a sequential loop to process steps in a pipeline stage. Call the CompleteAdding method once a pipeline stage will produce no more values.

The call to the CompleteAdding method is usually inside of a finally block so that it will execute even if an exception occurs.
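To see why the finally block matters, consider a producer that fails partway through. The sketch below is illustrative only: FaultyProducer, the capacity of 8, and the thrown exception are assumptions, not code from the sample. Because CompleteAdding still runs, the consumer's loop ends instead of waiting forever, and the failure surfaces when the task is waited on.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical demo class; not part of the book's sample code.
static class FinallyDemo
{
    // A producer that fails after adding two values.
    static void FaultyProducer(BlockingCollection<int> output)
    {
        try
        {
            output.Add(1);
            output.Add(2);
            throw new InvalidOperationException("source failed");
        }
        finally
        {
            output.CompleteAdding();   // still runs, so the consumer is released
        }
    }

    // Returns the number of values consumed; reports whether the producer faulted.
    public static int Run(out bool producerFaulted)
    {
        producerFaulted = false;
        int consumed = 0;
        using (var buffer = new BlockingCollection<int>(8))
        {
            var producer = Task.Factory.StartNew(() => FaultyProducer(buffer));

            foreach (var value in buffer.GetConsumingEnumerable())
                consumed++;            // ends once adding is completed and the buffer is drained

            try { producer.Wait(); }
            catch (AggregateException) { producerFaulted = true; }
        }
        return consumed;
    }

    static void Main()
    {
        bool faulted;
        int consumed = Run(out faulted);
        Console.WriteLine("consumed=" + consumed + ", producerFaulted=" + faulted);
    }
}
```

The program prints consumed=2, producerFaulted=True. If the CompleteAdding call were moved out of the finally block, the consuming loop would never terminate after the exception.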

Stages in the middle of the pipeline consume values from an input buffer and also produce values and place them into an output buffer. The following code shows how the stages are structured.

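void DoStage(BlockingCollection<T> input, BlockingCollection<T> output) {
    try {
        foreach (var item in input.GetConsumingEnumerable()) {
            var result = ...
            output.Add(result);   // blocks while the output buffer is full
        }
    } finally { output.CompleteAdding(); }   // tell the next stage no more values follow
}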

You can look at the online source code to see the implementations of the CorrectCase and CreateSentences methods that make up stages 2 and 3 of this pipeline. They are structured in much the same way as this example. The important point of this code is that the input blocking collection’s GetConsumingEnumerable method returns an enumeration that a consumer can use to “take” the values.

Although this example doesn’t show it, a blocking collection’s GetConsumingEnumerable method can be called by more than one consumer. This allows values from the producer to be divided among multiple recipients. If a recipient gets a value from the blocking collection, no other consumer will also get that value.

Note: A blocking collection can have more than one consumer.
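The following sketch shows two consumers sharing one blocking collection. It is an illustration under stated assumptions: the class name MultiConsumerDemo, the capacity of 8, and the use of a ConcurrentBag<int> to record what each consumer took are choices made for this example, not part of the sample. Note that once values are divided among several consumers, the order in which they finish processing is no longer guaranteed, so the sketch sorts the results before returning them.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical demo class; not part of the book's sample code.
static class MultiConsumerDemo
{
    // Produces 0..itemCount-1 and returns every value taken by either consumer, sorted.
    public static int[] Run(int itemCount)
    {
        var taken = new ConcurrentBag<int>();   // thread-safe record of consumed values
        using (var buffer = new BlockingCollection<int>(8))
        {
            Action consume = () =>
            {
                foreach (var value in buffer.GetConsumingEnumerable())
                    taken.Add(value);
            };
            var consumerA = Task.Factory.StartNew(consume);
            var consumerB = Task.Factory.StartNew(consume);

            for (int i = 0; i < itemCount; i++)
                buffer.Add(i);
            buffer.CompleteAdding();            // releases both consumers when drained

            Task.WaitAll(consumerA, consumerB);
        }
        return taken.OrderBy(v => v).ToArray();
    }

    static void Main()
    {
        var values = Run(100);
        Console.WriteLine(values.Length);             // prints 100: nothing was lost
        Console.WriteLine(values.Distinct().Count()); // prints 100: nothing was taken twice
    }
}
```

How the 100 values are split between the two consumers varies from run to run; only the totals are deterministic.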

The last stage of the pipeline consumes values from a blocking collection but doesn’t produce values. Instead, it writes values to a stream. Here’s the code.

static void WriteSentences(BlockingCollection<string> input)
{
    using (StreamWriter outfile =
               new StreamWriter(PathForPipelineResults))
    {
        // ...
        foreach (var sentence in input.GetConsumingEnumerable())
        {
            var printSentence = ...
            outfile.WriteLine(printSentence);   // write the sentence to the stream
        }
    }
}

One of the things that makes a pipeline easy to write in .NET is that you can rely on familiar sequential techniques such as iteration by using the IEnumerable<T> interface. There is some synchronization, but it’s hidden inside the implementation of the BlockingCollection<T> class.

(Some details of error handling, cancellation and the collection of performance data have been omitted from this example for clarity. To see error handling and cancellation code, review the full ImagePipeline sample that’s mentioned later in this chapter.)
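For readers who want something they can compile and run right away, here is a reduced, three-stage sketch of the same pattern. It is not the book's sample: the class MiniPipeline, the methods Produce, Transform, and Collect, the buffer capacity of 2, and the stand-ins (an array instead of PhraseSource, upper-casing instead of CorrectCase, a list instead of the output file) are all assumptions made to keep the example self-contained.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical reduced pipeline; not part of the book's sample code.
static class MiniPipeline
{
    // Stage 1: produce phrases (an array stands in for PhraseSource).
    static void Produce(string[] phrases, BlockingCollection<string> output)
    {
        try { foreach (var p in phrases) output.Add(p); }
        finally { output.CompleteAdding(); }
    }

    // Stage 2: transform each phrase (upper-casing stands in for CorrectCase).
    static void Transform(BlockingCollection<string> input,
                          BlockingCollection<string> output)
    {
        try
        {
            foreach (var p in input.GetConsumingEnumerable())
                output.Add(p.ToUpperInvariant());
        }
        finally { output.CompleteAdding(); }
    }

    // Stage 3: collect results (a list stands in for the output file).
    static void Collect(BlockingCollection<string> input, List<string> results)
    {
        foreach (var p in input.GetConsumingEnumerable())
            results.Add(p);
    }

    public static List<string> Run(string[] phrases)
    {
        var results = new List<string>();   // touched only by stage 3 until WaitAll returns
        using (var buffer1 = new BlockingCollection<string>(2))
        using (var buffer2 = new BlockingCollection<string>(2))
        {
            var f = new TaskFactory(TaskCreationOptions.LongRunning,
                                    TaskContinuationOptions.None);
            var stage1 = f.StartNew(() => Produce(phrases, buffer1));
            var stage2 = f.StartNew(() => Transform(buffer1, buffer2));
            var stage3 = f.StartNew(() => Collect(buffer2, results));
            Task.WaitAll(stage1, stage2, stage3);
        }
        return results;
    }

    static void Main()
    {
        var output = Run(new[] { "the", "quick", "brown", "fox" });
        Console.WriteLine(string.Join(" ", output));   // prints: THE QUICK BROWN FOX
    }
}
```

Because each stage has exactly one producer and one consumer, the outputs appear in the same order as the inputs, even though the buffers hold only two values at a time and the stages run concurrently.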
