【问题标题】:Reopen TPL Dataflow input after marking it complete标记完成后重新打开 TPL 数据流输入
【发布时间】:2019-01-15 18:39:05
【问题描述】:

我正在尝试创建一个处理管道服务,用户可以将项目放入其中并等待结果完成处理。我的想法是使用 DI 让它能够注入。

我面临的问题是,在处理完第一组数据并将输入块标记为完成后,当我尝试处理另一组数据时它仍然关闭。有没有办法重新打开管道以允许再次处理数据?

我还在 TPL 数据流上使用名为 DataflowEx 的库。

   public interface IPipelineService
   {
        Task FillPipeline(object inputObj);

        Task WaitForResults();

        Task<List<object>> GetResults();

        Task FlushPipeline();

        Task Complete();
   }

   public class Pipeline : Dataflow<object>, IPipelineService
   {
        private TransformBlock<object, object> _inputBlock;
        private ActionBlock<object> _resultBlock;

        private List<object> _results { get; set; }

        public Pipeline() : base(DataflowOptions.Default)
        {
            _results = new List<object>();

            _inputBlock = new TransformBlock<object, object>(obj => Processing.Processing.ReceiveOrder(obj));
            _resultBlock = new ActionBlock<object>(obj => _results.Add(Processing.Processing.ReturnProcessedOrder(obj)));

            _inputBlock.LinkTo(_resultBlock, new DataflowLinkOptions() { PropagateCompletion = true });

            RegisterChild(_inputBlock);
            RegisterChild(_resultBlock);
        }

        public Task FillPipeline(object inputObj)
        {
            //InputBlock.Post(inputObj);
            return Task.CompletedTask;
        }

        public async Task WaitForResults()
        {
            await this.CompletionTask;
        }

        public Task<List<object>> GetResults()
        {
            return Task.FromResult(_results);
        }

        public Task FlushPipeline()
        {
            _results = new List<object>();
            return Task.CompletedTask;
        }

        Task IPipelineService.Complete()
        {
            InputBlock.Complete();
            return Task.CompletedTask;
        }

        public override ITargetBlock<object> InputBlock { get { return _inputBlock; } }

        public object Result { get { return _results; } }
    }

这是我目前用来测试这个想法的基本示例。

这就是我希望能够使用它并能够在它完成第一组处理后将物品送入其中的方式。

await _pipelineService.FillPipeline(new GenerateOrder(OrderType.HomeLoan).order);
await _pipelineService.FillPipeline(new GenerateOrder(OrderType.OtherLoan).order);
await _pipelineService.FillPipeline(new GenerateOrder(OrderType.PersonalLoan).order);
await _pipelineService.FillPipeline(new GenerateOrder(OrderType.CarLoan).order);
await _pipelineService.Complete();
await _pipelineService.WaitForResults();

【问题讨论】:

  • 问题在于这一行InputBlock.Complete();,一旦该块完成,它将不接受新输入,因此您必须创建一个新块将其重新链接到管道的开头。
  • @JSteward 让它像我预想的那样工作需要为放置在其中的每组数据生成新的输入块?这是有道理的,因为似乎一旦一个块完成,就没有办法将其恢复。
  • 不幸的是没有,如果你传播完成,不要忘记替换下游块。
  • @JSteward 好的,感谢您的帮助。我认为这为目前的管道服务理念敲响了警钟。

标签: c# task-parallel-library tpl-dataflow


【解决方案1】:

您无法重新启动已完成的数据流集 - 我只是重置我的对象以重新开始(在这种情况下,我在 CompleteAsync() 中调用 ResetDataFlow)

public class DownloadConnector
{
    public DownloadDataFlow DataFlow { get; set; }

    public DownloadConnector(int maxDop)
    {
        DataFlow = new DownloadDataFlow(maxDop);
    }

    public async Task SendAsync(DownloadItem item)
    {
        await DataFlow.BufferBlock.SendAsync(item);
    }

    public async Task CompleteAsync()
    {
        DataFlow.BufferBlock.Complete();
        await DataFlow.ActionBlock.Completion;
        DataFlow.ResetDataFlow();
    }
}

public class DownloadDataFlow
{
    public BufferBlock<DownloadItem> BufferBlock { get; set; }
    public TransformBlock<DownloadItem, DownloadItem> TransformBlock { get; set; }
    public ActionBlock<DownloadItem> ActionBlock { get; set; }
    public int MaxDop { get; set; }

    public DownloadDataFlow(int maxDop)
    {
        MaxDop = maxDop;
        ResetDataFlow();
    }

    public DownloadDataFlow ResetDataFlow()
    { 
        BufferBlock = new BufferBlock<DownloadItem>();
        TransformBlock = new TransformBlock<DownloadItem, DownloadItem>(DownloadAsync);
        ActionBlock = new ActionBlock<DownloadItem>(OnCompletion, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = MaxDop });
        BufferBlock.LinkTo(TransformBlock, new DataflowLinkOptions { PropagateCompletion = true });
        TransformBlock.LinkTo(ActionBlock, new DataflowLinkOptions { PropagateCompletion = true });

        return this;
    }

    public async Task DownloadAsync(DownloadItem item)
    {
        ...
    }

    public async Task OnCompletion(DownloadItem item)
    {
        ...
    }
}

public class DownloadItem
{
    ...
}

代码运行使用:

var connector = new DownloadConnector(10);
await connector.SendAsync(new DownloadItem());
await connector.SendAsync(new DownloadItem());
await connector.SendAsync(new DownloadItem());
await connector.SendAsync(new DownloadItem());
await connector.CompleteAsync();

await connector.SendAsync(new DownloadItem());
await connector.SendAsync(new DownloadItem());
await connector.SendAsync(new DownloadItem());
await connector.SendAsync(new DownloadItem());
await connector.CompleteAsync();

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-11-02
    • 1970-01-01
    • 1970-01-01
    • 2019-09-02
    • 2023-04-09
    相关资源
    最近更新 更多