【问题标题】:Completion in TPL Dataflow Loops在 TPL 数据流循环中完成
【发布时间】:2016-11-27 23:40:12
【问题描述】:

我在确定如何检测循环 TPL 数据流中的完成时遇到问题。

我在数据流的一部分中有一个反馈循环,它正在向远程服务器发出 GET 请求并处理数据响应(用更多数据流转换这些响应,然后提交结果)。

数据源将其结果拆分为包含 1000 条记录的页面,并且不会告诉我它有多少页可供我使用。我必须继续阅读,直到获得不到一整页的数据。

通常页数为 1,经常达到 10,有时我们有 1000 页。

一开始我有很多请求要获取。
我希望能够使用一个线程池来处理这个问题,这一切都很好,我可以将多个数据请求排队并同时请求它们。如果我偶然发现一个需要获取大量页面的实例,我想为此使用我的所有线程。我不想在其他线程完成时留下一个线程。

我遇到的问题是当我将此逻辑放入数据流中时,例如:

//generate initial requests for activity
var request = new TransformManyBlock<int, DataRequest>(cmp => QueueRequests(cmp));

//fetch the initial requests and feedback more requests to our input buffer if we need to
TransformBlock<DataRequest, DataResponse> fetch = null;
fetch = new TransformBlock<DataRequest, DataResponse>(async req =>
{
    var resp = await Fetch(req);

    if (resp.Results.Count == 1000)
        await fetch.SendAsync(QueueAnotherRequest(req));

    return resp;
}
, new ExecutionDataflowBlockOptions {  MaxDegreeOfParallelism = 10 });

//commit each type of request
var commit = new ActionBlock<DataResponse>(async resp => await Commit(resp));

request.LinkTo(fetch);
fetch.LinkTo(commit);

//when are we complete?

QueueRequests 产生一个IEnumerable&lt;DataRequest&gt;。我一次将接下来的 N 个页面请求排队,接受这意味着我发送的呼叫比我需要的要多。 DataRequest 实例共享一个 LastPage 计数器,以避免不必要地发出我们知道在最后一页之后的请求。这一切都很好。

问题:
如果我通过将更多请求反馈到 fetch 的输入缓冲区来循环,如我在本示例中所示,那么我在如何发出信号(甚至检测)完成方面存在问题。我无法在从请求中获取时设置完成,因为一旦设置完成,我就无法再反馈了。

我可以在 fetch 时监控输入和输出缓冲区是否为空,但我认为当我设置完成时,我可能会冒 fetch 仍然忙于请求的风险,从而防止对其他页面的请求排队。

我可以通过某种方式知道 fetch 正忙(有输入或正忙于处理输入)。

我是否缺少一种明显/直接的方法来解决这个问题?

  • 我可以在 fetch 中循环,而不是让更多请求排队。问题是我希望能够使用设置的最大线程数来限制我对远程服务器所做的事情。块内的并行循环能否与块本身共享一个调度程序,并通过调度程序控制生成的线程数?

  • 我可以为 fetch 创建一个自定义转换块来处理完成信号。对于这样一个简单的场景,似乎需要做很多工作。

非常感谢您提供的任何帮助!

【问题讨论】:

  • 你现在是不是所有请求都在第一个块中生成的那一刻?
  • 是的,要启动管道,我打电话给foreach (var c in todolist) { request.Post(c); };。然后我可以打电话给request.Complete();,因为我不会再添加任何请求了。
  • @ajk,如果这就是你正在做的事情,你为什么不在所有的块链接上简单地使用a.LinkTo(b, new DataflowLinkOptions { PropagateCompletion = true })?然后调用request.Complete() 将导致commit.Completion 在所有项目自然通过管道的所有阶段后转换到完成状态。
  • @KirillShlenskiy 。是的,那很好,但是在 fetch 处于完成状态之后,它不会再接受任何消息,这就是 fetch 本身正在产生的。所以这行 await fetch.SendAsync 没有成功。

