There are a few things to watch out for when implementing a pipeline.
1. Thread Starvation
A pipeline requires all of its tasks to be executing concurrently. If there are not enough threads to run every pipeline task, a stage that never gets a thread cannot drain its input. Once that blocking collection is full, the stages that write to it block, possibly indefinitely. To guarantee that a thread will be available to run each pipeline task, you can use the LongRunning task creation option with the default task scheduler. If you forget this option and are using the default task scheduler, you’re still safe: the thread pool will eventually notice that the early stages of the pipeline are not making progress and will inject the additional threads that the later stages need.
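A minimal sketch of a three-stage pipeline in which every task is created through a TaskFactory configured with TaskCreationOptions.LongRunning, so each stage is expected to get its own thread rather than competing for thread pool threads. The stage logic (producing and squaring numbers), the buffer capacity of 4, and all variable names are illustrative assumptions, not taken from the text.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Bounded buffers between the stages. The capacity of 4 is an arbitrary,
// illustrative choice; bounding is what turns a missing thread into blocking.
var buffer1 = new BlockingCollection<int>(boundedCapacity: 4);
var buffer2 = new BlockingCollection<int>(boundedCapacity: 4);
var results = new ConcurrentQueue<int>();

// Every task from this factory is marked LongRunning, a hint that the default
// scheduler honors by running the task on a dedicated thread.
var f = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);

// Stage 1: produce the numbers 1..10.
var stage1 = f.StartNew(() =>
{
    try
    {
        for (int i = 1; i <= 10; i++) buffer1.Add(i);
    }
    finally
    {
        buffer1.CompleteAdding(); // tell stage 2 that no more input is coming
    }
});

// Stage 2: square each number and pass it on.
var stage2 = f.StartNew(() =>
{
    try
    {
        foreach (var x in buffer1.GetConsumingEnumerable()) buffer2.Add(x * x);
    }
    finally
    {
        buffer2.CompleteAdding();
    }
});

// Stage 3: collect the results in arrival (FIFO) order.
var stage3 = f.StartNew(() =>
{
    foreach (var x in buffer2.GetConsumingEnumerable()) results.Enqueue(x);
});

Task.WaitAll(stage1, stage2, stage3);
Console.WriteLine(string.Join(", ", results)); // 1, 4, 9, 16, 25, 36, 49, 64, 81, 100
```

Because each stage has exactly one task, the FIFO buffers preserve the input order through the whole pipeline.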
2. Infinite Blocking Collection Waits
If a pipeline task throws an exception, it will no longer take values from its input blocking collection. If that blocking collection happens to be full, the task that writes values to it will be blocked indefinitely. You can avoid this situation by using the technique described in Section 1.
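The referenced technique is not reproduced in this section, so the following is only an assumed illustration of one common approach: all stages share a CancellationTokenSource, pass its token to Add and GetConsumingEnumerable, and a stage that fails calls Cancel so that any writer blocked on a full collection wakes up with an OperationCanceledException. The simulated failure at item 5 and the capacity of 2 are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var cts = new CancellationTokenSource();
var token = cts.Token;
var buffer = new BlockingCollection<int>(boundedCapacity: 2);
var f = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);

// Producer stage: every Add observes the shared token.
var producer = f.StartNew(() =>
{
    try
    {
        for (int i = 0; i < 100; i++) buffer.Add(i, token);
    }
    catch (OperationCanceledException)
    {
        // A downstream stage failed; stop producing instead of waiting forever.
    }
    finally
    {
        buffer.CompleteAdding();
    }
});

// Consumer stage that fails partway through (hypothetical failure).
var consumer = f.StartNew(() =>
{
    try
    {
        foreach (var item in buffer.GetConsumingEnumerable(token))
        {
            if (item == 5) throw new InvalidOperationException("simulated stage failure");
        }
    }
    catch (Exception ex) when (ex is not OperationCanceledException)
    {
        cts.Cancel(); // wake every stage that is blocked on a collection
        throw;        // still surface the original failure to the caller
    }
});

bool producerFinished = producer.Wait(TimeSpan.FromSeconds(5));
try
{
    consumer.Wait();
}
catch (AggregateException ae)
{
    Console.WriteLine($"Consumer failed: {ae.InnerException?.Message}");
}
Console.WriteLine($"Producer finished: {producerFinished}");
```

If you drop the token from the producer’s Add call, producer.Wait returns false after the five-second timeout, because the producer is stuck on a full collection that nobody drains any more.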
3. Forgetting GetConsumingEnumerable()
Blocking collections implement IEnumerable<T>, so it’s easy to forget to call the GetConsumingEnumerable method. If you make this mistake, the enumeration will be a snapshot of the blocking collection’s state, and enumerating it won’t remove items from the collection or modify it in any way. As a result, multiple consumers could process the same items.
Note:
It’s easy to forget to call the GetConsumingEnumerable method because the BlockingCollection<T> class implements IEnumerable<T>. Enumerating over the blocking collection instance won’t consume values. Watch out for this!
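A small single-threaded demonstration of the difference, with arbitrary string items chosen for illustration: enumerating the collection directly leaves every item in place, while GetConsumingEnumerable removes each item as it is read.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

var buffer = new BlockingCollection<string>();
buffer.Add("a");
buffer.Add("b");
buffer.Add("c");
buffer.CompleteAdding();

// Mistake: plain enumeration goes through IEnumerable<T>. It reads a
// snapshot of the underlying storage and removes nothing.
var seen = buffer.ToList();
Console.WriteLine($"After plain enumeration: {buffer.Count} items left");     // 3

// Correct: each step of GetConsumingEnumerable takes (removes) an item, and
// the loop ends once CompleteAdding has been called and the collection is empty.
var consumed = buffer.GetConsumingEnumerable().ToList();
Console.WriteLine($"After consuming enumeration: {buffer.Count} items left"); // 0
```

In a real pipeline, a second consumer running the plain enumeration would see the same "a", "b", "c" that the first consumer already processed.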
4. Using Other Producer/Consumer Collections
The BlockingCollection<T> class uses a ConcurrentQueue<T> as its default storage mechanism. However, you can also specify your own storage mechanism. The only requirement is that the underlying storage must implement the IProducerConsumerCollection<T> interface. In general, using the implementations provided by the .NET Framework is easier and safer than writing your own.
The .NET Framework provides several implementations of the IProducerConsumerCollection<T> interface, including the ConcurrentBag<T> and ConcurrentStack<T> classes. In principle, you could therefore use bag (unordered) or stack (last-in, first-out [LIFO]) semantics for the buffers between your pipeline’s stages.
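A minimal sketch of supplying a different storage mechanism through the BlockingCollection<T> constructor that accepts an IProducerConsumerCollection<T>. The values and the bounded capacity of 10 are arbitrary; the point is that the same three items come out in reverse order when the buffer is backed by a ConcurrentStack<T>, which is the reordering a downstream pipeline stage would see.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

// Default storage: a ConcurrentQueue<T>, so items come out first-in, first-out.
var fifo = new BlockingCollection<int>();
// Custom storage: any IProducerConsumerCollection<T>; here a ConcurrentStack<T> (LIFO).
var lifo = new BlockingCollection<int>(new ConcurrentStack<int>(), boundedCapacity: 10);

foreach (var i in new[] { 1, 2, 3 })
{
    fifo.Add(i);
    lifo.Add(i);
}
fifo.CompleteAdding();
lifo.CompleteAdding();

var fifoOrder = fifo.GetConsumingEnumerable().ToArray();
var lifoOrder = lifo.GetConsumingEnumerable().ToArray();
Console.WriteLine(string.Join(", ", fifoOrder)); // 1, 2, 3
Console.WriteLine(string.Join(", ", lifoOrder)); // 3, 2, 1
```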
Generally, only the default first-in, first-out (FIFO) ordering is recommended. If a concurrent bag works for your buffers, the outputs of your pipeline stages don’t depend on order, and in that case a parallel loop could be used instead of a pipeline. Parallel loops are faster and easier to code. The fact that you are using unordered buffers is evidence that you’re using the wrong pattern.
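To illustrate the alternative, here is a sketch of the same kind of order-independent work written as a parallel loop. Stage1 and Stage2 are hypothetical functions standing in for two pipeline stages, and the input range is arbitrary. Each item passes through both steps on whichever thread picks it up, so no buffers between stages are needed, and collecting into a ConcurrentBag<T> makes explicit that nothing depends on completion order.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical per-item work that would otherwise be two pipeline stages.
int Stage1(int x) => x * x;
int Stage2(int x) => x + 1;

var inputs = Enumerable.Range(1, 1000);
var results = new ConcurrentBag<int>();

// The loop partitions the inputs across worker threads; each item runs
// Stage1 then Stage2 on the same thread, in no particular order overall.
Parallel.ForEach(inputs, x => results.Add(Stage2(Stage1(x))));

Console.WriteLine($"{results.Count} results, sum = {results.Sum()}");
```

Only the multiset of results is meaningful here; if downstream code needed them in input order, that would be a sign that FIFO buffers (or an ordered construct) were the right choice after all.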