【问题标题】:When running Azure Data Factory Activities in Azure Batch, how should asynchronicity be handled在 Azure Batch 中运行 Azure 数据工厂活动时,应如何处理异步性
【发布时间】:2017-06-22 14:26:41
【问题描述】:

背景

我已经稍微简化了这个场景,但这是普遍的问题。

我正在使用 Azure 数据工厂将自定义 API 中的数据提取到 Azure 数据仓库中的表中。我正在使用 IDotNetActivity 运行调用 API 并将数据加载到数据仓库的 C# 代码。该活动在 Azure Batch 中运行。

在活动本身中,在调用自定义 API 之前,我从 Azure Blob 存储中的文件加载人员列表。 然后,我为文件中的每个人调用自定义 API。 这些调用一个接一个地依次进行。 问题是这种方法耗时太长。 文件大小可能会增长,因此所花费的时间只会变得更糟。

我为提高性能所做的尝试

  • 使 API 调用异步并以 3 个为一组调用它们。奇怪的是,这运行得更慢。看起来批处理不能很好地处理异步/等待。
  • 我们看到的其他奇怪之处是MoreLinq 的批处理命令根本不起作用。我已经检查了源代码: https://github.com/morelinq/MoreLINQ/blob/master/MoreLinq/Batch.cs 。这使用了 yield return 但我不知道为什么这不起作用,或者即使它与 async / await 问题有关。

主要问题

Azure Batch 是否支持异步/等待?

其他问题

  • 如果 Azure 不支持异步/等待,那么解决此问题的更好方法是什么?即使用作业管理器并启动更多节点。
  • 谁能解释一下为什么 MoreLinq 的 Batch 在 Azure Batch 中不起作用? 这是受影响代码的 sn-p:

    List<int> personIds = GetPersonIds(clientAddress, clientUsername, clientPassword);
    var customResults = new List<CustomApiResult>();
    foreach (var personIdsBatch in personIds.Batch(100))
    {
        customResults.AddRange(GetCustomResultsByBatch(address, username, password, personIdsBatch));
    }
    

【问题讨论】:

    标签: c# azure asynchronous azure-data-factory azure-batch


    【解决方案1】:

    根据我的理解,personIds.Batch(100) 只是将personIds 分批成大小为 (100) 的存储桶。

    //method1
    foreach (var personIdsBatch in personIds.Batch(100))
    {
        customResults.AddRange(GetCustomResultsByBatch(address, username, password, personIdsBatch));
    }
    
    //method2
    customResults.AddRange(GetCustomResultsByBatch(address, username, password, personIds));
    

    上述两种方法都会按顺序为每个人调用您的自定义 API,而 method1 已添加额外的逻辑来处理相同的任务。

    Azure Batch 是否支持异步/等待?

    根据你的代码,我定义了IDotNetActivity的实现如下,你可以参考一下:

    public class MyDotNetActivity : IDotNetActivity
    {
        public IDictionary<string, string> Execute(IEnumerable<LinkedService> linkedServices, IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger)
        {
            return ExecuteAsync(linkedServices, datasets, activity, logger).Result;
        }
    
        async Task<IDictionary<string, string>> ExecuteAsync(IEnumerable<LinkedService> linkedServices, IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger)
        {
            List<int> personIds = await GetPersonIds("{clientAddress}", "{clientUsername}", "{clientPassword}");
            var tasks = new List<Task<List<CustomApiResult>>>();
            foreach (var personIdsBatch in personIds.Batch(100))
            {
                tasks.AddRange(GetCustomResultsByBatch("{address}", "{username}", "{password}", "{personIdsBatch}"));
            }
    
            var taskResults = await Task.WhenAll(tasks);
            List<CustomApiResult> customResults = taskResults.SelectMany(r=>r).ToList();
    
            //process the custom api results
    
            return new Dictionary<string, string>();
        }
    
        async Task<List<CustomApiResult>> GetCustomResultsByBatch(string address, string username, string password, IEnumerable<int> personIdsBatch)
        {
            //Get Custom Results By Batch
            return new List<CustomApiResult>();
        }
    
        async Task<List<int>> GetPersonIds(string clientAddress, string clientUsername, string clientPassword)
        {
            //load a list of people from a file in Azure Blob storage
            return new List<int>(); 
        }
    }
    

    另外,我假设您可以利用Parallel.ForEach 来并行执行您的同步作业:

    List<int> personIds = GetPersonIds(clientAddress, clientUsername, clientPassword);
    var customResults = new List<CustomApiResult>();
    Parallel.ForEach(personIds.Batch(100), 
    new ParallelOptions()
    {
        MaxDegreeOfParallelism=5
    },
    (personIdsBatch) =>
    {
        var results = GetCustomResultsByBatch(address, username, password, personIdsBatch);
        lock (customResults)
        {
            customResults.AddRange(results);
        }
    });
    

    【讨论】:

    • 谢谢布鲁斯,这行得通。 MoreLinq 批处理工作。这在几个月前是行不通的。异步/等待代码也可以正常工作。我不知道为什么它以前不起作用。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-06-11
    相关资源
    最近更新 更多