【问题标题】:Dynamically processing a concurrent collection in parallel by group but serially within each group按组并行动态处理并发集合,但在每个组内串行处理
【发布时间】:2022-02-15 20:30:18
【问题描述】:

我遇到了一个我可以轻松定义的问题,但我一生似乎无法消化 MSDN 以获得最佳解决方案。自从我不得不真正考虑 UI 响应之外的并行处理已经有一段时间了。

也就是说,我有一个需要处理的并发任务集合。例如,它可能正在按类型(Consumer1,Consumer2,Consumer3...Consumer[N])向各种消费者加载数据,发送数据的基础任务对于每个任务都是相同的,但每个消费者一次只能接受一个源

基本上,我希望尽可能多地并行处理,但需要注意的是,我一次只能向每个消费者发送 1 个任务。因此,如果消费者的当前作业已经在进行中,那么我应该移至集合中的下一个项目,并将其留到该消费者正在进行的作业完成时。 Concurrent 集合也可以随时从外部添加,如果我们有新类型,我们需要额外的线程。

我想我的问题归结为如何从集合中自定义“Take”,以便我只使用一个属性来获取下一个任务,该属性指定它有一个没有正在进行的工作的消费者.

关于我在这里缺少什么或者我是否走在正确的道路上的任何想法?

例如,我们有一个中介队列,其中包含与银行交易相关的任务。

所以我们可能会添加到我们的中介队列(假设发送 SummaryData 和 Send TransactionData 使用相同的接口合约来发送数据)

  1. SendTransactionData -> Bank1
  2. SendTransactionData -> Bank2
  3. SendSummaryData -> 仲裁器
  4. SendTransactionData -> Bank1
  5. SendTransactionData -> Bank3
  6. SendTransactionData -> Bank1
  7. SendTransactionData -> Bank2

1,2,3,5 可以并行处理,但由于他们自己的系统,每个消费者一次只能接受一个输入,事务 4 必须等待事务 1 完成,事务 6 必须等待交易 4 进行处理。同样,事务 7 必须等待事务 2。

在任何初始过程完成之前,有人可能会添加另一个分组。

  1. SendSummaryData -> 仲裁器

  2. SendTransactionData -> Bank1

  3. SendTransactionData -> Bank4

如果有线程可用,可以立即提取 10 个,但 8 和 9 必须排在其他相关任务之后。

显然会有更好的方法来设计一个系统来实现这一点,但这些本质上是我希望满足的规范。

【问题讨论】:

  • 是否有可能在问题中包含如何在实践中使用该机制的实际场景?它打算执行什么样的处理?
  • 我肯定会添加这个。我不一定能准确地概述我的具体情况,但我会尝试对其进行概括。
  • 让我重新表述您的问题。您有一个需要处理的传入对象流,所有这些对象都可以无限并行处理。但是对象也有属性Bank,相同Bank的对象是不允许并行处理的,需要序列化。这个描述正确吗?
  • 基本上是的
  • 我实际上认为您在此处提到的实现可能正是我正在寻找的。我可能需要考虑信号量 slim 的数量,但我会做一些测试,至少在初始实现时尝试一下,看看会发生什么。

标签: c# .net parallel-processing task-parallel-library


【解决方案1】:

这是一种不基于 TPL 数据流库的方法。它基于Parallel.ForEachAsync API(从 .NET 6 开始提供)。下面的自定义ForEachExclusivePerKeyAsync 方法支持Parallel.ForEachAsync 重载的所有选项和功能,该重载具有IAsyncEnumerable<T> 作为source。发生错误或取消时的行为是相同的。唯一的区别是阻止了对具有相同键的元素的并发操作。每个元素的键是通过keySelector 函数获得的。对相同key的item进行序列化处理。

/// <summary>
/// Executes a for-each operation on an async-enumerable sequence in which
/// iterations may run concurrently, enforcing a non-concurrent execution policy
/// for elements having the same key.
/// </summary>
public static Task ForEachExclusivePerKeyAsync<TSource, TKey>(
    this IAsyncEnumerable<TSource> source,
    ParallelOptions parallelOptions,
    Func<TSource, CancellationToken, ValueTask> body,
    Func<TSource, TKey> keySelector,
    IEqualityComparer<TKey> keyComparer = default)
{
    if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
    // The other arguments are validated by the Parallel.ForEachAsync itself.
    var perKey = new Dictionary<TKey, Queue<TSource>>(keyComparer);
    return Parallel.ForEachAsync(source, parallelOptions, async (item, ct) =>
    {
        var key = keySelector(item);
        lock (perKey)
        {
            if (!perKey.TryGetValue(key, out var queue))
            {
                // There is no other task in-flight with the same key.
                // Insert a null value as an indicator of activity.
                perKey.Add(key, null);
            }
            else
            {
                // A task with the same key is currently in-flight.
                // Enqueue this item and return.
                if (queue == null) perKey[key] = queue = new Queue<TSource>();
                queue.Enqueue(item); return;
            }
        }

        // Fire the task for this item, and for all other items with the
        // same key that might be queued while this task is in-flight.
        while (!ct.IsCancellationRequested)
        {
            await body(item, ct); // Continue on captured context
            lock (perKey)
            {
                var queue = perKey[key];
                if (queue == null || queue.Count == 0)
                {
                    perKey.Remove(key); break;
                }
                item = queue.Dequeue();
            }
        }
    });
}

