【问题标题】:Parallel ForEach wait 500 ms before spawningParallel ForEach 在生成前等待 500 毫秒
【发布时间】:2013-07-13 09:18:06
【问题描述】:

我有这种情况:

var tasks = new List<ITask> ...
Parallel.ForEach(tasks, currentTask => currentTask.Execute() );

是否可以指示 PLinq 在下一个线程产生之前等待 500 毫秒?

System.Threading.Thread.Sleep(5000);

【问题讨论】:

  • 你想在这里达到什么目的?
  • 我认为Parallel.ForEach(tasks, currentTask =&gt; { Thread.Sleep(5000); currentTask.Execute(); }); 可以解决问题,但我很想知道您为什么要这样做 - 听起来像是一种解决方法?
  • 每个需要一段时间才能执行的任务从另一个资源中获取其数据,该资源只能每 0.5 秒命中一次。我想我可以将获取数据和执行任务分开......
  • 不,这可能随时失败,不要假设它会在 500 毫秒内完成,你可以在这里使用WaitHandles
  • 分离是这里的首选。

标签: c# .net plinq


【解决方案1】:

您可以改用Enumerable.Aggregate

var task = tasks.Aggregate((t1, t2) =>
                                t1.ContinueWith(async _ =>
                                    { Thread.Sleep(500); return t2.Result; }));

如果您不希望将任务链接起来,那么假设任务按延迟顺序排列,Select 也会过载。

var tasks = Enumerable
              .Range(1, 10)
              .Select(x => Task.Run(() => x * 2))
              .Select((x, i) => Task.Delay(TimeSpan.FromMilliseconds(i * 500))
                                    .ContinueWith(_ => x.Result));

foreach(var result in tasks.Select(x => x.Result))
{
    Console.WriteLine(result);
}

从 cmets 更好的选择是保护资源而不是使用时间延迟。

static object Locker = new object();

static int GetResultFromResource(int arg)
{
    lock(Locker)
    {
        Thread.Sleep(500);
        return arg * 2;
    }
}

var tasks = Enumerable
          .Range(1, 10)
          .Select(x => Task.Run(() => GetResultFromResource(x)));

foreach(var result in tasks.Select(x => x.Result))
{
    Console.WriteLine(result);
}

【讨论】:

  • 有点回答问题但删除了所有并发性。
  • 我添加了另一个将同时运行的方法。
【解决方案2】:

您使用Parallel.Foreach 完全错误,您应该创建一个特殊的枚举器,将其速率限制为每 500 毫秒获取一次数据。

由于您未提供任何详细信息,我对您的 DTO 的工作方式做了一些假设。

private IEnumerator<SomeResource> GetRateLimitedResource()
{
    SomeResource someResource = null;
    do
    {
        someResource = _remoteProvider.GetData();

        if(someResource != null)
        {
             yield return someResource;
             Thread.Sleep(500);
        }
    } while (someResource != null);
}

这就是你的并行应该是什么样子

Parallel.ForEach(GetRateLimitedResource(), SomeFunctionToProcessSomeResource);

【讨论】:

  • 好主意,因为它简单而强大。
  • 这会等待 500 毫秒,然后才能检索到第一个项目。这是故意的吗?
  • @svick 我不得不把睡眠放在某个地方,我在数据抓取之后移到了它,这样它就会进入睡眠状态。可以通过获取DateTime.Now 并将其保存在局部变量中来进一步改进代码,然后当下一次迭代发生时,您可以检查 500 毫秒是否已经过去,并且只休眠所需的时间。
  • 这可能不会按预期工作,因为 AFAIK Parallel.ForEach 默认使用块分区。这意味着一次将逐步枚举越来越多的元素。当需要减少同步开销时,块分区是有意义的,但在这种情况下,开销与施加的人为延迟完全相形见绌。所以禁用它是有意义的,将GetRateLimitedResource() 替换为Partitioner.Create(GetRateLimitedResource(), EnumerablePartitionerOptions.NoBuffering)
【解决方案3】:

在这种情况下,带有BlockingCollection&lt;T&gt; 的生产者-消费者模式怎么样?

var tasks = new BlockingCollection<ITask>();

// add tasks, if this is an expensive process, put it out onto a Task
// tasks.Add(x);

// we're done producin' (allows GetConsumingEnumerable to finish)
tasks.CompleteAdding();

RunTasks(tasks);

使用单个消费者线程:

static void RunTasks(BlockingCollection<ITask> tasks)
{
    foreach (var task in tasks.GetConsumingEnumerable())
    {
        task.Execute();

        // this may not be as accurate as you would like
        Thread.Sleep(500);
    }
}

如果您可以访问 .Net 4.5,则可以使用 Task.Delay

static void RunTasks(BlockingCollection<ITask> tasks)
{
    foreach (var task in tasks.GetConsumingEnumerable())
    {
        Task.Delay(500)
            .ContinueWith(() => task.Execute())
            .Wait();
    }
}

【讨论】:

    【解决方案4】:

    已经有一些很好的建议。我同意其他人的观点,即您使用 PLINQ 的方式并非有意使用。

    我的建议是使用System.Threading.Timer。这可能比编写一个返回 IEnumerable&lt;&gt; 强制半秒延迟的方法要好,因为您可能不需要等待整整半秒,具体取决于自上次 API 调用以来经过了多长时间。

    使用计时器,它将在您指定的时间间隔内调用您提供的委托,因此即使第一个任务没有完成,半秒后它也会在另一个线程上调用您的委托,所以有不会有任何额外的等待。

    从您的示例代码中,听起来您有一个任务列表,在这种情况下,我将使用System.Collections.Concurrent.ConcurrentQueue 来跟踪任务。一旦队列为空,关闭计时器。

    【讨论】:

      猜你喜欢
      • 2021-04-05
      • 2021-08-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-02-26
      • 1970-01-01
      • 2015-09-13
      相关资源
      最近更新 更多