【发布时间】:2016-11-27 23:40:12
【问题描述】:
我在确定如何检测循环 TPL 数据流中的完成时遇到问题。
我在数据流的一部分中有一个反馈循环,它正在向远程服务器发出 GET 请求并处理数据响应(用更多数据流转换这些响应,然后提交结果)。
数据源将其结果拆分为包含 1000 条记录的页面,并且不会告诉我它有多少页可供我使用。我必须继续阅读,直到获得不到一整页的数据。
通常页数为 1,经常达到 10,有时我们有 1000 页。
一开始我有很多请求要获取。
我希望能够使用一个线程池来处理这个问题,这一切都很好,我可以将多个数据请求排队并同时请求它们。如果我偶然发现一个需要获取大量页面的实例,我想为此使用我的所有线程。我不想在其他线程完成时留下一个线程。
我遇到的问题是当我将此逻辑放入数据流中时,例如:
//generate initial requests for activity
var request = new TransformManyBlock<int, DataRequest>(cmp => QueueRequests(cmp));
//fetch the initial requests and feedback more requests to our input buffer if we need to
TransformBlock<DataRequest, DataResponse> fetch = null;
fetch = new TransformBlock<DataRequest, DataResponse>(async req =>
{
var resp = await Fetch(req);
if (resp.Results.Count == 1000)
await fetch.SendAsync(QueueAnotherRequest(req));
return resp;
}
, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });
//commit each type of request
var commit = new ActionBlock<DataResponse>(async resp => await Commit(resp));
request.LinkTo(fetch);
fetch.LinkTo(commit);
//when are we complete?
QueueRequests 产生一个IEnumerable<DataRequest>。我一次将接下来的 N 个页面请求排队,接受这意味着我发送的呼叫比我需要的要多。 DataRequest 实例共享一个 LastPage 计数器,以避免不必要地发出我们知道在最后一页之后的请求。这一切都很好。
问题:
如果我通过将更多请求反馈到 fetch 的输入缓冲区来循环,如我在本示例中所示,那么我在如何发出信号(甚至检测)完成方面存在问题。我无法在从请求中获取时设置完成,因为一旦设置完成,我就无法再反馈了。
我可以在 fetch 时监控输入和输出缓冲区是否为空,但我认为当我设置完成时,我可能会冒 fetch 仍然忙于请求的风险,从而防止对其他页面的请求排队。
我可以通过某种方式知道 fetch 正忙(有输入或正忙于处理输入)。
我是否缺少一种明显/直接的方法来解决这个问题?
我可以在 fetch 中循环,而不是让更多请求排队。问题是我希望能够使用设置的最大线程数来限制我对远程服务器所做的事情。块内的并行循环能否与块本身共享一个调度程序,并通过调度程序控制生成的线程数?
我可以为 fetch 创建一个自定义转换块来处理完成信号。对于这样一个简单的场景,似乎需要做很多工作。
非常感谢您提供的任何帮助!
【问题讨论】:
-
你现在是不是所有请求都在第一个块中生成的那一刻?
-
是的,要启动管道,我打电话给
foreach (var c in todolist) { request.Post(c); };。然后我可以打电话给request.Complete();,因为我不会再添加任何请求了。 -
@ajk,如果这就是你正在做的事情,你为什么不在所有的块链接上简单地使用
a.LinkTo(b, new DataflowLinkOptions { PropagateCompletion = true })?然后调用request.Complete()将导致commit.Completion在所有项目自然通过管道的所有阶段后转换到完成状态。 -
@KirillShlenskiy 。是的,那很好,但是在 fetch 处于完成状态之后,它不会再接受任何消息,这就是 fetch 本身正在产生的。所以这行 await fetch.SendAsync 没有成功。
标签: c# multithreading task-parallel-library tpl-dataflow