【问题标题】:Multiple Long Running Tasks in Parallel并行执行多个长时间运行的任务
【发布时间】:2021-10-15 22:44:38
【问题描述】:

尝试创建一个服务来使用来自 Kafka 的许多主题。我想为每个消费者创建一个任务,这意味着这些是长时间运行的任务。

在每个任务中,如果我这样做了;

while(!token.IsCancellationRequested)
{
 var result = await consumer.Consume(token);
}

然后,如果我尝试启动多个任务,第一个会启动,但其他任务不会继续。

我以为我正在寻找有关如何正确管理任务的最佳实践(这不是我经常做的事情)并发布在软件工程上,但有人告诉我有一个错误,我应该在这里发布。

看这个作品;

async Task Main()
{
    var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    
    await Task.WhenAll(Consumer1(cancellationTokenSource.Token), Consumer2(cancellationTokenSource.Token));
    
    Console.WriteLine("Complete");
}

private async Task Consumer1(CancellationToken token)
{
    Console.WriteLine("Working on consumer 1");
    
    while (!token.IsCancellationRequested)
    {
        // Do a thing
        // await consumer.Consume() 
    }
    
    Console.WriteLine("Completed Consumer 1");

    await Task.CompletedTask;
}

private async Task Consumer2(CancellationToken token)
{
    Console.WriteLine("Completed consumer 2");
    
    await Task.CompletedTask;
}

结果将是;

00:00:00: Working on consumer 1 
00:00:05: Completed Consumer 1 
00:00:05: Completed consumer 2
00:00:05: Complete

如果你在 Consumer1 中放置一个 await Task.Delay(1),在 while 之上,你会得到;

00:00:00: Working on consumer 1
00:00:00: Completed consumer 2
00:00:05: Completed Consumer 1
00:00:05: Complete

实际代码确实包含await consumer.Consume(token),但这不足以并行运行任务,并且行为告诉我我在某处缺少正确的用法。所以,我的第一个问题是,我在这里是否完全错误地使用了任务,即是否有最佳实践来实现我想要做的事情。其次,我在代码中缺少什么错误?

供@Jakoss 评论参考,这将按预期运行任务;

var tasks = new List<Task>();

tasks.Add(Task.Run(async () => await Consumer1(cancellationTokenSource.Token)));
tasks.Add(Task.Run(async () => await Consumer2(cancellationTokenSource.Token)));

await Task.WhenAll(tasks.ToArray());

这是正确的做法吗?

【问题讨论】:

  • 从您的示例输出看来,该示例正在按您的预期工作?就Task 的使用而言,至少在我看来一切都很好。
  • 只有当我把 Task.Delay(1) 放进去时,它才会开始第二个任务。我希望两个消费者都在运行,而获得这种行为的唯一方法是使用 Task.Delay,这让我觉得我做的很糟糕或误解了这一切是如何运作的。
  • @tbddeveloper 我想知道它与 SynchronizationContext 有关。您是否尝试将您的消费者作为显式任务运行?喜欢Task.Run(async () =&gt; await Consumer2(token));
  • 但第一个输出显示消费者 2 已运行。你的意思是它开始得太晚了吗?它是否仅在第一个任务被取消后才开始?你的描述对我来说不是很清楚。也许您可以在输出中添加时间戳或其他内容?
  • @GoodNightNerdPride 是的,在第一个代码中,Consumer2 在 1 完成之前不会运行。

标签: c# task-parallel-library


【解决方案1】:

要使Consumer2Consumer1 完成之前运行,Consumer1 必须在某个时间点放弃当前线程(并为该方法的其余部分安排一个继续)。

这通常发生在 I/O 或其他一些事件驱动的过程中。也许当您拨打await consumer.Consume() 时会发生这种情况?当您await Task.Delay(1) 时它肯定会这样做,这就是为什么您的第二个示例会产生问题中所述的输出。

替代方法是将Consumer1 的执行显式推送到它自己的线程上,这可以使用Task.Run 来完成:

await Task.WhenAll(
    Task.Run(() => Consumer1(cancellationTokenSource.Token)), 
    Consumer2(cancellationTokenSource.Token));

但是,现在这将使用额外的线程有效地“伪造”async 行为。

【讨论】:

    【解决方案2】:

    只有当我将 Task.Delay(1) 放入时,它才会启动第二个任务。我希望两个消费者都在运行,而获得这种行为的唯一方法是使用 Task.Delay,这让我觉得我做的很糟糕或误解了这一切是如何运作的。

    所有方法(包括async 方法)begin executing synchronously。此外,如果 await 用于已完成的可等待对象,它将继续同步执行。在示例代码中,由于await consumer.Consume()被注释掉了,所以整个方法是同步运行的。

    如果您确定consumer.Consume 会异步运行,那么您只需取消注释即可。但是,如果它可能以异步方式运行(例如,如果一条消息已经到达以供使用),那么您可能希望使用Task.Run 在后台线程上启动该方法。 p>

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-02-11
      • 1970-01-01
      • 1970-01-01
      • 2016-10-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多