【问题标题】:Wrapping blocking calls to be async for better thread reuse and responsive UI将阻塞调用封装为异步,以实现更好的线程重用和响应式 UI
【发布时间】:2015-01-21 15:54:44
【问题描述】:

我有一个类负责通过调用旧类来检索产品可用性。这个遗留类本身通过 BLOCKING 网络调用在内部收集产品数据。 请注意,我无法修改旧版 API 的代码。由于所有产品都是相互独立的,我希望在不创建任何不必要的线程的情况下并行收集信息,也不会阻塞在调用此遗留 API 时被阻塞的线程。有了这个背景,这里就是我的基础课程了。

class Product
    {
        public int ID { get; set; }
        public int  VendorID { get; set; }
        public string Name { get; set; }
    }

    class ProductSearchResult
    {
        public int ID { get; set; }
        public int AvailableQuantity { get; set; }
        public DateTime ShipDate { get; set; }
        public bool Success { get; set; }
        public string Error { get; set; }
    }

class ProductProcessor
    {
        List<Product> products;
        private static readonly SemaphoreSlim mutex = new SemaphoreSlim(2);
        CancellationTokenSource cts = new CancellationTokenSource();
        public ProductProcessor()
        {
            products = new List<Product>()
            {
                new Product() { ID = 1, VendorID = 100, Name = "PC" },
                new Product() { ID = 2, VendorID = 101, Name = "Tablet" },
                new Product() { ID = 3, VendorID = 100, Name = "Laptop" },
                new Product() { ID = 4, VendorID = 102, Name = "GPS" },
                new Product() { ID = 5, VendorID = 107, Name = "Mars Rover" }
            };

        }

        public async void Start()
        {
            Task<ProductSearchResult>[] tasks = new Task<ProductSearchResult>[products.Count];
            Parallel.For(0, products.Count(), async i =>
            {
                tasks[i] = RetrieveProductAvailablity(products[i].ID, cts.Token);

            });



            Task<ProductSearchResult> results = await Task.WhenAny(tasks);

            // Logic for waiting on indiviaul tasks and reporting results

        }

        private async Task<ProductSearchResult> RetrieveProductAvailablity(int productId, CancellationToken cancellationToken)
        {
            ProductSearchResult result = new ProductSearchResult();
            result.ID = productId;

            if (cancellationToken.IsCancellationRequested)
            {
                result.Success = false;
                result.Error = "Cancelled.";
                return result;
            }

            try
            {
                await mutex.WaitAsync();
                if (cancellationToken.IsCancellationRequested)
                {
                    result.Success = false;
                    result.Error = "Cancelled.";
                    return result;
                }

                LegacyApp app = new LegacyApp();
                bool success = await Task.Run(() => app.RetrieveProductAvailability(productId));
                if (success)
                {
                    result.Success = success;
                    result.AvailableQuantity = app.AvailableQuantity;
                    result.ShipDate = app.ShipDate;
                }
                else
                {
                    result.Success = false;
                    result.Error = app.Error;
                }
            }
            finally
            {
                mutex.Release();
            }

            return result;

        }

    }

鉴于我正在尝试将异步封装在同步 API 上,我有两个问题。

  1. 通过使用 Parallel.For 并将 Legay API 调用包装在 Task.Run 中,我是否创建了任何不必要的线程,这些线程可以在不阻塞调用线程的情况下避免,因为我们将在 UI 中使用此代码。
  2. 这段代码看起来仍然是线程安全的吗?