使用示例。 Channel&lt;T&gt; 用作IAsyncEnumerable&lt;T&gt; 序列的source/controller

var channel = Channel.CreateUnbounded<Transaction>();
//...
var options = new ParallelOptions() { MaxDegreeOfParallelism = 20 };
await ForEachExclusivePerKeyAsync(channel.Reader.ReadAllAsync(), options, async (x, _) =>
{
    await ProcessTransactionAsync(x);
}, keySelector: x => x.Bank);
//...
channel.Writer.TryWrite(new Transaction() { Bank = "Bank1" });
channel.Writer.TryWrite(new Transaction() { Bank = "Bank2" });

对于不依赖Parallel.ForEachAsync API 的相同方法的版本,因此它可以在早于 6 的 .NET 版本上运行,您可以查看此答案的3rd revision

【讨论】:

    【解决方案2】:

    如果您的问题是指定任务是否已被其他消费者提取,您需要对任务检索执行此检查。

    也就是说,您可能在某处有一些看起来像这样的代码:

    Task next = queue.GetNextTask();
    

    您需要更新您的队列以了解 2 件事:

    1. 目前正在进行哪些任务,以及,
    2. 是否有任何未进行的任务。

    如果队列可以(或可以)访问线程/进程池,您可以检查池中的#1。如果有的话,这是最好的选择。 #2 变得比典型的 FIFO 队列更复杂,从头部开始:检查该元素,如果可用则返回它,如果不可用则移动到下一个元素。

    如果没有,您需要用一个标志来装饰您的 Task 对象,以指示该任务是否正在进行中。像这样的标志(真的是信号量)很难管理且令人讨厌,但有时是唯一的选择。您必须确保正确锁定,并且任何尝试更新标志的人都尊重锁定。您还必须处理伤亡情况,例如如果线程或进程在处理任务时死亡会发生什么。

    无论哪种方式,您都可以将代码更新为:

    if (queue.HasAvailableTask()) {
      Task next = queue.GetNextAvailableTask();
      // Process task.
    } else {
      // No task to process, this thread should sleep or die.
    }
    

    同样,如果一个任务依赖于另一个任务的完成,队列的检查器(如HasAvailableTask())必须能够以某种方式确定一个任务是否可以立即处理,或者必须等到另一个任务完成。如果它必须等待,跳过它并继续检查队列中的下一个任务。

    编辑:检查相同类型的其他任务

    基本上,您必须封装一些代码(在正确的位置),任务运行者可以通过这些代码来决定队列中是否有任何可以立即运行的项目。

    这样做的一个好方法(还有其他方法)是用一些检查器来装饰进程/线程池。也就是说,在正在运行的任务中查看是否有任何(或有多少,如果需要)符合您的排除标准的任务。这些标准是什么,与此描述无关,它们只是您的业务逻辑。根据需要添加尽可能多的或尽可能少的。

    同样,队列应该有一个检查器来显示是否有任何(或有多少,如果需要)等待任务。

    当您检查是否有要运行的任务时,请匹配所有条件。如果队列中还有任何未排除的任务,则有工作要做,可以安排另一个任务。

    如果您的标准很复杂,您可以考虑分别从池和队列中返回某种轻量级任务描述符,其中只包含一个键(稍后查找任务)和足够的信息来做出所有决定你需要。考虑:

    public final class TaskDescriptor {
      public string Id; // Change the data type to match your task ID
      public string Type; // What type of task this is
      public string Source; // Either "runner" or "queue"
    }
    

    从跑步者和队列中填写一个列表,然后过滤。例如,您可以在Type 与特定类型匹配的位置查找Source = "runner",以确定该类型的另一个任务当前是否正在运行。

    【讨论】:

    • 比这稍微复杂一些。我不仅需要知道任务是否在进行中,还需要知道当前是否有任何相同类型的任务在进行中。
    猜你喜欢
    • 1970-01-01
    • 2017-01-27
    • 2014-10-08
    • 2018-06-06
    • 1970-01-01
    • 2020-12-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多