【问题标题】:Recursive Asynchronous calls with TPL/ async await使用 TPL/异步等待的递归异步调用
【发布时间】:2015-08-27 15:00:40
【问题描述】:

我正在研究使用 C# 异步功能(TPL/async/await)以递归方式处理层次结构。这是我正在尝试做的事情的概述

我有一个要处理的作业集合,如下所示。每个 Job 都有事情要做,并且可以选择有一个或多个孩子也有事情要做。所有父子作业都调用同一个函数来做实际的“工作”,这个函数在“异步”中(代码如下)

/*
 *  Jobs Collection
 *  |
 *  |__ Job1
 *  |    |__ Job4
 *  |    |     |__ Job7
 *  |    |
 *  |    |__ Job5
 *  |
 *  |__ Job2
 *  |    |__ Job6
 *  |
 *  |__ Job3
 *  |
 */
  1. 层次结构中有 3 个级别。

  2. 我想开始并行处理第一级(Job1、Job2、Job3)。

  3. 一旦它们并行启动,每个单独的作业都将开始处理 本身,等待其处理完成(重要),然后将继续递归处理其子级,直到层次结构结束。孩子依赖于父母处理的数据,因此他们等待父母处理完成。

  4. 实际“作业”(由父级和子级调用)的处理是异步发生的,因为调用方法异步工作 - 因此不需要“新线程”(Task.StartNew())。

这是我用来演示场景的示例代码 -

public void Process()
{
    WebJob[] jobs = CreateWebJobs(); // dummy jobs

    // first level 
    Parallel.ForEach(jobs,
                new ParallelOptions { MaxDegreeOfParallelism = 2 }, // parallelism hardcoded for simplicity
                (job) => ExecuteJob(job));
}

private void ExecuteJob(WebJob job, [CallerMemberName] string memberName = "")
{
    Console.ForegroundColor = ConsoleColor.DarkYellow;
    Console.WriteLine("Caller> {0} :: {1} Job> {2} :: {3} Thread> {4}", memberName, "\t", job.Name, "\t", Thread.CurrentThread.ManagedThreadId);

    Task t = GetDataAsync(job);
    t.Wait(); // needed such that parent response is received before children start over (?).


    if (job.Children != null)
    {
        job.Children.ToList().ForEach((r) =>
        {
            r.ParentResponse = job.Response; // Children need parent's response
            ExecuteJob(r);
        });
    }
}

private async Task GetDataAsync(WebJob j)
{
    // This is just test code. Ideally it would be an external call to some "async" method
    await Task.Delay(1000);
    j.Response = string.Format("{0} complete", j.Name);
    Console.ForegroundColor = ConsoleColor.Cyan;
    Console.WriteLine("parentResp>> {0} :: {1} Job>> {2} :: {3} Thread>> {4}", j.ParentResponse, "\t", j.Name, "\t", Thread.CurrentThread.ManagedThreadId);
    Console.WriteLine("--------------");
}

private WebJob[] CreateWebJobs()
{
    return new WebJob[] {
        new WebJob() { Id=1, Name = "Job1", ExecURL = "http://url1", 
            Children = new WebJob[] 
            {
                new WebJob() 
                { 
                    Id=2, Name = "Job2", ExecURL = "http://url2", 
                    Children = new WebJob[] 
                    {
                        new WebJob() { Id=4, Name = "Job4", ExecURL = "http://url4" }
                    }
                },
                new WebJob() 
                { 
                    Id=3, Name = "Job3", ExecURL = "http://url3" 
                }
            }
        },
        new WebJob() { Id=5, Name = "Job5", ExecURL = "http://url5"}                
    };
}
  • 流程方法首先启动所有“第一级”作业 平行线。
  • ExecuteJob 方法接管并递归地转到 孩子们完成所有处理

这行得通,但我不相信这种递归异步模式是否是一种有效的方法。我在想避免 t.Wait()。我在 t 上尝试过 ContinueWith,这在我的理解中似乎没有什么不同,我还阅读了 ForEachAsync 模式,想知道这是否合适。该解决方案最终将成为 ASP.NET Web API 服务。对这种递归异步模式有什么想法吗?

