【问题标题】:Parallel.ForEach using Thread.Sleep equivalentParallel.ForEach 使用 Thread.Sleep 等效
【发布时间】:2014-08-19 04:41:25
【问题描述】:

所以情况如下:我需要调用一个开始搜索的网站。此搜索持续了未知的时间,我知道搜索是否完成的唯一方法是定期查询网站以查看其上是否有“下载数据”链接(它在 javascript 上使用了一些奇怪的 ajax 调用计时器来检查后端并更新页面,我认为)。

所以诀窍是:我需要搜索数百个项目,一次一个。所以我有一些看起来有点像这样的代码:

var items = getItems();
Parallel.ForEach(items, item =>
{
   startSearch(item);
   var finished = isSearchFinished(item);
   while(finished == false)
   {
      finished = isSearchFinished(item); //<--- How do I delay this action 30 Secs?
   }
   downloadData(item);
}

现在,显然这不是真正的代码,因为可能有一些事情导致isSearchFinished 始终为false

除了明显的无限循环危险之外,我如何正确地阻止 isSearchFinished() 一遍又一遍地调用,而是每隔 30 秒或 1 分钟调用一次?

我知道Thread.Sleep() 不是正确的解决方案,我认为该解决方案可以通过使用Threading.Timer() 来完成,但我对它不是很熟悉,而且线程选项太多了,我只是不确定使用哪个。

【问题讨论】:

  • 我认为你不应该使用Parallel.ForEach
  • 好的,我应该使用什么?我非常愿意接受建议。
  • 所以只需使用 foreach 循环,例如,任务工厂?
  • 是的,对于在特定时间长度后安排的定期操作,Timer 正是您想要的。
  • 您对使用 Thread.Sleep 的担忧是您将固定其中一个线程池线程?使用 Task.Delay 而不是 Tasks 的异步方法怎么样?

标签: c# multithreading task-parallel-library parallel.foreach


【解决方案1】:

使用任务和async/await 很容易实现,正如@KevinS 在 cmets 中所指出的那样:

async Task<ItemData> ProcessItemAsync(Item item)
{
    while (true)
    {
        if (await isSearchFinishedAsync(item))
            break;
        await Task.Delay(30 * 1000);
    }
    return await downloadDataAsync(item);
}

// ...

var items = getItems();
var tasks = items.Select(i => ProcessItemAsync(i)).ToArray();
await Task.WhenAll(tasks);
var data = tasks.Select(t = > t.Result);

这样,您就不会因为大部分 I/O 绑定的网络操作而徒劳地阻塞 ThreadPool 线程。如果您不熟悉 async/awaitasync-await 标签 wiki 可能是一个不错的起点。

我假设您可以将同步方法 isSearchFinisheddownloadData 转换为异步版本,使用类似 HttpClient 的非阻塞 HTTP 请求并返回 Task&lt;&gt;。如果你不能这样做,你仍然可以简单地用Task.Run 包装它们,如await Task.Run(() =&gt; isSearchFinished(item))await Task.Run(() =&gt; downloadData(item))。通常不建议这样做,但由于您有数百个项目,在这种情况下,它会给您提供比Parallel.ForEach 更好的并发水平,因为您不会阻塞池线程 30 秒,这要归功于异步 @ 987654335@.

【讨论】:

  • FWIW,Task.Delay 只是在封面下使用Timer
  • 谢谢彼得,我现在正在尝试实现这一点。我需要让当前的“用户”保持登录状态。这个系统使用 cookie 作为令牌。我应该将整个 HttpClient 对象传递给不同的方法,还是建议只传递 cookie 容器?
  • @Ryan,HttpClientCoockieContainer 均未记录为并发调用的线程安全。您的应用程序的执行环境是什么(GUI、控制台、Windows 服务、WCF、ASP.NET 等)?
  • GUI - Windows 窗体。抱歉,我发帖时显然没有考虑登录,相信我可以为每个并行线程使用 WebClient 的单个实例,消除传递 cookie 等的需要。我现在意识到有很多事情是错误的用这种方法。流程应该是这样的:登录,保存cookie以供后续调用,启动搜索,定期检查搜索是否完成,下载结果。每次调用时,令牌(作为 cookie)都会发生变化。
  • 哇,太棒了!所以我可以登录一次,然后使用该登录令牌进行所有处理。谢谢!
【解决方案2】:

您还可以使用TaskCompletionSourceThreading.Timer 编写一个通用函数来返回一个Task,一旦指定的重试函数成功,该函数就会完成。

public static Task RetryAsync(Func<bool> retryFunc, TimeSpan retryInterval)
{
    return RetryAsync(retryFunc, retryInterval, CancellationToken.None);
}

public static Task RetryAsync(Func<bool> retryFunc, TimeSpan retryInterval, CancellationToken cancellationToken)
{
    var tcs = new TaskCompletionSource<object>();

    cancellationToken.Register(() => tcs.TrySetCanceled());

    var timer = new Timer((state) =>
    {
        var taskCompletionSource = (TaskCompletionSource<object>) state;

        try
        {                   
            if (retryFunc())
            {
                taskCompletionSource.TrySetResult(null);
            }
        }
        catch (Exception ex)
        {
            taskCompletionSource.TrySetException(ex);
        }
    }, tcs, TimeSpan.FromMilliseconds(0), retryInterval);

    // Once the task is complete, dispose of the timer so it doesn't keep firing. Also captures the timer
    // in a closure so it does not get disposed.
    tcs.Task.ContinueWith(t => timer.Dispose(),
                          CancellationToken.None,
                          TaskContinuationOptions.ExecuteSynchronously,
                          TaskScheduler.Default);

    return tcs.Task;
}

然后您可以像这样使用RetryAsync

var searchTasks = new List<Task>();

searchTasks.AddRange(items.Select(
        downloadItem => RetryAsync( () => isSearchFinished(downloadItem),  TimeSpan.FromSeconds(2))  // retry timout
        .ContinueWith(t => downloadData(downloadItem), 
                      CancellationToken.None, 
                      TaskContinuationOptions.OnlyOnRanToCompletion, 
                      TaskScheduler.Default)));

await Task.WhenAll(searchTasks.ToArray());

ContinueWith 部分指定任务成功完成后您要执行的操作。在这种情况下,它将在线程池线程上运行您的 downloadData 方法,因为我们指定了 TaskScheduler.Default,并且仅当任务运行完成时才会执行继续,即它没有被取消并且没有引发异常。

【讨论】:

  • 谢谢内德。我实际上将尝试使用这种方法和 Noseratio 编写相同的代码,以便我可以更加熟悉 C# 中的异步概念。特别是,我喜欢使用 ContinueWith。
猜你喜欢
  • 2015-05-28
  • 2010-10-02
  • 2018-07-13
  • 2013-10-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多