标签: c# multithreading task-parallel-library tpl-dataflow


【解决方案1】:

在 TPL 数据流中,您可以在 link the blocksDataflowLinkOptions 中指定 propagation of completion of the block

request.LinkTo(fetch, new DataflowLinkOptions { PropagateCompletion = true });
fetch.LinkTo(commit, new DataflowLinkOptions { PropagateCompletion = true });

之后,您只需为 request 块调用 Complete() 方法,就完成了!

// the completion will be propagated to all the blocks
request.Complete();

你应该使用的最后一个是Completion最后一个块的任务属性:

commit.Completion.ContinueWith(t =>
    {
        /* check the status of the task and correctness of the requests handling */
    });

【讨论】:

  • 嗨@VMAtm,是的,正如上面的 cmets 中所讨论的,这是可以理解的。然而,一旦完成传播到 Fetch,Fetch 就不能再向其输入缓冲区发布更多消息。如果 Fetch 在收到响应时发现有更多可用数据,则 Fetch 会将消息反馈给自己。当在 fetch 上设置完成时,不再允许此反馈方法。
  • 那么您只需将完成从fetch 传播到commit 并使用request.Completion.ContinueWith 循环来检查fetch 状态,就像您在回答中所做的那样
  • 非常感谢。我不确定是否有更好的方法来知道 fetch 是否完成,但如果没有,我可以忍受!
【解决方案2】:

现在我已经在 fetch 块中添加了一个简单的忙状态计数器:-

int fetch_busy = 0;

TransformBlock<DataRequest, DataResponse>  fetch_activity=null;
fetch = new TransformBlock<DataRequest, ActivityResponse>(async req => 
    {
        try
        {
            Interlocked.Increment(ref fetch_busy);
            var resp = await Fetch(req);

            if (resp.Results.Count == 1000)
            {
                await fetch.SendAsync( QueueAnotherRequest(req) );
            }

            Interlocked.Decrement(ref fetch_busy);
            return resp;
        }
        catch (Exception ex)
        {
            Interlocked.Decrement(ref fetch_busy);
            throw ex;
        }
    }
    , new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });

然后我用它来表示完成如下:-

request.Completion.ContinueWith(async _ =>
    {
        while ( fetch.InputCount > 0 || fetch_busy > 0 )
        {
            await Task.Delay(100);
        }

        fetch.Complete();
    });

这看起来不是很优雅,但我认为应该可以。

【讨论】:

  • 我的理解是SendAsync 在验证下一个块很乐意接受新项目后立即返回。因此,await fetch.SendAsync 有可能在“内部”转换有机会再次增加 fetch_busy 之前完成(随后减少 fetch_busy)。在此期间,fetch 块可能会被您的延续标记为完成(如果 fetch_busyfetch.InputCount 都恰好为零)。如果运行中的内部 Fetch 任务然后产生 1000 个项目并尝试另一个 SendAsync,它将悄悄地失败。
  • 这显然是一个相当牵强但并非不可想象的场景,所以如果await fetch.SendAsync 返回false,也许你应该抛出。还要记住:ContinueWith 使用异步 lambda 作为参数返回 Task&lt;Task&gt;(如果您决定对结果做任何事情,这可能会导致意外)。
  • @KirillShlenskiy 谢谢你,是的,我会调查的。我试图通过在ContinueWith 中检查fetch.InputCount &gt; 0 来缓解这种情况。你是说await fetch.SendAsync 可以在新排队的请求显示在fetch.InputCount 之前返回?
  • 不,这不太可能。但是,fetch.InputCount/fetch_busy 检查可以在内部 fetch 抓取要处理的项目之后立即发生(从而减少 InputCount - 可能为零),但在它有机会完成任何工作之前(即递增fetch_busy).
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-11-02
  • 1970-01-01
  • 1970-01-01
  • 2015-09-05
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多