Net Fx 4.0 [best] -

// Create consumer tasks for each pipeline stage var stageTasks = new List<Task>(); for (int i = 0; i < _stages.Count; i++) { var stageIndex = i; var stage = _stages[stageIndex]; var nextQueue = (stageIndex < _stages.Count - 1) ? new BlockingCollection<WorkItem>() : null; var stageTask = Task.Run(async () => { var sourceQueue = (stageIndex == 0) ? inputQueue : GetQueueForStage(stageIndex - 1); foreach (var item in sourceQueue.GetConsumingEnumerable()) { cancellationToken.ThrowIfCancellationRequested(); progress?.Report($"Processing item {item.Id} in {stage.StageName}"); var processedItem = await stage.ProcessAsync(item, cancellationToken); if (nextQueue != null) { nextQueue.Add(processedItem, cancellationToken); } else { results.Add(processedItem); } } nextQueue?.CompleteAdding(); }, cancellationToken); stageTasks.Add(stageTask); StoreQueueForStage(stageIndex, nextQueue); } await Task.WhenAll(stageTasks.ToArray()); await producerTask; return results.ToList(); } private Dictionary<int, BlockingCollection<WorkItem>> _queues = new Dictionary<int, BlockingCollection<WorkItem>>(); private void StoreQueueForStage(int stageIndex, BlockingCollection<WorkItem> queue) { if (queue != null) _queues[stageIndex] = queue; } private BlockingCollection<WorkItem> GetQueueForStage(int stageIndex) { return _queues.ContainsKey(stageIndex) ? _queues[stageIndex] : null; } }

// Stage 1: Data Validation public class ValidationStage : IPipelineStage<WorkItem, WorkItem> { public string StageName => "Validation"; net fx 4.0

// Stage 3: Data Enrichment public class EnrichmentStage : IPipelineStage<WorkItem, WorkItem> { public string StageName => "Enrichment"; // Create consumer tasks for each pipeline stage

Ready to Sell Your Aircraft?

List your airplane on AircraftForSale.com and reach qualified buyers.

List Your Aircraft
AircraftForSale Logo | FLYING Logo
Pilot in aircraft
Sign-up for newsletters & special offers!

Get the latest stories & special offers delivered directly to your inbox.

SUBSCRIBE