【问题标题】:async but not parallel异步但不并行
【发布时间】:2016-11-22 17:39:47
【问题描述】:

我有一个定义异步操作的合同,但我从一个未设置为异步的区域调用它。我想同时进行多个调用,所以我从这个开始:

var tasks = inputs.Select(input => service.GetResult(input));
var results = tasks.WhenAll(tasks).Result;

我认为这将并行启动所有调用,然后在第二行等待。但是,查看目标服务的日志记录,我发现调用是串行的。

我发现this article 显示了类似的调用我的方法并解释它不一定并行运行,所以我只是将其切换为直接并行调用以进行测试:

var results = new ConcurrentBag<Result>();
Parallel.ForEach(inputs, input => results.Add(service.GetResult(input).Result));

这按预期工作 - 我可以看到调用是并行进入服务的。

所以,这给我带来了两个问题:

1) 使用选项 2 的缺点是什么?
2) 如何让选项 1 正常工作?

这里有几个服务可以复制这个问题。以 WCFTestClient 和四个整数(1、2、3、4)的列表为例调用 ClientService。 (运行时可能需要更改端口。)

目标服务:

using System.Diagnostics;
using System.ServiceModel;
using System.Threading.Tasks;

namespace AsyncNotParallel
{
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall, ConcurrencyMode = ConcurrencyMode.Single)]
    public class TargetService : ITargetService
    {
        public async Task<int> GetResult(int input)
        {
            Trace.WriteLine($"In:  {input}");
            await Task.Delay(1000); // Do stuff.
            Trace.WriteLine($"Out: {input}");
            return input;
        }
    }
    [ServiceContract]
    public interface ITargetService
    {
        [OperationContract]
        Task<int> GetResult(int input);
    }
}

客户服务:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.ServiceModel;
using System.Threading.Tasks;

namespace AsyncNotParallel
{
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall, ConcurrencyMode = ConcurrencyMode.Single)]
    public class ClientService : IClientService
    {
        public int GetResults(List<int> inputs)
        {
            // Option 1:
            var tasks = inputs.Select(input => Execute((ITargetService service) => service.GetResult(input)));
            var results1 = Task.WhenAll(tasks).Result.Sum();

            // Option 2:
            var bag = new ConcurrentBag<int>();
            Parallel.ForEach(inputs, input => bag.Add(Execute((ITargetService service) => service.GetResult(input)).Result));
            var results2 = bag.Sum();

            return results1 + results2;
        }

        private TResult Execute<TService, TResult>(Func<TService, TResult> operation)
        {
            var address = new EndpointAddress("http://localhost:34801/TargetService.svc");
            var binding = new BasicHttpBinding();
            var factory = new ChannelFactory<TService>(binding, address);
            var channel = factory.CreateChannel();
            var result = operation(channel);
            ((IClientChannel)channel).Close();
            return result;
        }
    }

    [ServiceContract]
    public interface IClientService
    {
        [OperationContract]
        int GetResults(List<int> inputs);
    }
}

【问题讨论】:

  • 永远不要使用.Result,总是等待异步调用。
  • @Bart:为了“从不使用 .Result”,我需要将整个客户端更改为异步/等待,但这是我获得异步合同的唯一位置。我可以制作自己的本地合同副本(在共享 DLL 中),然后进行 Parallel 调用,但这并不能回答我的问题,也不能帮助我理解这里发生的事情。
  • asycn 和并行不是一回事。 Asycn 是关于在 IO 发生时不阻塞线程,而并行是关于在多个线程上运行。有一些重叠之处在于您可以异步等待另一个线程完成,但异步的主要目的是防止线程被不必要地阻塞。
  • @juharr:确实如此。但是,我的印象也是,在阻塞当前线程等待完成之前启动几个异步调用将有效地使调用是并行的。你说这不正确吗?
  • 如果没有好的minimal reproducible example,就不可能确定,但​​在我看来,发布的答案可能包括关于service 是什么的正确推断(基于您问题中的标签)和您正在执行代码的环境。更改并发模式应该可以解决您的主要问题。如果您需要更具体的帮助,您需要提供更好的问题。

