programming4us

Parallel Programming with Microsoft .NET : Pipelines - Variations

10/12/2010 6:00:21 PM
There are several variations to the pipeline pattern.

1. Canceling a Pipeline

Pipeline tasks work together to perform their work; they must also work together when they respond to a cancellation.

In the standard cancellation scenario, your task is passed a CancellationToken value from a higher layer of the application, such as the UI. In the case of a pipeline stage, you need to observe this cancellation token in two places. This is shown in the following code.

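void DoStage(BlockingCollection<T> input,
             BlockingCollection<T> output,
             CancellationToken token)
{
    try
    {
        foreach (var item in input.GetConsumingEnumerable())
        {
            // First check: stop taking new items once cancellation is requested.
            if (token.IsCancellationRequested) break;
            var result = ...
            // Second check: Add observes the token while it waits for space.
            output.Add(result, token);
        }
    }
    catch (OperationCanceledException) { }
    finally
    {
        // Always mark the output as complete so that consumers can unblock.
        output.CompleteAdding();
    }
}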

A natural place to check for cancellation is at the beginning of the loop that processes items from the blocking collection. At this point, you only need to break from the loop, as shown in the following code.

if (token.IsCancellationRequested) break;

The second place is less obvious. You use an overloaded version of the BlockingCollection<T> class’s Add method that accepts a cancellation token as an argument. The following code shows how to do this.

output.Add(result, token);

If you give the Add method a cancellation token, the blocking collection will be able to detect the cancellation request while it’s waiting to add a new value.

To understand why it’s necessary to add such detailed cancellation logic, recall that it’s possible that a pipeline stage that’s creating values can be blocked when it calls the Add method of its output queue. (If the output queue is full, the call to Add waits until space is available before returning.) If a cancellation request happened in this situation and you didn’t check for it, it would be possible for your program to experience deadlock. Deadlock could happen because stages of the pipeline that consume values produced by this stage can also be canceled; therefore, it’s very possible that later stages in the pipeline would terminate without draining the queue. The producer, which is blocked while waiting for space to become available, would have no way to proceed.


Note: Improperly canceling a pipeline can cause your program to experience deadlock. Follow the guidelines in this section carefully.

The solution, as shown in the example, is to use the overloaded version of the blocking collection’s Add method that accepts a cancellation token as an argument. If a cancellation is requested while the producer waits for space to become available, the blocking collection creates and throws an OperationCanceledException instance.


Note: Check for cancellation at the beginning of a stage’s main loop and also when adding values to a blocking collection.

Whether the loop exits normally, through the break statement, or through a cancellation exception, the finally clause guarantees that the output buffer will be marked as completed. This will unblock any consumers that might be waiting for input, and they can then process the cancellation request.
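The following is a minimal, self-contained sketch of the behavior described above, not code from the ImagePipeline sample. The names CancelDemo and Run, the int payload, the capacity of 1, and the 200 ms delay are all assumptions chosen for illustration. Nothing consumes the output queue, so the stage blocks in Add until the token is canceled.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class CancelDemo
{
    // Hypothetical stage: doubles each input value.
    static void DoStage(BlockingCollection<int> input,
                        BlockingCollection<int> output,
                        CancellationToken token)
    {
        try
        {
            foreach (var item in input.GetConsumingEnumerable())
            {
                if (token.IsCancellationRequested) break;
                // Throws OperationCanceledException if canceled while blocked.
                output.Add(item * 2, token);
            }
        }
        catch (OperationCanceledException) { }
        finally
        {
            output.CompleteAdding();
        }
    }

    // Returns true if the stage shut down and marked its output as complete.
    public static bool Run()
    {
        using (var cts = new CancellationTokenSource())
        using (var input = new BlockingCollection<int>())
        using (var output = new BlockingCollection<int>(1)) // bounded: capacity 1
        {
            for (int i = 0; i < 10; i++) input.Add(i);
            input.CompleteAdding();

            var stage = Task.Factory.StartNew(
                () => DoStage(input, output, cts.Token),
                TaskCreationOptions.LongRunning);

            // No consumer drains the output, so the stage blocks on its second Add.
            Thread.Sleep(200);
            cts.Cancel();

            bool finished = stage.Wait(TimeSpan.FromSeconds(5));
            return finished && output.IsAddingCompleted;
        }
    }
}
```

Calling CancelDemo.Run() should return true. If the token were not passed to Add, the stage would stay blocked and the wait would time out.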

Although it’s also possible to check for a cancellation token while waiting for input from a blocking collection (there’s an overloaded version of the GetConsumingEnumerable method that accepts a cancellation token), you don’t need to do this if you use the techniques described in this section.

Be aware that if the type T implements the IDisposable interface, under .NET coding conventions, you also must call the Dispose method on cancellation. You need to dispose of the current iteration’s object as well as instances of T stored in the blocking queues. The online source code of the ImagePipeline example shows how to do this.

2. Handling Pipeline Exceptions

Exceptions are similar to cancellations. The difference between the two is that when an unhandled exception occurs within one of the pipeline stages, the tasks that execute the other stages don’t by default receive notification that an exception has occurred elsewhere. Without such notification, there are several ways for the application to deadlock.


Note: When there is an unhandled exception in one pipeline stage, you should cancel the other stages. If you don’t do this, deadlock can occur. Follow the guidelines in this section carefully.

Use a special instantiation of the CancellationTokenSource class to allow your application to coordinate the shutdown of all the pipeline stages when an exception occurs in one of them. Here’s an example.

static void DoPipeline(CancellationToken token)
{
    using (CancellationTokenSource cts =
        CancellationTokenSource.CreateLinkedTokenSource(token))
    {
        var f = new TaskFactory(TaskCreationOptions.LongRunning,
                                TaskContinuationOptions.None);

        var stage1 = f.StartNew(() => DoStage1(..., cts));
        var stage2 = f.StartNew(() => DoStage2(..., cts));
        var stage3 = f.StartNew(() => DoStage3(..., cts));
        var stage4 = f.StartNew(() => DoStage4(..., cts));

        // Wait inside the using block: the stage variables are in scope here,
        // and cts must not be disposed while the stages can still use it.
        Task.WaitAll(stage1, stage2, stage3, stage4);
    }
}

The CreateLinkedTokenSource method of the CancellationTokenSource class creates a handle that allows you to respond to an external cancellation request and also to initiate (and recognize) an internal cancellation request of your own. You pass the linked cancellation token source as an argument to the methods that execute your pipeline stages, so each stage can use the token source to initiate cancellation.

Here’s an example.

void DoStage(BlockingCollection<T> input,
             BlockingCollection<T> output,
             CancellationTokenSource cts)
{
    try
    {
        var token = cts.Token;
        foreach (var item in input.GetConsumingEnumerable())
        {
            if (token.IsCancellationRequested) break;
            var result = ...
            output.Add(result, token);
        }
    }
    catch (Exception e)
    {
        // If an exception occurs, notify all other pipeline stages.
        cts.Cancel();
        // Swallow cancellation exceptions; rethrow everything else.
        if (!(e is OperationCanceledException))
            throw;
    }
    finally
    {
        output.CompleteAdding();
    }
}


This code is similar to the cancellation variation described earlier, except that when an unhandled exception occurs, the exception is intercepted by the catch block, which also signals cancellation for all of the other pipeline stages. Consequently, each pipeline stage will begin an orderly shutdown.

After all pipeline tasks stop, the original exception, wrapped as an inner exception of an AggregateException instance, is thrown by the Task.WaitAll method. You should be sure to include a catch or finally block to do any cleanup, such as releasing handles to unmanaged resources.
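As a rough illustration of this shutdown sequence, the sketch below wires a two-stage pipeline in which the consumer fails on purpose. The names ExceptionDemo, Produce, Consume, and Run, the queue capacity of 2, and the simulated failure on item 5 are assumptions made for the example, not part of the original sample.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class ExceptionDemo
{
    static void Produce(BlockingCollection<int> output, CancellationTokenSource cts)
    {
        try
        {
            var token = cts.Token;
            for (int i = 0; i < 1000; i++)
            {
                if (token.IsCancellationRequested) break;
                output.Add(i, token); // unblocks when another stage cancels
            }
        }
        catch (Exception e)
        {
            cts.Cancel();
            if (!(e is OperationCanceledException)) throw;
        }
        finally
        {
            output.CompleteAdding();
        }
    }

    static void Consume(BlockingCollection<int> input, CancellationTokenSource cts)
    {
        try
        {
            var token = cts.Token;
            foreach (var item in input.GetConsumingEnumerable())
            {
                if (token.IsCancellationRequested) break;
                if (item == 5)
                    throw new InvalidOperationException("simulated failure");
            }
        }
        catch (Exception e)
        {
            cts.Cancel(); // notify the producer, which may be blocked in Add
            if (!(e is OperationCanceledException)) throw;
        }
    }

    // Returns "completed", or the message of the first exception that
    // Task.WaitAll surfaces through its AggregateException.
    public static string Run(CancellationToken token)
    {
        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(token))
        using (var queue = new BlockingCollection<int>(2))
        {
            var f = new TaskFactory(TaskCreationOptions.LongRunning,
                                    TaskContinuationOptions.None);
            var stage1 = f.StartNew(() => Produce(queue, cts));
            var stage2 = f.StartNew(() => Consume(queue, cts));
            try
            {
                Task.WaitAll(stage1, stage2);
                return "completed";
            }
            catch (AggregateException ae)
            {
                return ae.Flatten().InnerExceptions[0].Message;
            }
        }
    }
}
```

With CancellationToken.None as the argument, Run should return "simulated failure": the consumer faults, the producer observes the cancellation and exits without faulting, and WaitAll reports only the original exception.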

You might ask why you can’t just pass a CancellationTokenSource object to the DoPipeline method instead of a CancellationToken value. By convention, only the cancellation token is passed as an argument. Passing a CancellationToken value allows lower-level library code to respond to an external cancellation but prevents lower-level libraries from initiating the cancellation of higher-level components. In other words, responding to a cancellation request requires less privilege than initiating such a request.

3. Load Balancing Using Multiple Producers

The BlockingCollection<T> class allows you to read values from more than one producer. This feature is provided by the TakeFromAny static method and its variants. You can use TakeFromAny to implement load balancing strategies for some pipeline scenarios (but not all). This variation is sometimes known as a nonlinear pipeline.

The image pipeline example described earlier in this chapter requires that the slideshow of thumbnail images be performed in the same order as the input files. This is a constraint that’s common to many pipeline scenarios, such as processing a series of video frames. However, in the case of the image pipeline example, the filter operations on successive images are independent of each other. In this case, you can insert an additional pipeline task. This is shown in Figure 1.


Note: It is sometimes possible to implement load balancing by increasing the number of tasks used for a particular pipeline stage.

Figure 1. Consuming values from multiple producers


Figure 1 shows what happens when you add an additional filter task. Both of the filter tasks take images produced by the previous stage of the pipeline. The order in which they consume these images is not fully determined, although from a filter’s local point of view, no input image ever arrives out of order.

Each of the filter stages has its own blocking collection for placing in queues the elements that it produces. The consumer of these queues is a component known as a multiplexer, which combines the inputs from all of the producers. The multiplexer allows its consumer, which in this case is the display stage of the pipeline, to receive the images in the correct sequential order. The images don’t need to be sorted or reordered. Instead, the fact that each producer queue is locally ordered allows the multiplexer to look for the next value in the sequence by simultaneously monitoring the heads of all of the producer queues. This is where the blocking collection’s TakeFromAny method comes into play. The method allows the multiplexer to block until any of the producer queues has a value to read.

Here’s an example to make this more concrete. Suppose that each image has a unique sequence number that’s available as a property. The image numbers start with 1 and increase sequentially. As Figure 1 shows, the first filter might process images that are numbered 1, 4, and 5, while the second filter processes images with sequence numbers 2, 3, 6, and 7. Each load-balanced filter stage collects its output images into its own queue, so there are two output queues. Each output queue is correctly ordered (that is, no higher numbered image comes before a lower numbered image), but there are gaps in the sequence of numbers. For example, if you take values from the first filter’s output queue, you get image 1, followed by image 4, followed by image 5. Images 2 and 3 are missing because they’re found in the second filter’s output queue.

The gaps are a problem. The next stage of the pipeline, the Display Image stage, needs to show images in order and without gaps in the sequence. This is where the multiplexer comes in. Using the TakeFromAny method, the multiplexer waits for input from both of the filter stage producer queues. When an image arrives, the multiplexer looks to see if the image’s sequence number is the next in the expected sequence. If it is, the multiplexer passes it to the Display Image stage. If the image is not the next in the sequence, the multiplexer holds the value in an internal look-ahead buffer and repeats the take operation for the input queue that does not have a look-ahead value. This algorithm allows the multiplexer to put together the inputs from the incoming producer queues in a way that ensures sequential order without sorting the values.
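The look-ahead algorithm can be sketched roughly as follows. This is an illustrative reconstruction, not the multiplexer from the ImagePipeline sample. It assumes that each item is simply its own sequence number (an int), and it uses TryTakeFromAny, one of the TakeFromAny variants, because that method returns -1 instead of throwing once every queue it waits on is complete. The names MultiplexerSketch, Merge, and Demo are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class MultiplexerSketch
{
    // Merges locally ordered producer queues into one gap-free sequence.
    public static List<int> Merge(BlockingCollection<int>[] producers, int firstSeq)
    {
        var result = new List<int>();
        var lookAhead = new int?[producers.Length]; // one held value per queue
        int next = firstSeq;

        while (true)
        {
            // Emit held values for as long as one of them is next in sequence.
            int hit;
            while ((hit = Array.FindIndex(lookAhead, v => v == next)) >= 0)
            {
                result.Add(next);
                lookAhead[hit] = null;
                next++;
            }

            // Wait only on queues with no held value that can still yield items.
            var indices = Enumerable.Range(0, producers.Length)
                .Where(i => lookAhead[i] == null && !producers[i].IsCompleted)
                .ToArray();
            if (indices.Length == 0) break; // input exhausted (or a gap exists)

            var waitOn = indices.Select(i => producers[i]).ToArray();
            int item;
            int k = BlockingCollection<int>.TryTakeFromAny(
                waitOn, out item, Timeout.Infinite);
            if (k >= 0) lookAhead[indices[k]] = item;
        }
        return result;
    }

    // Reproduces the example in the text: filter 1 yields 1, 4, 5 and
    // filter 2 yields 2, 3, 6, 7.
    public static string Demo()
    {
        var q1 = new BlockingCollection<int>();
        var q2 = new BlockingCollection<int>();
        foreach (var n in new[] { 1, 4, 5 }) q1.Add(n);
        foreach (var n in new[] { 2, 3, 6, 7 }) q2.Add(n);
        q1.CompleteAdding();
        q2.CompleteAdding();
        return string.Join(",", Merge(new[] { q1, q2 }, 1));
    }
}
```

Demo() should return "1,2,3,4,5,6,7". Because each queue holds at most one look-ahead value, the multiplexer needs only one buffer slot per producer, which is what allows it to restore order without sorting.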

Figure 2 shows the performance benefit of doubling the number of filter stages when the filter operation is twice as expensive as the other pipeline stages.

Figure 2. Image pipeline with load balancing


If all pipeline stages, except the filter stage, take T units of time to process an image, and the filter stage takes 2 x T units of time, using two filter stages and two producer queues to load balance the pipeline results in an overall speed of approximately T units of time per image as the number of images grows. If you run the ImagePipeline sample and select the Load Balanced radio button, you’ll see this effect. The speed of the pipeline (after a suitable number of images are processed) will converge on the average time of the slowest single-instance stage or on one-half of the average filter time, whichever is greater.
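As a worked version of this arithmetic, the steady-state time per image is set by the slowest stage, where a stage that runs on two parallel tasks contributes half of its per-image time. This is a simplified model that ignores the multiplexer's overhead and assumes enough cores are available for every task.

```latex
\begin{align*}
t_{\text{one filter}}  &= \max(T,\ 2T) = 2T \\
t_{\text{two filters}} &= \max\!\left(T,\ \tfrac{2T}{2}\right) = T
\end{align*}
```

Under these assumptions, load balancing the filter stage roughly doubles the throughput of the pipeline.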

The queue wait time of Queue 3, which is displayed on the ImagePipeline sample’s UI, indicates the overhead that’s introduced by waiting on multiple producer queues. This is an example of how adding overhead to a parallel computation can actually increase the overall speed if the change also allows more efficient use of the available cores.

4. Pipelines and Streams

You may have noticed that blocking collections and streams have some similarities. It’s sometimes useful to treat a blocking collection as a stream, and vice versa. For example, you may want to use a Pipeline pattern with library methods that read and write to streams. Suppose that you want to compress a file and then encrypt it. Both compression and encryption are supported by the .NET Framework, but the methods expect streams, not blocking collections, as input.

It’s possible to create a stream whose underlying implementation relies on tasks and a blocking collection.
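One possible shape for such a stream is sketched below, under stated assumptions. The class BlockingCollectionWriteStream is hypothetical and write-only: each Write call becomes one byte[] chunk in the queue, and disposing the stream marks the queue as complete. StreamDemo shows it as the sink of a GZipStream running in a task.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.IO.Compression;
using System.Threading.Tasks;

// Hypothetical write-only stream backed by a blocking collection of chunks.
public sealed class BlockingCollectionWriteStream : Stream
{
    private readonly BlockingCollection<byte[]> _chunks;

    public BlockingCollectionWriteStream(BlockingCollection<byte[]> chunks)
    {
        _chunks = chunks;
    }

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    public override void Flush() { } // nothing is buffered in this class
    public override int Read(byte[] buffer, int offset, int count) =>
        throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) =>
        throw new NotSupportedException();
    public override void SetLength(long value) =>
        throw new NotSupportedException();

    public override void Write(byte[] buffer, int offset, int count)
    {
        if (count == 0) return;
        var chunk = new byte[count];
        Buffer.BlockCopy(buffer, offset, chunk, 0, count);
        _chunks.Add(chunk); // blocks while a bounded queue is full
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing) _chunks.CompleteAdding(); // safe to call more than once
        base.Dispose(disposing);
    }
}

public static class StreamDemo
{
    // Compresses 100,000 zero bytes in one task and returns the number of
    // compressed bytes that the consumer read from the queue.
    public static int Run()
    {
        var chunks = new BlockingCollection<byte[]>(4);
        var producer = Task.Factory.StartNew(() =>
        {
            using (var sink = new BlockingCollectionWriteStream(chunks))
            using (var gzip = new GZipStream(sink, CompressionMode.Compress))
            {
                var data = new byte[100000];
                gzip.Write(data, 0, data.Length);
            }
        });

        int total = 0;
        foreach (var chunk in chunks.GetConsumingEnumerable())
            total += chunk.Length;
        producer.Wait();
        return total;
    }
}
```

A matching read-only stream that takes chunks from a queue would follow the same pattern in reverse. An encryption stage could then consume the compressed chunks as its input.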

5. Asynchronous Pipelines

The pipelines that have been described so far are synchronous. Producers and consumers are long-running tasks that block on inputs and outputs. You could also have an asynchronous pipeline, where tasks aren’t created until data becomes available. For that, you could use the AsyncCall class that’s found in the ParallelExtensionsExtras sample project. The AsyncCall class is a queue that a producer puts data into; if there’s currently no task processing the queue when data arrives, a new task is spun up to process the queue, and it is active as long as there’s incoming data to process. If it ever finds that there is no more data, the task goes away. If more data arrives, a new task starts.
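The behavior described above can be approximated with a queue and a flag, as in the following sketch. This is not the AsyncCall implementation from ParallelExtensionsExtras; the class OnDemandProcessor, its Post method, and the AsyncDemo helper are hypothetical names used only to illustrate the idea of starting a task when data arrives and letting it end when the queue is empty.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical on-demand stage: a task exists only while there is data.
public sealed class OnDemandProcessor<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly Action<T> _handler;
    private int _running; // 0 = no task active, 1 = a task is draining the queue

    public OnDemandProcessor(Action<T> handler)
    {
        _handler = handler;
    }

    public void Post(T item)
    {
        _queue.Enqueue(item);
        // Start a task only if none is currently processing the queue.
        if (Interlocked.CompareExchange(ref _running, 1, 0) == 0)
            Task.Factory.StartNew(() => ProcessQueue());
    }

    private void ProcessQueue()
    {
        while (true)
        {
            T item;
            while (_queue.TryDequeue(out item)) _handler(item);

            // Give up ownership, then re-check: an item may have been posted
            // after the last TryDequeue but before the flag was cleared.
            Interlocked.Exchange(ref _running, 0);
            if (_queue.IsEmpty ||
                Interlocked.CompareExchange(ref _running, 1, 0) != 0)
                return;
        }
    }
}

public static class AsyncDemo
{
    // Posts 1..100 and returns the sum computed by the handler.
    public static int Run()
    {
        int sum = 0;
        using (var done = new CountdownEvent(100))
        {
            var processor = new OnDemandProcessor<int>(x =>
            {
                Interlocked.Add(ref sum, x);
                done.Signal();
            });
            for (int i = 1; i <= 100; i++) processor.Post(i);
            done.Wait(TimeSpan.FromSeconds(10));
        }
        return sum;
    }
}
```

AsyncDemo.Run() should return 5050. Note that this sketch does not handle exceptions thrown by the handler; a production version would need to reset the flag and report the failure.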
