【问题标题】:Azure Table Storage ExecuteBatchAsync not running AsyncAzure 表存储 ExecuteBatchAsync 未运行异步
【发布时间】:2018-06-21 14:12:42
【问题描述】:

美好的一天,

我在使用 Azure 表存储的项目时遇到了一些问题,特别是我在让 ExecuteBatchAsync 实际上是异步时遇到了问题!我查看了许多不同的文章、示例和帮助文档,并尝试了许多不同的场景,但没有任何运气。基本上发生的情况是,每次我调用 _table.ExecuteBatchAsync(batchOperation) 时,代码都会同步运行而不是异步运行。

    public async Task<bool> Save(List<FoundOn> foundOns)
    {

        try
        {

            var batches = new List<Task<IList<TableResult>>>();

            List<List<FoundOn>> groups = foundOns.GroupBy(x => x.SourceURL.ToLower()).Select(s => s.ToList()).ToList();

            foreach (List<FoundOn> sourceRecords in groups)
            {

                var rowId = 0;

                List<PageLinkTableEntity> records = new PageLinkMapper().MapTo(sourceRecords);

                for (var i = 0; i < records.Count; i += TableConstants.TableServiceBatchMaximumOperations)
                {
                    var batchOperation = new TableBatchOperation();

                    List<PageLinkTableEntity> batchItems = records.Skip(i).Take(TableConstants.TableServiceBatchMaximumOperations).ToList();

                    foreach (PageLinkTableEntity item in batchItems)
                    {

                        item.RowKey = $"{item.RowKey}|{rowId}";
                        batchOperation.InsertOrReplace(item);
                        rowId++;

                    }

                    if (batchOperation.Count > 0)
                    {
                        //This line of code takes around 1 second to execute
                        batches.Add(_table.ExecuteBatchAsync(batchOperation));
                    }

                }

                if (batches.Count > 20)
                {
                    //This line of code takes 0ms to execute, as there are never any uncompleted tasks
                    await Task.WhenAll(batches);
                    batches.Clear();
                }

            }

            //This line of code takes 0ms to execute, as there are never any uncompleted tasks
            await Task.WhenAll(batches);

            return true;

        }
        catch (Exception ex)
        {
            _log.Error($"PageLinkRepository: Unable to save found on.  Error: { ex.Message }");
            throw;
        }

    }

正如您在此代码块中看到的,我的方法是异步的,我正在使用 await Task.WhenAll() 来等待列表中的任务。问题是,当我执行 batches.Add(_table.ExecuteBatchAsync(batchOperation)) 时,代码总是同步完成,导致 1 行执行时间超过 1 秒。

我按照推荐使用 STATIC 单例客户端。虽然我也尝试过专门为此方法创建的连接。

public static class AzureClient
{

    public static CloudStorageAccount Account { get; set; }
    public static CloudTableClient Client { get; set; }
    static AzureClient()
    {

        Account = CloudStorageAccount.Parse(Environment.GetEnvironmentVariable("AzureWebJobsStorage"));

        ServicePoint tableServicePoint = ServicePointManager.FindServicePoint(Account.TableEndpoint);
        tableServicePoint.UseNagleAlgorithm = false;
        tableServicePoint.Expect100Continue = false;
        tableServicePoint.ConnectionLimit = 100;

        Client = Account.CreateCloudTableClient();

    }

}

我的数据量相对较小,批处理中的每条记录,作为一个 JSON 对象只有大约 135 个字节,我尝试了从我的工作站到使用 300Mbps 光纤连接的 Azure 以及从 Azure 上的 VM 到Azure 存储。所以我不认为这是带宽/网络问题。

我也尝试过使用:

batches.Add(Task.Run(() => _table.ExecuteBatchAsync(batchOperation)));

我在异步中运行的任何其他代码都没有问题。我在 CPU 或带宽绑定方面没有任何问题,因为我在本地使用 12 核/24 线程双 XEON,并且我尝试在 8 核 Azure VM 上运行。

我还尝试使用以下内容装饰我的应用:

        ServicePointManager.DefaultConnectionLimit = 1000;
        ThreadPool.SetMinThreads(128, 128);

努力确保我没有遇到连接或线程限制。

通常我会插入 4-5 批,这需要 5000-9000 毫秒,通常每批需要 1000-1500 毫秒。根据我读过的所有内容,我应该每秒接近 2000-20000 条记录。通常这些会进入 1 个分区,但我尝试让每个批次都成为单独的分区,但没有任何改进。

我试过从以下位置运行它:

控制台应用 天蓝色函数 网络 API MVC 控制器

所有结果都相同。我猜我一定是做错了什么,尽管我一生都无法弄清楚它可能是什么。

非常感谢任何想法/建议。

提前谢谢你!

【问题讨论】:

    标签: c# azure asynchronous nosql


    【解决方案1】:

    根据文档,它将启动异步操作来执行一批操作。

    ExecuteBatchAsync(TableBatchOperation)
    启动异步操作以对表执行一批操作。

    当您将它添加到您的批处理操作时,您正在启动它。您可能需要考虑只存储一批批次,然后在您拥有 20 个批次后对每个批次调用 ExecuteBatchAsync。

    类似这样的东西(检查语法,我只是在记事本中输入的;))

    public async Task<bool> Save(List<FoundOn> foundOns)
    {
    
        try
        {
    
            var batches = new List<TableBatchOperation>();
    
            List<List<FoundOn>> groups = foundOns.GroupBy(x => x.SourceURL.ToLower()).Select(s => s.ToList()).ToList();
    
            foreach (List<FoundOn> sourceRecords in groups)
            {
    
                var rowId = 0;
    
                List<PageLinkTableEntity> records = new PageLinkMapper().MapTo(sourceRecords);
    
                for (var i = 0; i < records.Count; i += TableConstants.TableServiceBatchMaximumOperations)
                {
                    var batchOperation = new TableBatchOperation();
    
                    List<PageLinkTableEntity> batchItems = records.Skip(i).Take(TableConstants.TableServiceBatchMaximumOperations).ToList();
    
                    foreach (PageLinkTableEntity item in batchItems)
                    {
    
                        item.RowKey = $"{item.RowKey}|{rowId}";
                        batchOperation.InsertOrReplace(item);
                        rowId++;
    
                    }
    
                    if (batchOperation.Count > 0)
                    {
                        //This line of code takes around 1 second to execute
                        batches.Add(batchOperation);
                    }
    
                }
    
                if (batches.Count > 20)
                {
                    //This line of code takes 0ms to execute, as there are never any uncompleted tasks
                    await Task.WhenAll(batches.Select(c => _table.ExecuteBatchAsync(c)));
                    batches.Clear();
                }
    
            }
    
            //This line of code takes 0ms to execute, as there are never any uncompleted tasks
            await Task.WhenAll(batches.Select(c => _table.ExecuteBatchAsync(c)));
    
            return true;
    
        }
        catch (Exception ex)
        {
            _log.Error($"PageLinkRepository: Unable to save found on.  Error: { ex.Message }");
            throw;
        }
    
    }
    

    【讨论】:

    • 谢谢我也试过了,结果是一样的,它们一个接一个地执行,而不是并行执行。我也尝试过使用具有相同结果的 Parallel.ForEach。似乎连接上的某些东西导致它们一个接一个地执行而不是并行执行。
    • 真正奇怪的是:Task.Run(() => _table.ExecuteBatchAsync(batchOperation));应该将任务扔到一个新线程中,并在我的其他完全可用的核心之一上运行它,同时我的代码继续创建下一批。但似乎连接将其限制为一次一个。
    猜你喜欢
    • 1970-01-01
    • 2014-04-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多