Here’s an example of how the Futures
pattern can be used in an application. The example shows how you can
run computationally intensive operations in parallel in an application
that uses a graphical user interface (GUI).
Adatum is a
financial services firm that provides a financial dashboard application
to its employees. The application, known as the Adatum Dashboard, allows
employees to perform analyses of financial markets. The dashboard
application runs on an employee’s desktop workstation. The Adatum
Dashboard analyzes historical data instead of a stream of real-time
price data. The analysis it performs is computationally intensive, but
there is also some I/O latency because the Adatum Dashboard application collects input data from several sources over the network.
After the application has
the market data, it merges the datasets together. The application
normalizes the merged market data and then performs an analysis step.
After the analysis, it creates a market model. It also performs these
same steps for historical market data from the Federal Reserve System.
After the current and historical models are ready, the application
compares the two models and makes a market recommendation of “buy,”
“sell,” or “hold.” You can visualize these steps as a graph. Figure 1 illustrates this.
The tasks in this diagram pass data to one another as specific types of business objects. These are implemented as .NET classes in the Adatum Dashboard application.
You can download the source code for the Adatum Dashboard application from the CodePlex site at http://parallelpatterns.codeplex.com in
the Chapter5\A-Dash project. The application consists of four parts:
the business object definitions, an analysis engine, a view model, and
the user interface, or the view. Figure 2 illustrates this.
1. The Business Objects
The Adatum Dashboard uses
immutable data types. Objects of these types cannot be modified after
they are created, which makes them well suited to parallel applications.
The StockDataCollection
class represents a time series of closing prices for a group of
securities. You can think of this as a dictionary indexed by a stock
symbol. Conceptually, the values are arrays of prices for each security.
You can merge StockDataCollection values as long as the stock symbols don’t overlap. The result of the merge operation is a new StockDataCollection value that contains the time series of the inputs.
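The merge rule can be sketched as follows. This is not the Adatum implementation. For illustration only, it assumes that a StockDataCollection can be modeled as a read-only dictionary from stock symbol to an array of closing prices, and MergeCollections is a hypothetical name. The sketch builds a new collection and rejects overlapping symbols instead of modifying its inputs.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;

// Hypothetical stand-in for StockDataCollection: a read-only map from a
// stock symbol to that security's closing prices. (A fully immutable type
// would also copy or wrap the price arrays, which are still writable here.)
static IReadOnlyDictionary<string, double[]> MergeCollections(
    IEnumerable<IReadOnlyDictionary<string, double[]>> inputs)
{
    var merged = new Dictionary<string, double[]>();
    foreach (var input in inputs)
    {
        foreach (var pair in input)
        {
            // Symbols must not overlap, so a duplicate key is an error.
            if (merged.ContainsKey(pair.Key))
                throw new ArgumentException($"Duplicate symbol: {pair.Key}");
            merged.Add(pair.Key, pair.Value);
        }
    }
    // Return a new read-only collection; the inputs are left unchanged.
    return new ReadOnlyDictionary<string, double[]>(merged);
}

var nyse = new ReadOnlyDictionary<string, double[]>(
    new Dictionary<string, double[]> { ["IBM"] = new[] { 125.0, 126.5 } });
var nasdaq = new ReadOnlyDictionary<string, double[]>(
    new Dictionary<string, double[]> { ["MSFT"] = new[] { 28.1, 28.4 } });

var all = MergeCollections(new[] { nyse, nasdaq });
Console.WriteLine(string.Join(", ", all.Keys.OrderBy(k => k))); // prints "IBM, MSFT"
```

Returning a fresh object rather than mutating an input is what lets several tasks read the same collections safely while a merge is in progress.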
The StockAnalysisCollection class is the result of the analysis step. Similarly, the MarketModel and MarketRecommendation classes are the outputs of the modeling and the comparison phases of the application. The MarketRecommendation class has a property that contains a “buy, hold, or sell” decision.
2. The Analysis Engine
The Adatum Dashboard’s AnalysisEngine class produces a market recommendation from the market data it receives.
The sequential process is shown in the following code.
public MarketRecommendation DoAnalysisSequential()
{
    // Load and merge the current market data, then normalize it.
    StockDataCollection nyseData =
        LoadNyseData();
    StockDataCollection nasdaqData =
        LoadNasdaqData();
    StockDataCollection mergedMarketData =
        MergeMarketData(new[] { nyseData, nasdaqData });
    StockDataCollection normalizedMarketData =
        NormalizeData(mergedMarketData);
    // Load and normalize the historical data.
    StockDataCollection fedHistoricalData =
        LoadFedHistoricalData();
    StockDataCollection normalizedHistoricalData =
        NormalizeData(fedHistoricalData);
    // Analyze and model each data set.
    StockAnalysisCollection analyzedStockData =
        AnalyzeData(normalizedMarketData);
    MarketModel modeledMarketData =
        RunModel(analyzedStockData);
    StockAnalysisCollection analyzedHistoricalData =
        AnalyzeData(normalizedHistoricalData);
    MarketModel modeledHistoricalData =
        RunModel(analyzedHistoricalData);
    // Compare the two models to produce the recommendation.
    MarketRecommendation recommendation =
        CompareModels(new[] { modeledMarketData,
                              modeledHistoricalData });
    return recommendation;
}
The final result of the computation is a MarketRecommendation object. Each method call returns data that becomes the input to a later operation that needs it. When you chain method invocations in this way, you are limited to sequential execution: the DoAnalysisSequential method returns only after all the dependent operations complete.
The parallel version uses futures and continuation tasks for each of the operational steps. Here’s the code.
public AnalysisTasks DoAnalysisParallel()
{
    TaskFactory factory = Task.Factory;
    // ...
    // Load the current market data with long-running futures.
    Task<StockDataCollection> loadNyseData =
        Task<StockDataCollection>.Factory.StartNew(
            () => LoadNyseData(),
            TaskCreationOptions.LongRunning);
    Task<StockDataCollection> loadNasdaqData =
        Task<StockDataCollection>.Factory.StartNew(
            () => LoadNasdaqData(),
            TaskCreationOptions.LongRunning);
    // Merge both data sets after both loads finish, then normalize.
    Task<StockDataCollection> mergeMarketData =
        factory.ContinueWhenAll<StockDataCollection,
                                StockDataCollection>(
            new[] { loadNyseData, loadNasdaqData },
            (tasks) => MergeMarketData(
                from t in tasks select t.Result));
    Task<StockDataCollection> normalizeMarketData =
        mergeMarketData.ContinueWith(
            (t) => NormalizeData(t.Result));
    // Load and normalize the historical data.
    Task<StockDataCollection> loadFedHistoricalData =
        Task<StockDataCollection>.Factory.StartNew(
            () => LoadFedHistoricalData(),
            TaskCreationOptions.LongRunning);
    Task<StockDataCollection> normalizeHistoricalData =
        loadFedHistoricalData.ContinueWith(
            (t) => NormalizeData(t.Result));
    // Analyze and model each data set.
    Task<StockAnalysisCollection> analyzeMarketData =
        normalizeMarketData.ContinueWith(
            (t) => AnalyzeData(t.Result));
    Task<MarketModel> modelMarketData =
        analyzeMarketData.ContinueWith(
            (t) => RunModel(t.Result));
    Task<StockAnalysisCollection> analyzeHistoricalData =
        normalizeHistoricalData.ContinueWith(
            (t) => AnalyzeData(t.Result));
    Task<MarketModel> modelHistoricalData =
        analyzeHistoricalData.ContinueWith(
            (t) => RunModel(t.Result));
    // Compare the two models after both are ready.
    Task<MarketRecommendation> compareModels =
        factory.ContinueWhenAll<MarketModel, MarketRecommendation>(
            new[] { modelMarketData, modelHistoricalData },
            (tasks) => CompareModels(from t in tasks select t.Result));
    // Create a task that handles errors from the steps above.
    Task errorHandler = CreateErrorHandler(loadNyseData,
        loadNasdaqData, loadFedHistoricalData,
        mergeMarketData, normalizeHistoricalData,
        normalizeMarketData, analyzeHistoricalData,
        analyzeMarketData, modelHistoricalData,
        modelMarketData, compareModels);
    // Return the still-running tasks to the caller.
    return new AnalysisTasks()
    {
        LoadNyseData = loadNyseData,
        LoadNasdaqData = loadNasdaqData,
        MergeMarketData = mergeMarketData,
        NormalizeMarketData = normalizeMarketData,
        LoadFedHistoricalData = loadFedHistoricalData,
        NormalizeHistoricalData = normalizeHistoricalData,
        AnalyzeMarketData = analyzeMarketData,
        AnalyzeHistoricalData = analyzeHistoricalData,
        ModelMarketData = modelMarketData,
        ModelHistoricalData = modelHistoricalData,
        CompareModels = compareModels,
        ErrorHandler = errorHandler
    };
}
The parallel version, provided by the DoAnalysisParallel method, is similar to the sequential version, except that the synchronous method calls have been replaced with futures and continuation tasks. The method returns an AnalysisTasks object that contains the tasks associated with each step of the calculation. The DoAnalysisParallel method returns immediately, leaving the tasks running. The next sections describe how each of the tasks is created.
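A minimal sketch of the "returns immediately" behavior follows. It assumes two hypothetical steps, Load and Analyze, in place of the Adatum methods. The hypothetical StartAnalysis method builds a small chain of futures and returns the last one without waiting; the caller blocks on Result only when it actually needs the value.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical pipeline steps; Thread.Sleep simulates slow work.
static int Load() { Thread.Sleep(200); return 20; }
static int Analyze(int data) { Thread.Sleep(200); return data + 1; }

// Builds the task graph and returns the final future without waiting on it.
static Task<int> StartAnalysis()
{
    Task<int> load = Task.Factory.StartNew(() => Load());
    Task<int> analyze = load.ContinueWith(t => Analyze(t.Result));
    return analyze; // usually still running at this point
}

Task<int> result = StartAnalysis();
Console.WriteLine($"Returned; completed yet? {result.IsCompleted}");

// Reading Result blocks here, when the value is actually needed.
Console.WriteLine($"Result: {result.Result}");
```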
2.1. Loading External Data
The methods that gather the external data from the network are long-running, I/O-intensive operations. Unlike the other steps, they are not particularly processor-intensive, but they may take a relatively long time to complete, because most of their time is spent waiting for I/O operations to finish. You create these tasks with a factory object, and you use an argument to specify that the tasks are of long duration. This temporarily increases the degree of concurrency that is allowed by the system. The following code shows how to load the external data.
Task<StockDataCollection> loadNyseData =
    Task<StockDataCollection>.Factory.StartNew(
        () => LoadNyseData(),
        TaskCreationOptions.LongRunning);
Task<StockDataCollection> loadNasdaqData =
    Task<StockDataCollection>.Factory.StartNew(
        () => LoadNasdaqData(),
        TaskCreationOptions.LongRunning);
Note that the factory creates futures that return values of type StockDataCollection. The TaskCreationOptions.LongRunning enumerated value tells the task library that you want more concurrency. To prevent underutilization of processor resources, the task library may choose to run tasks like these in additional threads.
Note:
Use “long running” tasks in cases where you want additional concurrency, such as when there are long-running I/O operations.
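The following sketch uses the same StartNew call shape with TaskCreationOptions.LongRunning. LoadQuotes is a hypothetical stand-in that uses Thread.Sleep to imitate network latency. LongRunning is only a hint: in current .NET implementations the default scheduler usually responds by running the task on a dedicated thread instead of a thread-pool thread, but code should not depend on that detail.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical loader: it mostly waits, as a network call would,
// and uses very little processor time.
static int LoadQuotes(string market)
{
    Thread.Sleep(500);    // stands in for I/O latency
    return market.Length; // placeholder "result"
}

// LongRunning hints that these tasks will block for a long time,
// so the scheduler may give them their own threads.
Task<int> loadNyse = Task<int>.Factory.StartNew(
    () => LoadQuotes("NYSE"),
    TaskCreationOptions.LongRunning);
Task<int> loadNasdaq = Task<int>.Factory.StartNew(
    () => LoadQuotes("NASDAQ"),
    TaskCreationOptions.LongRunning);

// Both loads wait concurrently, so this should take roughly one delay, not two.
Console.WriteLine(loadNyse.Result + loadNasdaq.Result); // prints 10
```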
2.2. Merging
The merge operation takes inputs from both the loadNyseData and the loadNasdaqData tasks. It’s a continuation task that depends on two antecedent tasks, as shown in the following code.
Task<StockDataCollection> mergeMarketData =
    factory.ContinueWhenAll<StockDataCollection,
                            StockDataCollection>(
        new[] { loadNyseData, loadNasdaqData },
        (tasks) => MergeMarketData(
            from t in tasks select t.Result));
After the loadNyseData and loadNasdaqData tasks complete, the lambda expression given as an argument is invoked, and it calls the MergeMarketData method. At that point, the tasks parameter is an array of the antecedent tasks, which are the loadNyseData and loadNasdaqData tasks.
The MergeMarketData method takes a sequence of StockDataCollection objects as its input. The LINQ expression from t in tasks select t.Result maps the input array of futures into a sequence of StockDataCollection objects by getting the Result property of each future.
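To see ContinueWhenAll and the LINQ mapping in isolation, here is a small sketch with made-up inputs. Lists of symbol strings take the place of StockDataCollection objects, and Merge is a hypothetical helper. Because Merge accepts an IEnumerable, it can consume the query expression directly.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical merge helper. It accepts any IEnumerable, so it can be
// called with an array or directly with a LINQ query.
static List<string> Merge(IEnumerable<List<string>> parts) =>
    parts.SelectMany(p => p).ToList();

TaskFactory factory = Task.Factory;
Task<List<string>> loadA = factory.StartNew(() => new List<string> { "IBM", "GE" });
Task<List<string>> loadB = factory.StartNew(() => new List<string> { "MSFT" });

// The continuation runs after both antecedents finish. Its 'tasks'
// argument is the antecedent array, in the order it was passed in.
Task<List<string>> merge =
    factory.ContinueWhenAll<List<string>, List<string>>(
        new[] { loadA, loadB },
        tasks => Merge(from t in tasks select t.Result));

Console.WriteLine(string.Join(", ", merge.Result)); // prints "IBM, GE, MSFT"
```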
2.3. Normalizing
After the market data is merged, it undergoes a normalization step.
Task<StockDataCollection> normalizeMarketData =
    mergeMarketData.ContinueWith(
        (t) => NormalizeData(t.Result));
The ContinueWith
method creates a continuation task with a single antecedent. The
continuation task gets the result value from the task referenced by the mergeMarketData variable and invokes the NormalizeData method.
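A minimal sketch of a single-antecedent continuation follows, assuming a hypothetical Normalize function that divides each price by the first one. Reading t.Result inside the continuation does not block, because the continuation starts only after its antecedent has finished.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical normalization: divide every price by the first price.
static double[] Normalize(double[] prices) =>
    prices.Select(p => p / prices[0]).ToArray();

Task<double[]> mergeMarketData =
    Task.Factory.StartNew(() => new[] { 50.0, 55.0, 60.0 });

// A continuation with exactly one antecedent. It starts only after
// mergeMarketData has finished, so t.Result is available without waiting.
Task<double[]> normalizeMarketData =
    mergeMarketData.ContinueWith(t => Normalize(t.Result));

Console.WriteLine(string.Join(", ", normalizeMarketData.Result));
```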
2.4. Analysis and Model Creation
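After the market data is normalized, the application performs an analysis step, which takes an object of type StockDataCollection as input and returns an object of type StockAnalysisCollection. A second continuation then passes that result to the RunModel method, which produces a MarketModel object, as shown in the following code.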
Task<StockAnalysisCollection> analyzeMarketData =
    normalizeMarketData.ContinueWith(
        (t) => AnalyzeData(t.Result));
Task<MarketModel> modelMarketData =
    analyzeMarketData.ContinueWith(
        (t) => RunModel(t.Result));
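The chain can be sketched with hypothetical stand-ins to show how the result type changes from one step to the next. Here an array of normalized prices plays the role of StockDataCollection, AnalyzeData reduces it to an average daily change (in place of StockAnalysisCollection), and RunModel turns that into a trend label (in place of MarketModel). Each ContinueWith call produces a future whose result type matches its continuation's return type.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical "analysis": average day-to-day change of normalized prices.
static double AnalyzeData(double[] normalized) =>
    normalized.Zip(normalized.Skip(1), (a, b) => b - a).Average();

// Hypothetical "model": reduces the analysis to a trend label.
static string RunModel(double averageChange) =>
    averageChange > 0 ? "rising" : "falling";

Task<double[]> normalizeMarketData =
    Task.Factory.StartNew(() => new[] { 1.0, 1.1, 1.3 });

// Each continuation's return type becomes the result type of its future:
// Task<double[]> -> Task<double> -> Task<string>.
Task<double> analyzeMarketData =
    normalizeMarketData.ContinueWith(t => AnalyzeData(t.Result));
Task<string> modelMarketData =
    analyzeMarketData.ContinueWith(t => RunModel(t.Result));

Console.WriteLine(modelMarketData.Result); // prints "rising"
```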
2.5. Processing Historical Data
The application also creates a model of historical data. The steps that create these tasks are similar to those for the current market data. However, because these steps are performed by tasks, they may run in parallel with the current market data steps if the hardware resources allow it.
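The following sketch, which is not the Adatum code, models the whole graph as two independent branches joined at the end. Load, Normalize, Model, and Compare are hypothetical stand-ins, and the buy/sell/hold rule is invented for illustration. Because neither branch depends on the other, the scheduler can run them concurrently; ContinueWhenAll joins them in the same way as the compare step.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical pipeline steps; Thread.Sleep stands in for real work.
static double[] Load(double start, double growth)
{
    Thread.Sleep(300);
    return new[] { start, start * growth };
}
static double[] Normalize(double[] d) => d.Select(x => x / d[0]).ToArray();
static double Model(double[] d) { Thread.Sleep(300); return d[d.Length - 1] - d[0]; }
// Invented decision rule, for illustration only.
static string Compare(double current, double historical) =>
    current > historical ? "buy" : current < historical ? "sell" : "hold";

TaskFactory factory = Task.Factory;

// Current-market branch.
Task<double> modelMarketData = factory
    .StartNew(() => Load(100.0, 1.2), TaskCreationOptions.LongRunning)
    .ContinueWith(t => Normalize(t.Result))
    .ContinueWith(t => Model(t.Result));

// Historical branch. Nothing here depends on the branch above,
// so the scheduler is free to run the two branches at the same time.
Task<double> modelHistoricalData = factory
    .StartNew(() => Load(80.0, 1.05), TaskCreationOptions.LongRunning)
    .ContinueWith(t => Normalize(t.Result))
    .ContinueWith(t => Model(t.Result));

// The join: runs once both models are ready.
Task<string> compareModels = factory.ContinueWhenAll<double, string>(
    new[] { modelMarketData, modelHistoricalData },
    tasks => Compare(tasks[0].Result, tasks[1].Result));

Console.WriteLine(compareModels.Result); // prints "buy"
```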
2.6. Comparing Models
Here is the code that compares the two models.
Task<MarketRecommendation> compareModels =
    factory.ContinueWhenAll<MarketModel, MarketRecommendation>(
        new[] { modelMarketData, modelHistoricalData },
        (tasks) => CompareModels(
            from t in tasks select t.Result));
The “compare models” step compares the current and historical market models and produces the final result.
3. View and View Model
The Adatum
Dashboard is a GUI-based application that also uses the
Model-View-ViewModel (MVVM) pattern. It breaks the parallel computation
into subtasks whose status can be independently seen from the user
interface.
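One way a view model could track each subtask independently is to attach a continuation per task that records its final TaskStatus. The sketch below uses a hypothetical status dictionary and a hypothetical Track helper in place of bindable view-model properties. In a WPF application you would typically pass TaskScheduler.FromCurrentSynchronizationContext() to ContinueWith so that the updates run on the UI thread; that is omitted here because a console program has no UI thread.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for view-model properties that a view could bind to.
var status = new ConcurrentDictionary<string, TaskStatus>();

// Records a task's final status under a display name once it finishes.
Task Track(string name, Task task) =>
    task.ContinueWith(t => { status[name] = t.Status; });

Task<int> load = Task.Factory.StartNew(() => { Thread.Sleep(100); return 42; });
Task<int> analyze = load.ContinueWith(t => t.Result * 2);

// Each step is tracked separately, so its status can be shown on its own.
Task.WaitAll(Track("Load", load), Track("Analyze", analyze));

foreach (var entry in status.OrderBy(e => e.Key))
    Console.WriteLine($"{entry.Key}: {entry.Value}");
```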