【发布时间】:2018-06-21 14:12:42
【问题描述】:
美好的一天,
我在使用 Azure 表存储的项目时遇到了一些问题,特别是我在让 ExecuteBatchAsync 实际上是异步时遇到了问题!我查看了许多不同的文章、示例和帮助文档,并尝试了许多不同的场景,但没有任何运气。基本上发生的情况是,每次我调用 _table.ExecuteBatchAsync(batchOperation) 时,代码都会同步运行而不是异步运行。
public async Task<bool> Save(List<FoundOn> foundOns)
{
try
{
var batches = new List<Task<IList<TableResult>>>();
List<List<FoundOn>> groups = foundOns.GroupBy(x => x.SourceURL.ToLower()).Select(s => s.ToList()).ToList();
foreach (List<FoundOn> sourceRecords in groups)
{
var rowId = 0;
List<PageLinkTableEntity> records = new PageLinkMapper().MapTo(sourceRecords);
for (var i = 0; i < records.Count; i += TableConstants.TableServiceBatchMaximumOperations)
{
var batchOperation = new TableBatchOperation();
List<PageLinkTableEntity> batchItems = records.Skip(i).Take(TableConstants.TableServiceBatchMaximumOperations).ToList();
foreach (PageLinkTableEntity item in batchItems)
{
item.RowKey = $"{item.RowKey}|{rowId}";
batchOperation.InsertOrReplace(item);
rowId++;
}
if (batchOperation.Count > 0)
{
//This line of code takes around 1 second to execute
batches.Add(_table.ExecuteBatchAsync(batchOperation));
}
}
if (batches.Count > 20)
{
//This line of code takes 0ms to execute, as there are never any uncompleted tasks
await Task.WhenAll(batches);
batches.Clear();
}
}
//This line of code takes 0ms to execute, as there are never any uncompleted tasks
await Task.WhenAll(batches);
return true;
}
catch (Exception ex)
{
_log.Error($"PageLinkRepository: Unable to save found on. Error: { ex.Message }");
throw;
}
}
正如您在此代码块中看到的,我的方法是异步的,我正在使用 await Task.WhenAll() 来等待列表中的任务。问题是,当我执行 batches.Add(_table.ExecuteBatchAsync(batchOperation)) 时,代码总是同步完成,导致 1 行执行时间超过 1 秒。
我按照推荐使用 STATIC 单例客户端。虽然我也尝试过专门为此方法创建的连接。
public static class AzureClient
{
public static CloudStorageAccount Account { get; set; }
public static CloudTableClient Client { get; set; }
static AzureClient()
{
Account = CloudStorageAccount.Parse(Environment.GetEnvironmentVariable("AzureWebJobsStorage"));
ServicePoint tableServicePoint = ServicePointManager.FindServicePoint(Account.TableEndpoint);
tableServicePoint.UseNagleAlgorithm = false;
tableServicePoint.Expect100Continue = false;
tableServicePoint.ConnectionLimit = 100;
Client = Account.CreateCloudTableClient();
}
}
我的数据量相对较小,批处理中的每条记录,作为一个 JSON 对象只有大约 135 个字节,我尝试了从我的工作站到使用 300Mbps 光纤连接的 Azure 以及从 Azure 上的 VM 到Azure 存储。所以我不认为这是带宽/网络问题。
我也尝试过使用:
batches.Add(Task.Run(() => _table.ExecuteBatchAsync(batchOperation)));
我在异步中运行的任何其他代码都没有问题。我在 CPU 或带宽绑定方面没有任何问题,因为我在本地使用 12 核/24 线程双 XEON,并且我尝试在 8 核 Azure VM 上运行。
我还尝试使用以下内容装饰我的应用:
ServicePointManager.DefaultConnectionLimit = 1000;
ThreadPool.SetMinThreads(128, 128);
努力确保我没有遇到连接或线程限制。
通常我会插入 4-5 批,这需要 5000-9000 毫秒,通常每批需要 1000-1500 毫秒。根据我读过的所有内容,我应该每秒接近 2000-20000 条记录。通常这些会进入 1 个分区,但我尝试让每个批次都成为单独的分区,但没有任何改进。
我试过从以下位置运行它:
控制台应用 天蓝色函数 网络 API MVC 控制器
所有结果都相同。我猜我一定是做错了什么,尽管我一生都无法弄清楚它可能是什么。
非常感谢任何想法/建议。
提前谢谢你!
【问题讨论】:
标签: c# azure asynchronous nosql