【问题标题】:Partition input and execute queries in parallel分区输入和并行执行查询
【发布时间】:2020-09-20 04:50:49
【问题描述】:

我有以下代码,我想在其中检索给定员工 ID 列表的员工信息。如果计数超过 100 万,我已经进行了验证以引发异常。在大多数情况下,请求将小于 200K,因此我将请求分成 4 个分区,每个分区包含相同数量的员工 ID。所有 4 个分区并行执行并在Task.WhenAll 之后连接在一起。有人可以给我一些进一步改进的提示吗?我查看了ParallelForEachAsyncParallel Foreach async in C#,但无法正常工作。下面提到的代码有效,但其硬编码为分成 4 个分区。如何使其与最大并行度设置为 50 的动态分区更加并行?如果输入是 100K ids,我想分成 10 个分区并并行执行所有 10 个。

public class Service
{
    private async Task<List<EmployeeEntity>> GetInfo(List<long> input)
    {
        var breakup = input.Split(4);

        var result1Task = GetResult(breakup.First().ToList());
        var result2Task = GetResult(breakup.Skip(1).Take(1).First().ToList());
        var result3Task = GetResult(breakup.Skip(2).Take(1).First().ToList());
        var result4Task = GetResult(breakup.Skip(3).Take(1).First().ToList());

        await Task.WhenAll(result1Task, result2Task, result3Task, result4Task);

        List<EmployeeEntity> result1 = await result1Task;
        List<EmployeeEntity> result2 = await result2Task;
        List<EmployeeEntity> result3 = await result3Task;
        List<EmployeeEntity> result4 = await result4Task;

        return result1.Union(result2.Union(result3.Union(result4))).ToList();
    }

    private async Task<List<EmployeeEntity>> GetResult(List<long> employees)
    {
        using var context = new MyAppDBContext();
        var EmployeeBand = await context.EmployeeBand.Where(x => employees.Contains(x.EmployeeId)).ToListAsync();
        var EmployeeClient = await context.EmployeeClient.Where(x => employees.Contains(x.EmployeeId)).ToListAsync();
        return await context.Employee.Where(x => employees.Contains(x.EmployeeId)).ToListAsync();
    }
}

public static class ExtensionMethods
{
    public static List<List<T>> Split<T>(this List<T> myList, int parts)
    {
        int i = 0;
        var splits = from item in myList
                     group item by i++ % parts into part
                     select part.ToList();
        return splits.ToList();
    }
}

public class EmployeeEntity
{
    public EmployeeEntity()
    {
        EmployeeBands = new HashSet<EmployeeBandEntity>();
        EmployeeClients = new HashSet<EmployeeClientEntity>();
    }

    public long EmployeeId { get; set; }
    public ICollection<EmployeeBandEntity> EmployeeBands { get; set; }
    public ICollection<EmployeeClientEntity> EmployeeClients { get; set; }
}

public class EmployeeBandEntity
{
    public long EmployeeBandId { get; set; }
    public long EmployeeId { get; set; }
    public EmployeeEntity EmployeeEntity { get; set; }
}

public class EmployeeClientEntity
{
    public long EmployeeClientId { get; set; }
    public long EmployeeId { get; set; }
    public EmployeeEntity EmployeeEntity { get; set; }
}

public partial class MyAppDBContext : DbContext
{
    public virtual DbSet<EmployeeEntity> Employee { get; set; }
    public virtual DbSet<EmployeeBandEntity> EmployeeBand { get; set; }
    public virtual DbSet<EmployeeClientEntity> EmployeeClient { get; set; }
}
     

【问题讨论】:

  • Tolist 是问题所在。您将整个数据集具体化只是为了创建任务。 GetResult的输入和输出应该是IQueryable&lt;T&gt;
  • 通常,db 是瓶颈。在您的情况下,绝对可能是这种情况。首先,您需要找出 GetResult 的最佳位置。我所说的甜蜜点的意思是,为什么只在 4 次分裂结束?此外,为什么不根据 GetResult 的最佳性能动态拆分。
  • 如何改进?你需要做并行吗?为什么数据库不为你/汽车做这件事?

标签: c# async-await entity-framework-core parallel.foreach


【解决方案1】:

我相信您可以对 GetResult 非常有创意并以更好的方式重写,以便查询类似于 where id greater than (and/or less than) 而不是 ids in (... list)。假设您的 GetResult 已经以最佳方式实现,您只需要一种方法来实现最大并行执行,这就是我的解决方案。

private async Task<List<EmployeeEntity>> GetInfo2(List<long> input)
{
    if (input == null)
    {
        return null;
    }

    if (input.Count == 0)
    {
        return new List<EmployeeEntity>();
    }

    var taskList = new List<Task<List<EmployeeEntity>>>();

    foreach (var batch in input.Batch(100))
    {
        taskList.Add(GetResult(batch.ToList()));
    }

    var result = (await Task.WhenAll(taskList)).SelectMany(a => a);

    return result.ToList();
}

这需要以下批量扩展方法。

public static class Extensions
    {
        public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size)
        {
            T[] bucket = null;
            var count = 0;

            foreach (var item in source)
            {
                if (bucket == null)
                    bucket = new T[size];

                bucket[count++] = item;

                if (count != size)
                    continue;

                yield return bucket.Select(x => x);

                bucket = null;
                count = 0;
            }

            if (bucket != null && count > 0)
                yield return bucket.Take(count);
        }
    }

您可以找到批量大小的最佳点。我已将其硬编码为 100,但您可以根据输入列表的大小或您可能拥有的任何其他逻辑推导出它。

【讨论】:

  • 感谢您的意见。我将研究微调 GetResults 方法。您的解决方案效果很好,我接受它作为正确答案!
  • foreach 循环将启动许多任务。我想控制最大并行度,如何在您的解决方案中实现?
  • @sravpk 您需要使用具有所需最大并行度的 semaphore slim。 SemaphoreSlim await one 将进入您的 GetResult 方法。所以除非释放其他锁,否则不会输入GerResult
【解决方案2】:

这是管理任务的替代解决方案,而无需编写我为之前的答案(在 cmets 中)建议的信号量。我遇到了这个 .net 开箱即用的解决方案,它使用来自 System.Threading.Tasks.Dataflow 命名空间的 ActionBlock。这需要在不使用信号量 slim 等的情况下将并发性保持在所需的最大程度。

Batch 扩展名仍然与我之前的回答相同。但是对于第一部分,您将使用ActionBlock,如下所示(而不是Task.WhenAll)。

        private async Task<List<EmployeeEntity>> GetInfo2(List<long> input)
    {
        if (input == null)
        {
            return null;
        }

        if (input.Count == 0)
        {
            return new List<EmployeeEntity>();
        }

        var employeeEntities = new BlockingCollection<EmployeeEntity>();

        var actionBlock = new ActionBlock<List<long>>(async (employeeIds) =>
        {
            var employees = await GetResult(employeeIds);
            employees.ForEach(e => employeeEntities.Add(e));
        }, new ExecutionDataflowBlockOptions
        {
            //config this to whatever works best in your situation
            MaxDegreeOfParallelism = 20
        });

        foreach (var batch in input.Batch(100))
        {
            await actionBlock.SendAsync(batch.ToList());
        }

        actionBlock.Complete();
        await actionBlock.Completion;

        return employeeEntities.ToList();
    }

【讨论】:

    猜你喜欢
    • 2019-04-04
    • 2022-08-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-03-04
    • 1970-01-01
    相关资源
    最近更新 更多