【问题讨论】:

    标签: c# .net multithreading task-parallel-library async-await


    【解决方案1】:

    编译器会给你关于async lambda 的警告。仔细阅读;它告诉你它不是异步的。在那里使用async 毫无意义。另外,不要使用async void

    由于您的底层 API 处于阻塞状态 - 并且无法更改 - 异步代码不是一种选择。我建议使用多个Task.Run 调用 Parallel.For,但不能同时使用。所以让我们使用并行。实际上,让我们使用并行 LINQ,因为您正在转换一个序列。

    使RetrieveProductAvailablity 异步是没有意义的;除了节流之外,它只做阻塞工作,并且并行方法具有更自然的节流支持。这使您的方法看起来像:

    private ProductSearchResult RetrieveProductAvailablity(int productId, CancellationToken cancellationToken)
    {
      ... // no mutex code
      LegacyApp app = new LegacyApp();
      bool success = app.RetrieveProductAvailability(productId);
      ... // no mutex code
    }
    

    然后您可以像这样进行并行处理:

    public void Start()
    {
      ProductSearchResult[] results = products.AsParallel().AsOrdered()
          .WithCancellation(cts.Token).WithDegreeOfParallelism(2)
          .Select(product => RetrieveProductAvailability(product.ID, cts.Token))
          .ToArray();
      // Logic for waiting on indiviaul tasks and reporting results
    }
    

    从您的 UI 线程中,您可以调用使用 Task.Run 的方法:

    async void MyUiEventHandler(...)
    {
      await Task.Run(() => processor.Start());
    }
    

    这使您的业务逻辑保持整洁(仅同步/并行代码),并且将这项工作从 UI 线程(使用 Task.Run)移出的责任属于 UI 层。

    更新:我添加了对AsOrdered 的调用,以确保结果数组与产品序列具有相同的顺序。这可能是必要的,也可能不是必要的,但由于原始代码保留了顺序,因此现在也保留了此代码。

    更新:由于您需要在每次检索后更新 UI,您可能应该为每个用户使用 Task.Run 而不是 AsParallel

    public async Task Start()
    {
      var tasks = products.Select(product =>
          ProcessAvailabilityAsync(product.ID, cts.Token));
      await Task.WhenAll(tasks);
    }
    
    private SemaphoreSlim mutex = new SempahoreSlim(2);
    private async Task ProcessAvailabilityAsync(int id, CancellationToken token)
    {
      await mutex.WaitAsync();
      try
      {
        var result = await RetrieveProductAvailability(id, token);
        // Logic for reporting results
      }
      finally
      {
        mutex.Release();
      }
    }
    

    【讨论】:

    • 在这种情况下使用 AsParallel 和 Parallel.For 有什么区别?
    • @johnsmith 查看每种方法的文档以了解它们可以做什么以及如何使用它们。
    • @johnsmith:Parallel 和 Parallel LINQ 非常相似,但在不同的场景中,一个比另一个稍微好一点。 Parallel 在 CPU 使用率/超并行性方面“更好”,而 Parallel LINQ 更容易用于序列操作(例如,Select)和聚合(例如,Average)。有一个很棒的比较文章here
    • @StephenCleary 为了确保我对您的建议很清楚,等待单个任务和报告结果的逻辑将移至 MyUIEventHandler 方法。它是否正确?您在 Start 方法中有评论,所以只是想确定我在这里没有做错什么?
    • @johnsmith:如果“报告结果”与 UI 交互,那么是的,那将是最合适的。在这种情况下,Start 可能应该返回 ProductSearchResult[]
    【解决方案2】:

    我是否创建了任何本可以避免的不必要的线程 不会阻塞调用线程,因为我们将在 UI 中使用此代码。

    是的。您的代码通过Parallel.ForEach 旋转新线程,然后在RetrieveProductAvailablity 内部再次旋转。没有必要。

    async-awaitParallel.ForEach don't really play nice together,因为它将您的异步 lambda 转换为 async void 方法而不是 async Task

    我建议放弃 Parallel.ForEach 和包装的同步调用,然后执行以下操作:

    将您的方法调用从异步更改为同步(因为它实际上根本不是异步):

    private ProductSearchResult RetrieveProductAvailablity(int productId,
                                                           CancellationToken
                                                           cancellationToken)
    

    而不是这个:

    bool success = await Task.Run(() => app.RetrieveProductAvailability(productId));
    

    同步调用方法调用:

    bool success = app.RetrieveProductAvailability(productId));
    

    然后在所有这些上显式调用Task.Run

    var productTasks = products.Select(product => Task.Run(() => 
                                       RetrieveProductAvailablity(product.ID, cts.Token))
    
    await Task.WhenAll(productTasks);
    

    一般不建议暴露async wrappers over sync methods

    【讨论】:

    • 那么,如果我想为这种情况提供响应 UI,您建议采用什么方法?我可以使用常规的 for 循环代替 Parallel.For 同时使其线程安全吗?
    猜你喜欢
    • 2016-07-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-07-07
    • 2023-03-17
    相关资源
    最近更新 更多