Parallel Programming with Microsoft .NET : Futures - Example: The Adatum Financial Dashboard

2/19/2011 5:12:47 PM
Here’s an example of how the Futures pattern can be used in an application. The example shows how you can run computationally intensive operations in parallel in an application that uses a graphical user interface (GUI).

Adatum is a financial services firm that provides a financial dashboard application to its employees. The application, known as the Adatum Dashboard, allows employees to perform analyses of financial markets. The dashboard application runs on an employee’s desktop workstation. The Adatum Dashboard analyzes historical data instead of a stream of real-time price data. The analysis it performs is computationally intensive, but there is also some I/O latency because the Adatum Dashboard application collects input data from several sources over the network.

After the application has the market data, it merges the datasets together. The application normalizes the merged market data and then performs an analysis step. After the analysis, it creates a market model. It also performs these same steps for historical market data from the Federal Reserve System. After the current and historical models are ready, the application compares the two models and makes a market recommendation of “buy,” “sell,” or “hold.” You can visualize these steps as a graph. Figure 1 illustrates this.

Figure 1. Adatum Dashboard tasks


The tasks in this diagram communicate by specific types of business objects. These are implemented as .NET classes in the Adatum Dashboard application.

You can download the source code for the Adatum Dashboard application from the CodePlex site at http://parallelpatterns.codeplex.com in the Chapter5\A-Dash project. The application consists of four parts: the business object definitions, an analysis engine, a view model, and the user interface, or the view. Figure 2 illustrates this.

Figure 2. Adatum Dashboard application


1. The Business Objects

The Adatum Dashboard uses immutable data types. Objects of these types cannot be modified after they are created, which makes them well suited to parallel applications.

The StockDataCollection class represents a time series of closing prices for a group of securities. You can think of this as a dictionary indexed by a stock symbol. Conceptually, the values are arrays of prices for each security. You can merge StockDataCollection values as long as the stock symbols don’t overlap. The result of the merge operation is a new StockDataCollection value that contains the time series of the inputs.

The StockAnalysisCollection class is the result of the analysis step. Similarly, the MarketModel and MarketRecommendation classes are the outputs of the modeling and the comparison phases of the application. The MarketRecommendation class has a property that contains a “buy, hold, or sell” decision.
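
To make the idea of an immutable, mergeable value concrete, here is a minimal sketch. It is not the A-Dash implementation: the PriceSeriesCollection class and its members are hypothetical stand-ins for StockDataCollection, and the sketch assumes prices are stored as arrays of double.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for StockDataCollection; all names are assumptions.
public sealed class PriceSeriesCollection
{
    private readonly Dictionary<string, double[]> series;

    public PriceSeriesCollection(IDictionary<string, double[]> source)
    {
        // Copy defensively so later changes to the caller's data cannot leak in.
        series = source.ToDictionary(p => p.Key, p => (double[])p.Value.Clone());
    }

    public IEnumerable<string> Symbols { get { return series.Keys; } }

    // Return a copy so callers cannot mutate the internal array.
    public double[] PricesFor(string symbol)
    {
        return (double[])series[symbol].Clone();
    }

    // Merging builds a new value; neither input is modified.
    public PriceSeriesCollection Merge(PriceSeriesCollection other)
    {
        var combined = new Dictionary<string, double[]>(series);
        foreach (KeyValuePair<string, double[]> pair in other.series)
        {
            if (combined.ContainsKey(pair.Key))
                throw new ArgumentException("Symbol appears in both inputs: " + pair.Key);
            combined.Add(pair.Key, pair.Value);
        }
        return new PriceSeriesCollection(combined);
    }
}

public static class MergeValuesDemo
{
    public static void Main()
    {
        var nyse = new PriceSeriesCollection(
            new Dictionary<string, double[]> { { "AAA", new[] { 10.0, 11.0 } } });
        var nasdaq = new PriceSeriesCollection(
            new Dictionary<string, double[]> { { "BBB", new[] { 20.0, 19.5 } } });
        PriceSeriesCollection merged = nyse.Merge(nasdaq);
        Console.WriteLine(merged.Symbols.Count()); // prints 2
    }
}
```

Because no instance changes after construction, several tasks can read the same value at once without locks.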

2. The Analysis Engine

The Adatum Dashboard’s AnalysisEngine class produces a market recommendation from the market data it receives.

The sequential process is shown in the following code.

public MarketRecommendation DoAnalysisSequential()
{
    StockDataCollection nyseData =
        LoadNyseData();
    StockDataCollection nasdaqData =
        LoadNasdaqData();
    StockDataCollection mergedMarketData =
        MergeMarketData(new[] { nyseData, nasdaqData });
    StockDataCollection normalizedMarketData =
        NormalizeData(mergedMarketData);
    StockDataCollection fedHistoricalData =
        LoadFedHistoricalData();
    StockDataCollection normalizedHistoricalData =
        NormalizeData(fedHistoricalData);
    StockAnalysisCollection analyzedStockData =
        AnalyzeData(normalizedMarketData);
    MarketModel modeledMarketData =
        RunModel(analyzedStockData);
    StockAnalysisCollection analyzedHistoricalData =
        AnalyzeData(normalizedHistoricalData);
    MarketModel modeledHistoricalData =
        RunModel(analyzedHistoricalData);
    MarketRecommendation recommendation =
        CompareModels(new[] { modeledMarketData,
                              modeledHistoricalData });
    return recommendation;
}


The final result of the computation is a MarketRecommendation object. Each method call returns data that becomes the input to a later operation. When you chain method invocations in this way, you are limited to sequential execution. The DoAnalysisSequential method returns only after all the dependent operations complete.

The parallel version uses futures and continuation tasks for each of the operational steps. Here’s the code.

public AnalysisTasks DoAnalysisParallel()
{
    TaskFactory factory = Task.Factory;
    // ...
    Task<StockDataCollection> loadNyseData =
        Task<StockDataCollection>.Factory.StartNew(
            () => LoadNyseData(),
            TaskCreationOptions.LongRunning);
    Task<StockDataCollection> loadNasdaqData =
        Task<StockDataCollection>.Factory.StartNew(
            () => LoadNasdaqData(),
            TaskCreationOptions.LongRunning);
    Task<StockDataCollection> mergeMarketData =
        factory.ContinueWhenAll<StockDataCollection,
                                StockDataCollection>(
            new[] { loadNyseData, loadNasdaqData },
            (tasks) => MergeMarketData(
                from t in tasks select t.Result));
    Task<StockDataCollection> normalizeMarketData =
        mergeMarketData.ContinueWith(
            (t) => NormalizeData(t.Result));
    Task<StockDataCollection> loadFedHistoricalData =
        Task<StockDataCollection>.Factory.StartNew(
            () => LoadFedHistoricalData(),
            TaskCreationOptions.LongRunning);
    Task<StockDataCollection> normalizeHistoricalData =
        loadFedHistoricalData.ContinueWith(
            (t) => NormalizeData(t.Result));
    Task<StockAnalysisCollection> analyzeMarketData =
        normalizeMarketData.ContinueWith(
            (t) => AnalyzeData(t.Result));
    Task<MarketModel> modelMarketData =
        analyzeMarketData.ContinueWith(
            (t) => RunModel(t.Result));
    Task<StockAnalysisCollection> analyzeHistoricalData =
        normalizeHistoricalData.ContinueWith(
            (t) => AnalyzeData(t.Result));
    Task<MarketModel> modelHistoricalData =
        analyzeHistoricalData.ContinueWith(
            (t) => RunModel(t.Result));
    Task<MarketRecommendation> compareModels =
        factory.ContinueWhenAll<MarketModel, MarketRecommendation>(
            new[] { modelMarketData, modelHistoricalData },
            (tasks) => CompareModels(from t in tasks select t.Result));
    Task errorHandler = CreateErrorHandler(loadNyseData,
        loadNasdaqData, loadFedHistoricalData,
        mergeMarketData, normalizeHistoricalData,
        normalizeMarketData, analyzeHistoricalData,
        analyzeMarketData, modelHistoricalData,
        modelMarketData, compareModels);
    return new AnalysisTasks()
    {
        LoadNyseData = loadNyseData,
        LoadNasdaqData = loadNasdaqData,
        MergeMarketData = mergeMarketData,
        NormalizeMarketData = normalizeMarketData,
        LoadFedHistoricalData = loadFedHistoricalData,
        NormalizeHistoricalData = normalizeHistoricalData,
        AnalyzeMarketData = analyzeMarketData,
        AnalyzeHistoricalData = analyzeHistoricalData,
        ModelMarketData = modelMarketData,
        ModelHistoricalData = modelHistoricalData,
        CompareModels = compareModels,
        ErrorHandler = errorHandler
    };
}


The parallel version, provided by the DoAnalysisParallel method, is similar to the sequential version, except that the synchronous method calls have been replaced with futures and continuation tasks. The method returns an AnalysisTasks object that contains the tasks associated with each step of the calculation. The DoAnalysisParallel method returns immediately, leaving the tasks running. The next sections describe how each of the tasks is created.

2.1. Loading External Data

The methods that gather the external data from the network are long-running, I/O intensive operations. Unlike the other steps, they are not particularly processor intensive, but they may take a relatively long time to complete. Most of their time is spent waiting for I/O operations to finish. You create these tasks with a factory object, and you use an argument to specify that the tasks are of long duration. This temporarily increases the degree of concurrency that is allowed by the system. The following code shows how to load the external data.

Task<StockDataCollection> loadNyseData =
    Task<StockDataCollection>.Factory.StartNew(
        () => LoadNyseData(),
        TaskCreationOptions.LongRunning);
Task<StockDataCollection> loadNasdaqData =
    Task<StockDataCollection>.Factory.StartNew(
        () => LoadNasdaqData(),
        TaskCreationOptions.LongRunning);

Note that the factory creates futures that return values of type StockDataCollection. The TaskCreationOptions.LongRunning enumerated value tells the task library that you want more concurrency. To prevent underutilization of processor resources, the task library may choose to run tasks like these in additional threads.


Note:

Use “long running” tasks in cases where you want additional concurrency, such as when there are long-running I/O operations.
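
The following self-contained sketch shows the same pattern outside the dashboard. The SimulatedLoad method is a hypothetical stand-in for a network load, and Thread.Sleep is used only to imitate I/O latency; the real LoadNyseData and LoadNasdaqData methods are not shown here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LongRunningDemo
{
    // Hypothetical stand-in for a network load; the sleep imitates I/O latency.
    public static int SimulatedLoad(int milliseconds, int value)
    {
        Thread.Sleep(milliseconds);
        return value;
    }

    public static int LoadBoth()
    {
        // Both futures are started before either result is read,
        // so the two simulated loads can overlap.
        Task<int> first = Task<int>.Factory.StartNew(
            () => SimulatedLoad(200, 1),
            TaskCreationOptions.LongRunning);
        Task<int> second = Task<int>.Factory.StartNew(
            () => SimulatedLoad(200, 2),
            TaskCreationOptions.LongRunning);

        // Reading Result blocks until the corresponding future has a value.
        return first.Result + second.Result;
    }

    public static void Main()
    {
        Console.WriteLine(LoadBoth()); // prints 3
    }
}
```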


2.2. Merging

The merge operation takes inputs from both the loadNyseData and the loadNasdaqData tasks. It’s a continuation task that depends on two antecedent tasks, as shown in the following code.

Task<StockDataCollection> mergeMarketData =
    factory.ContinueWhenAll<StockDataCollection,
                            StockDataCollection>(
        new[] { loadNyseData, loadNasdaqData },
        (tasks) => MergeMarketData(
            from t in tasks select t.Result));

After the loadNyseData and loadNasdaqData tasks complete, the delegate given as an argument runs and invokes the MergeMarketData method. At that point, the tasks parameter is an array of the antecedent tasks, which are the loadNyseData and loadNasdaqData tasks.

The MergeMarketData method takes a collection of StockDataCollection objects as its input. The LINQ expression from t in tasks select t.Result maps the input array of futures into a collection of StockDataCollection objects by getting the Result property of each future.
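
Here is a reduced, runnable version of the same join, offered as a sketch rather than as the dashboard's code. It uses integers in place of StockDataCollection values, and the SumAll method is a hypothetical stand-in for MergeMarketData.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class MergeDemo
{
    // Hypothetical stand-in for MergeMarketData: combines all inputs into one value.
    public static int SumAll(IEnumerable<int> values)
    {
        return values.Sum();
    }

    public static Task<int> BuildMergeTask()
    {
        Task<int> a = Task<int>.Factory.StartNew(() => 10);
        Task<int> b = Task<int>.Factory.StartNew(() => 32);

        // The continuation is scheduled only after both antecedents finish.
        // The LINQ query unwraps each future into its result.
        return Task.Factory.ContinueWhenAll<int, int>(
            new[] { a, b },
            tasks => SumAll(from t in tasks select t.Result));
    }

    public static void Main()
    {
        Console.WriteLine(BuildMergeTask().Result); // prints 42
    }
}
```

Reading t.Result inside the continuation does not block, because the continuation starts only after every antecedent has completed.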

2.3. Normalizing

After the market data is merged, it undergoes a normalization step.

Task<StockDataCollection> normalizeMarketData =
    mergeMarketData.ContinueWith(
        (t) => NormalizeData(t.Result));

The ContinueWith method creates a continuation task with a single antecedent. The continuation task gets the result value from the task referenced by the mergeMarketData variable and invokes the NormalizeData method.
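
As a small illustration of a single-antecedent continuation, the sketch below scales an array of integers to the range 0 to 1. The Scale method is a hypothetical stand-in for NormalizeData, and dividing by the maximum is an assumed normalization rule, not the one the dashboard uses.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class NormalizeDemo
{
    // Hypothetical stand-in for NormalizeData: divide every value by the largest one.
    public static double[] Scale(int[] raw)
    {
        double max = raw.Max();
        return raw.Select(x => x / max).ToArray();
    }

    public static Task<double[]> Build()
    {
        Task<int[]> merge = Task<int[]>.Factory.StartNew(
            () => new[] { 5, 10, 20 });

        // The continuation receives the completed antecedent task as t.
        // Its result type (double[]) may differ from the antecedent's (int[]).
        return merge.ContinueWith(t => Scale(t.Result));
    }

    public static void Main()
    {
        foreach (double value in Build().Result)
        {
            Console.WriteLine(value);
        }
    }
}
```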

2.4. Analysis and Model Creation

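After the market data is normalized, the application performs an analysis step, followed by a modeling step. The analysis step takes an object of type StockDataCollection as input and returns an object of type StockAnalysisCollection. The modeling step takes that StockAnalysisCollection object and returns an object of type MarketModel, as shown in the following code.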

Task<StockAnalysisCollection> analyzeMarketData =
    normalizeMarketData.ContinueWith(
        (t) => AnalyzeData(t.Result));
Task<MarketModel> modelMarketData =
    analyzeMarketData.ContinueWith(
        (t) => RunModel(t.Result));

2.5. Processing Historical Data

The application also creates a model of historical data. The steps that create the tasks are similar to those for the current market data. However, because these steps are performed by tasks, they may be run in parallel if the hardware resources allow it.

2.6. Comparing Models

Here is the code that compares the two models.

Task<MarketRecommendation> compareModels =
    factory.ContinueWhenAll<MarketModel, MarketRecommendation>(
        new[] { modelMarketData, modelHistoricalData },
        (tasks) => CompareModels(
            from t in tasks select t.Result));

The “compare models” step compares the current and historical market models and produces the final result.
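
To see the whole shape of the graph in one place, here is a miniature version that runs two independent chains and joins them at the end. It is a sketch under stated assumptions: arrays of double stand in for the business objects, and the Normalize, Model, and Compare methods (including the buy/sell/hold rule) are invented for illustration and do not reflect the dashboard's analysis.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class MiniDashboard
{
    // All three steps are invented placeholders for the real analysis.
    static double[] Normalize(double[] prices)
    {
        double max = prices.Max();
        return prices.Select(p => p / max).ToArray();
    }

    static double Model(double[] normalized)
    {
        return normalized.Average();
    }

    static string Compare(double current, double historical)
    {
        if (current < historical) return "buy";
        if (current > historical) return "sell";
        return "hold";
    }

    // One chain: a future followed by a continuation.
    static Task<double> BuildChain(double[] prices)
    {
        return Task<double[]>.Factory
            .StartNew(() => Normalize(prices))
            .ContinueWith(t => Model(t.Result));
    }

    public static Task<string> Recommend(double[] current, double[] historical)
    {
        // The two chains share no data, so they may run in parallel.
        Task<double> currentModel = BuildChain(current);
        Task<double> historicalModel = BuildChain(historical);

        // The final step waits for both models.
        return Task.Factory.ContinueWhenAll<double, string>(
            new[] { currentModel, historicalModel },
            tasks => Compare(tasks[0].Result, tasks[1].Result));
    }

    public static void Main()
    {
        Task<string> recommendation = Recommend(
            new[] { 1.0, 2.0, 4.0 },
            new[] { 2.0, 2.0, 2.0 });
        Console.WriteLine(recommendation.Result); // prints buy
    }
}
```

Recommend returns as soon as the graph is built. The caller blocks only when it reads the Result property of the final task.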

3. View and View Model

The Adatum Dashboard is a GUI-based application that also uses the Model-View-ViewModel (MVVM) pattern. It breaks the parallel computation into subtasks whose status can be independently seen from the user interface.
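
One way a view model could surface the status of an individual subtask is to attach a continuation that records the task's final state and raises a change notification. The sketch below is an assumption about how this might look, not the A-Dash view model: the StepViewModel class, its Status property, and its Track method are hypothetical names.

```csharp
using System;
using System.ComponentModel;
using System.Threading.Tasks;

// Hypothetical view-model fragment; class and member names are assumptions.
public sealed class StepViewModel : INotifyPropertyChanged
{
    private string status = "Running";

    public event PropertyChangedEventHandler PropertyChanged;

    public string Status
    {
        get { return status; }
        private set
        {
            status = value;
            PropertyChangedEventHandler handler = PropertyChanged;
            if (handler != null)
            {
                handler(this, new PropertyChangedEventArgs("Status"));
            }
        }
    }

    // In a GUI application the scheduler would typically be obtained on the
    // UI thread from TaskScheduler.FromCurrentSynchronizationContext(), so
    // that the property change is raised on that thread.
    public Task Track(Task step, TaskScheduler scheduler)
    {
        return step.ContinueWith(
            t => { Status = t.Status.ToString(); },
            scheduler);
    }
}

public static class StatusDemo
{
    public static void Main()
    {
        var viewModel = new StepViewModel();
        Task step = Task.Factory.StartNew(() => { });

        // A console program has no UI thread, so the demo uses the default scheduler.
        viewModel.Track(step, TaskScheduler.Default).Wait();
        Console.WriteLine(viewModel.Status); // prints RanToCompletion
    }
}
```

Passing the scheduler as a parameter keeps the sketch testable without a user interface. This sketch only reads the task's status; a real application would also need to observe the exception of a faulted step.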