标签: c# wcf asynchronous parallel-processing


【解决方案1】:

如果您的服务的ConcurrencyModeSingle(默认值 - 可以在ServiceBehavior 属性中覆盖),这意味着服务按顺序处理调用。所以这两个选项实际上都是按顺序执行的,只是第二个得到了无序的结果。您可以切换到ConcurrencyMode.Multiple,这更危险,因为这意味着必须仔细编写服务以确保线程安全。

  1. Parallel 针对 CPU 密集型操作进行了优化,并将根据系统中的内核数量并行化您的调用。实际上,您可以并行化更多的 IO-bound 操作,因此整个事情会执行得更慢。此外,您在每个线程上都使用.Result,浪费了每个任务Parallel 产生的等待时间。我不会使用这种方法。最后,ConcurrentBag 是无序的,这对您可能很重要,也可能不重要。

  2. 在第一个选项中,您将从 UI 线程按顺序启动每个 WCF 调用。这很可能会导致调用由ConcurrencyMode.Single 服务按照列表的相同顺序处理。

您可能应该使用Task.WaitAll() 而不是Task.WhenAll().Result。我会强烈劝阻你不要在 UI 线程上这样做。这是许多令人讨厌的 UI 挂起的根本原因。您可以简单地从同步方法启动一个异步方法(无需Wait()ing) - 只需触发并忘记。等待任务后,只需在异步方法中根据需要更新 UI。

最后一个建议 - 在使用同一通道进行多个并发调用之前,您应该 Open() 它以获得更好的性能。尽管频道会自动执行此操作,但由于频道会进行一些锁定,因此这里有一个好处。

编辑-

看到更新后的代码后,问题是您正在启动一个任务,然后等待通道同步关闭(这会阻塞直到调用完成)。这是一个更好的实现:

private async Task<TResult> Execute<TService, TResult>(Func<TService, Task<TResult>> operation)
{
    var address = new EndpointAddress("http://localhost:34801/A");
    var binding = new BasicHttpBinding();
    var channel = ChannelFactory<TService>.CreateChannel(binding, address);
    var clientChannel = (IClientChannel)channel;
    try
    {
        var result = await operation(channel).ConfigureAwait(false);
        return result;
    }
    finally
    {
        if (clientChannel.State != CommunicationState.Faulted)
        {
            await Task.Factory.FromAsync(clientChannel.BeginClose, clientChannel.EndClose, null).ConfigureAwait(false);
        }
        else if (clientChannel.State != CommunicationState.Closed)
        {
            clientChannel.Abort();
        }
    }
}

我还修改了它以使用缓存的ChannelFactory,并正确关闭和中止频道。

【讨论】:

  • 澄清几点:目标服务为InstanceContextMode.PerCall/ConcurrenceMode.Single。调用者也不是 GUI,而是不同的服务(具有相同的选项)。服务客户端是使用 ChannelFactory 构建的,并使用打开/关闭调用自动包装。关于我的问题:听起来你建议只是将WhenAll 换成WaitAll,然后分别从任务中获取结果?这似乎没有什么区别。
  • 不会,只是更好地使用Tasks API。 WaitAll 可以在特定条件下内联任务。你的问题是ConcurrenceMode.Single。您的两个选项都按顺序执行操作,但第二个选项使它们无序,因为它们是从多个线程启动的。如果你想改变它,你需要改变ConcurrenceMode
  • 很抱歉假设它是一个 UI 线程。我只是看到很多这样的问题:)
  • ChannelFactory 确实会自动打开频道,但我发现在同时使用它时有一个锁在起作用,当您手动打开时,调用会变得更快。
  • 每次调用都会创建一个新通道并在进行实际服务调用之前显式打开它,因此不会共享通道。更改 ConcurrencyMode 并没有什么不同。此外,该服务肯定不会在选项 2 中按顺序处理它们,因为我可以看到我有多个呼叫同时进行的 in 日志,并且呼叫者完成的时间也明显减少。
猜你喜欢
  • 2014-09-12
  • 1970-01-01
  • 1970-01-01
  • 2018-01-07
  • 1970-01-01
  • 2015-10-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多