【问题标题】:How to do async 'paged' processing of EF select result如何对 EF 选择结果进行异步“分页”处理
【发布时间】:2014-04-11 09:10:10
【问题描述】:

我正在编写将 SQL 服务器中的记录加载到天蓝色队列的内容。问题是,选择结果中的项目数可能非常大,所以我想在仍在检索数据的同时开始排队。

我正在尝试利用 EF6(异步)、所有异步方法和 TPL 进行并行排队。所以我有:

        // This defines queue that Generator will publsh to and 
        // QueueManager wil read from. More info:
        // http://msdn.microsoft.com/en-us/library/hh228601(v=vs.110).aspx
        var queue = new BufferBlock<ProcessQueueItem>();

        // Configure queue listener first
        var result = this.ReceiveAndEnqueue(queue);

        // Start generation process
        var tasks = generator.Generate(batchId);

ReceiveAndEnqueue 很简单:

    private async Task ReceiveAndEnqueue(ISourceBlock<ProcessQueueItem> queue)
    {
        while (await queue.OutputAvailableAsync())
        {
            var processQueueItem = await queue.ReceiveAsync();
            await this.queueManager.Enqueue(processQueueItem);
            this.tasksEnqueued++;
        }
    }

生成器 generate() 签名如下:

public void Generate(Guid someId, ITargetBlock<ProcessQueueItem> target)

它调用目标上的 SendAsync() 方法来放置新项目。我现在正在做的是将结果总数分成“批次”,加载它们,然后异步发送,直到全部完成:

    public void Generate(Guid batchId, ITargetBlock<ProcessQueueItem> target)
    {
        var accountPromise = this.AccountStatusRepository.GetAccountsByBatchId(batchId.ToString());
        accountPromise.Wait();
        var accounts = accountPromise.Result;

        // Batch configuration
        var itemCount = accounts.Count();
        var numBatches = (int)Math.Ceiling((double)itemCount / this.batchSize);
        Debug.WriteLine("Found {0} items what will be put in {1} batches of {2}", itemCount, numBatches, this.batchSize); 


        for (int i = 0; i < numBatches; i++)
        {
            var itemsToTake = Math.Min(this.batchSize, itemCount - currentIndex);
            Debug.WriteLine("Running batch - skip {0} and take {1}", currentIndex, itemsToTake);

            // Take a subset of the items and place them onto the queue
            var batch = accounts.Skip(currentIndex).Take(itemsToTake);

            // Generate a list of tasks to enqueue the items
            var taskList = new List<Task>(itemsToTake);
            taskList.AddRange(batch.Select(account => target.SendAsync(account.AsProcessQueueItem(batchId))));

            // Return the control when all tasks have been enqueued
            Task.WaitAll(taskList.ToArray());

            currentIndex = currentIndex + this.batchSize;
        } 

但是,我的同事说,这行得通 - '我们不能让界面更简单,让 Generate() 使界面像这样:

public Task<IEnumerable<ProcessQueueItem> Generate(Guid someId)

更简洁,并且 Generate 方法不依赖于 TPL 库。我完全同意,我只是害怕如果我这样做,我将不得不打电话给

var result = Generate().Wait().Result;

在某个时刻,在所有项目排队之前。这会让我等到所有的东西都被加载并在内存中。

所以我的问题是:如何在 EF 查询结果从选择中“滴入”后立即开始使用它们?就好像如果你发现我的想法,EF 会对结果进行“让步”。

编辑 我想我犯了一个思维错误。 EF 默认会延迟加载项目。所以我可以将所有结果作为 IQueryable 返回,但这并不意味着它们实际上是从数据库中加载的。然后我将遍历它们并将它们排入队列。

编辑 2 不,这不起作用,因为我需要在 Generate() 方法中从数据库中转换对象...

【问题讨论】:

  • 结果分页应该使用正确的 T-SQL 语句来完成。异步与语句的执行方式有关,与分页或结果的形状无关。一个普通的同步 SqlReader 对于逐行消费(和排队)结果要好得多。 ORM(不仅仅是 EF)对于像您描述的那样的批处理操作来说是一个糟糕的选择

标签: c# .net entity-framework asynchronous tpl-dataflow


【解决方案1】:

好的,这就是我最终的结果:

    public IEnumerable<ProcessQueueItem> Generate(Guid batchId)
    {
        var accounts = this.AccountStatusRepository.GetAccountsByBatchId(batchId.ToString());

        foreach (var accountStatuse in accounts)
        {
            yield return accountStatuse.AsProcessQueueItem(batchId);
        }
    }

存储库仅返回一些 DataContext.Stuff.Where(...) 的 IEnumerable。生成器使用扩展方法将实体转化为领域模型(ProcessQueueItem),通过yield的方式立即发送给方法的调用者,调用者开始调用QueueManager开始排队。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2013-02-21
    • 2019-06-24
    • 1970-01-01
    • 2020-05-29
    • 2013-03-02
    • 2021-05-22
    • 2015-02-16
    相关资源
    最近更新 更多