【问题讨论】:

  • 你有两次 Job5。这是否意味着它取决于 1 和 4 还是一个错字?
  • @mike - 这是一个错字。已更正。谢谢

标签: c# .net multithreading asynchronous recursion


【解决方案1】:

如果GetDataAsync 是您拥有的唯一阻塞操作,那么您可以在整个过程中使用异步编程,避免需要Parallel.ForEach 调用或阻塞Wait 调用。

public async Task Process()
{
    WebJob[] jobs = CreateWebJobs(); // dummy jobs

    await Task.WhenAll(jobs.Select(ExecuteJob));
}

private async Task ExecuteJob(WebJob job, [CallerMemberName] string memberName = "")
{
    Console.ForegroundColor = ConsoleColor.DarkYellow;
    Console.WriteLine("Caller> {0} :: {1} Job> {2} :: {3} Thread> {4}", memberName, "\t", job.Name, "\t", Thread.CurrentThread.ManagedThreadId);

    await GetDataAsync(job);

    if (job.Children != null)
    {
        var childTasks = job.Children.Select(r =>
        {
            r.ParentResponse = job.Response;
            return ExecuteJob(r);
        });

        await Task.WhenAll(childTasks);
    }
}

编辑:如果顶级方法应该阻塞(而不是冒着让消费者一劳永逸的风险),请执行以下操作:

public void Process()
{
    WebJob[] jobs = CreateWebJobs(); // dummy jobs

    Task.WaitAll(jobs.Select(ExecuteJob));
}

【讨论】:

  • Process 不会是 async,因为您在其中使用了 await
  • @juharr:正确。最终有人需要await,所以我可以把它留给打电话给Process的人,或者用Wait屏蔽自己
  • @Douglas - 有道理,感谢您的想法。我想知道您是否曾经想使用“Parallel.ForEach”来开始至少第一组作业。假设我是否必须在 GetDataAsync 处理数据之前对其进行预处理。这可能会延迟处理,因为它将是连续的,直到预处理完成(假设它需要相当多的 cpu cyles)并进入“GetDataAsync”方法?
  • @Lalman:你提出了一个有效的场景。不幸的是,Parallel.ForEach 不能很好地与async 配合使用。最简单的解决方案是在自己的任务中执行预处理(在ExecuteJob 的开头)并等待其结果:await Task.Run(() => Preprocess(job))。但是,这不允许等效于 MaxDegreeOfParallelism(除非您定义自己的任务调度程序)。
【解决方案2】:

由于您的核心是异步的,因此您根本不应该使用并行或多线程。您想要的是 concurrency 没有 parallelism - 即异步并发,通常使用 Task.WhenAll 完成。

这是双重事实,因为您计划部署到 ASP.NET,而并行性会显着降低您的可伸缩性。

public async Task ProcessAsync()
{
  WebJob[] jobs = CreateWebJobs();

  await Task.WhenAll(jobs.Select(x => ExecuteJobAsync(x)));
}

private async Task ExecuteJobAsync(WebJob job, [CallerMemberName] string memberName = "")
{
  Console.ForegroundColor = ConsoleColor.DarkYellow;
  Console.WriteLine("Caller> {0} :: {1} Job> {2} :: {3} Thread> {4}", memberName, "\t", job.Name, "\t", Thread.CurrentThread.ManagedThreadId);

  await GetDataAsync(job);
  if (job.Children != null)
  {
    var childTasks = job.Children.Select(async x =>
    {
      x.ParentResponse = job.Response; // Children need parent's response
      await ExecuteJobAsync(x);
    });
    await Task.WhenAll(childTasks);
  }
}

【讨论】:

  • 确实...只是看了一眼方法签名,而不是仔细查看实际代码。删除 cmets .10...9...
猜你喜欢
  • 2021-05-12
  • 2021-08-28
  • 2016-02-21
  • 1970-01-01
  • 1970-01-01
  • 2020-09-08
  • 1970-01-01
  • 2022-01-27
  • 1970-01-01
相关资源
最近更新 更多