【问题标题】:Parallel execution of tasks in groups分组并行执行任务
【发布时间】:2018-03-05 02:22:56
【问题描述】:

我用一个简单的例子来描述我的问题,然后描述一个更接近的问题。

想象我们在 box1 中有 n 个项目 [i1,i2,i3,i4,...,in] 并且我们有一个 box2 可以处理 m 个项目来执行它们(m 通常远小于 n)。每个项目所需的时间是不同的。我想一直做 m 个工作项目,直到所有项目都进行完。

一个更接近的问题是,例如,您有一个包含 n 个文件字符串(URL 地址)的 list1,我们希望有一个系统可以同时下载 m 个文件(例如通过 httpclient.getAsync() 方法)。每当 m 项中的一项下载完成时,必须尽快替换 list1 中的另一个剩余项,并且必须计算到所有 List1 项都继续进行。 (n 和 m 的数量由用户在运行时输入)

如何做到这一点?

【问题讨论】:

    标签: c# parallel-processing async-await task-parallel-library


    【解决方案1】:

    这是您可以使用的通用方法。

    当您调用此 TIn 时,它将是字符串(URL 地址),并且 asyncProcessor 将是您的异步​​方法,它将 URL 地址作为输入并返回一个任务。

    此方法使用的 SlimSemaphore 将只允许实时并发异步 I/O 请求的数量,一旦一个完成,另一个请求就会执行。类似于滑动窗口模式。

    public static Task ForEachAsync<TIn>(
                IEnumerable<TIn> inputEnumerable,
                Func<TIn, Task> asyncProcessor,
                int? maxDegreeOfParallelism = null)
            {
                int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
                SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);
    
                IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
                {
                    await throttler.WaitAsync().ConfigureAwait(false);
                    try
                    {
                        await asyncProcessor(input).ConfigureAwait(false);
                    }
                    finally
                    {
                        throttler.Release();
                    }
                });
    
                return Task.WhenAll(tasks);
            }
    

    【讨论】:

    • 谢谢。看起来很可爱。我必须对其进行测试并报告其工作情况。
    • 抱歉,我有一个问题。它是立即创建所有任务并等待每个任务的顺序变为时间线,还是在他的时间和必要时创建任务?
    • Task.WhenAll 在内部为所有任务创建一个列表,所以我认为它会立即创建所有任务
    • 如果 mylist 包含数千或数百万个 url 怎么办? :(
    • 它会立即创建任务,并在所有创建的任务完成后返回一个完成的任务。无论列表中有数百万或数百个元素,方法的工作方式都是相同的。如果您指定 maxDegreeOfParallism 参数,那么任何时候都只会有那么多并发异步请求,否则该方法会回退到最大并行度的默认值。只要您提供的异步处理器是真正异步的,此方法就不需要线程池线程来执行其异步操作。您可能想尝试找到正确的 maxDegreeOfParallism 值。
    【解决方案2】:

    您应该查看TPL Dataflow,将System.Threading.Tasks.Dataflow NuGet 包添加到您的项目中,然后您想要的就这么简单

    private static HttpClient _client = new HttpClient();
    public async Task<List<MyClass>> ProcessDownloads(IEnumerable<string> uris, 
                                                      int concurrentDownloads)
    {
        var result = new List<MyClass>();
    
        var downloadData = new TransformBlock<string, string>(async uri =>
        {
            return await _client.GetStringAsync(uri); //GetStringAsync is a thread safe method.
        }, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = concurrentDownloads});
    
        var processData = new TransformBlock<string, MyClass>(
              json => JsonConvert.DeserializeObject<MyClass>(json), 
              new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded});
    
        var collectData = new ActionBlock<MyClass>(
              data => result.Add(data)); //When you don't specifiy options dataflow processes items one at a time.
    
        //Set up the chain of blocks, have it call `.Complete()` on the next block when the current block finishes processing it's last item.
        downloadData.LinkTo(processData, new DataflowLinkOptions {PropagateCompletion = true});
        processData.LinkTo(collectData, new DataflowLinkOptions {PropagateCompletion = true});
    
        //Load the data in to the first transform block to start off the process.
        foreach (var uri in uris)
        {
            await downloadData.SendAsync(uri).ConfigureAwait(false);
        }
        downloadData.Complete(); //Signal you are done adding data.
    
        //Wait for the last object to be added to the list.
        await collectData.Completion.ConfigureAwait(false);
    
        return result;
    }
    

    在上面的代码中,只有concurrentDownloads 数量的 HttpClient 在任何给定时间将处于活动状态,无限线程将处理接收到的字符串并将它们转换为对象,单个线程将获取这些对象并将它们添加到一个列表。

    更新:这是一个简化的示例,仅执行您在问题中要求的内容

    private static HttpClient _client = new HttpClient();
    public void ProcessDownloads(IEnumerable<string> uris, int concurrentDownloads)
    {
        var downloadData = new ActionBlock<string>(async uri =>
        {
            var response = await _client.GetAsync(uri); //GetAsync is a thread safe method.
            //do something with response here.
        }, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = concurrentDownloads});
    
    
        foreach (var uri in uris)
        {
           downloadData.Post(uri);
        }
        downloadData.Complete();
    
        downloadData.Completion.Wait();
    }
    

    【讨论】:

    • 谢谢兄弟。在为我的问题寻找答案的过程中,我听到并看到了很多关于 TPL 或 Reactive Extension 的信息,但这对我来说有些复杂,并且不知道如何使用它们。真的没有更简单的解决方案吗? :)
    • 数据流一旦你意识到你只是在管道中设置步骤,它就很容易做到。我的示例过于复杂,因此我可以向您展示 TPL DataFlow 的所有功能,我已经更新了一个仅满足您需求的示例。
    • HttpClient 旨在为多个请求重用,甚至是并发的。创建一个实例并使用它,不要每次都创建一个新实例
    • 我不记得是HttpClient还是WebClient那样,答案是固定的。
    • 谢谢。看来我应该让自己熟悉 TPL 数据流,不要逃避他 :)
    【解决方案3】:

    一个简单的节流解决方案是SemaphoreSlim
    编辑
    稍作改动后,代码现在会在需要时创建任务

    var client = new HttpClient();
    SemaphoreSlim semaphore = new SemaphoreSlim(m, m); //set the max here
    var tasks = new List<Task>();
    
    foreach(var url in urls)
    {
        // moving the wait here throttles the foreach loop
        await semaphore.WaitAsync();
        tasks.Add(((Func<Task>)(async () =>
        {
            //await semaphore.WaitAsync();
            var response = await client.GetAsync(url); // possibly ConfigureAwait(false) here
            // do something with response
            semaphore.Release();
        }))());
    }
    
    await Task.WhenAll(tasks);
    

    这是另一种方法

    var client = new HttpClient();
    var tasks = new HashSet<Task>();
    
    foreach(var url in urls)
    {
        if(tasks.Count == m)
        {
            tasks.Remove(await Task.WhenAny(tasks));            
        }
    
        tasks.Add(((Func<Task>)(async () =>
        {
            var response = await client.GetAsync(url); // possibly ConfigureAwait(false) here
            // do something with response            
        }))());
    }
    
    await Task.WhenAll(tasks);
    

    【讨论】:

    • 它似乎完成了这项工作,它在 mot m urls simulantously 下载,但有一个问题。例如,如果您有一个包含一百万个 url 的列表,它会在短时间内创建一百万个任务,然后等待每个任务的顺序变为。我错了吗?
    • 你说得对,它会在短时间内创建所有任务。它还在同一个线程上执行所有操作,但您可以使用 ConfigureAwait(false) 更改它或在线程池上运行它们。我会用更多信息更新答案
    • 我的 url 列表可能很长,可能有数百万个,如果创建了所有这些数百万个任务,可能会导致内存不足、其他异常或错误:) 我正在寻找每个问题的解决方案需要时分,内存占用少
    • 您可以在线程池上运行代码还是希望它们在同一个线程上运行?
    • 我不喜欢使用 Task.Run()。我喜欢尽可能使用同一个线程
    【解决方案4】:

    并行处理项目,限制同时作业的数量:

    string[] strings = GetStrings();  // Items to process.
    const int m = 2;  // Max simultaneous jobs.
    
    Parallel.ForEach(strings, new ParallelOptions {MaxDegreeOfParallelism = m}, s =>
    {
        DoWork(s);
    });
    

    【讨论】:

    • 他的DoWork是异步的,Parallel.ForEach不支持异步。
    • 此方法不适用于我的问题。因为您不能将 Parallel.ForEach 与异步方法一起使用。在将 Parallel.ForEach 与异步方法一起使用的情况下,所有任务都会立即触发(它不会等待异步任务完成)。我正在使用 HttpClient.getAsync 这是一种异步方法。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-08-18
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多