【发布时间】:2019-01-15 18:39:05
【问题描述】:
我正在尝试创建一个处理管道服务,用户可以将项目放入其中并等待结果完成处理。我的想法是使用 DI 让它能够注入。
我面临的问题是,在处理完第一组数据并将输入块标记为完成后,当我尝试处理另一组数据时它仍然关闭。有没有办法重新打开管道以允许再次处理数据?
我还在 TPL 数据流上使用名为 DataflowEx 的库。
public interface IPipelineService
{
Task FillPipeline(object inputObj);
Task WaitForResults();
Task<List<object>> GetResults();
Task FlushPipeline();
Task Complete();
}
public class Pipeline : Dataflow<object>, IPipelineService
{
private TransformBlock<object, object> _inputBlock;
private ActionBlock<object> _resultBlock;
private List<object> _results { get; set; }
public Pipeline() : base(DataflowOptions.Default)
{
_results = new List<object>();
_inputBlock = new TransformBlock<object, object>(obj => Processing.Processing.ReceiveOrder(obj));
_resultBlock = new ActionBlock<object>(obj => _results.Add(Processing.Processing.ReturnProcessedOrder(obj)));
_inputBlock.LinkTo(_resultBlock, new DataflowLinkOptions() { PropagateCompletion = true });
RegisterChild(_inputBlock);
RegisterChild(_resultBlock);
}
public Task FillPipeline(object inputObj)
{
//InputBlock.Post(inputObj);
return Task.CompletedTask;
}
public async Task WaitForResults()
{
await this.CompletionTask;
}
public Task<List<object>> GetResults()
{
return Task.FromResult(_results);
}
public Task FlushPipeline()
{
_results = new List<object>();
return Task.CompletedTask;
}
Task IPipelineService.Complete()
{
InputBlock.Complete();
return Task.CompletedTask;
}
public override ITargetBlock<object> InputBlock { get { return _inputBlock; } }
public object Result { get { return _results; } }
}
这是我目前用来测试这个想法的基本示例。
这就是我希望能够使用它并能够在它完成第一组处理后将物品送入其中的方式。
await _pipelineService.FillPipeline(new GenerateOrder(OrderType.HomeLoan).order);
await _pipelineService.FillPipeline(new GenerateOrder(OrderType.OtherLoan).order);
await _pipelineService.FillPipeline(new GenerateOrder(OrderType.PersonalLoan).order);
await _pipelineService.FillPipeline(new GenerateOrder(OrderType.CarLoan).order);
await _pipelineService.Complete();
await _pipelineService.WaitForResults();
【问题讨论】:
-
问题在于这一行
InputBlock.Complete();,一旦该块完成,它将不接受新输入,因此您必须创建一个新块将其重新链接到管道的开头。 -
@JSteward 让它像我预想的那样工作需要为放置在其中的每组数据生成新的输入块?这是有道理的,因为似乎一旦一个块完成,就没有办法将其恢复。
-
不幸的是没有,如果你传播完成,不要忘记替换下游块。
-
@JSteward 好的,感谢您的帮助。我认为这为目前的管道服务理念敲响了警钟。
标签: c# task-parallel-library tpl-dataflow