【问题标题】:How to Post to a BufferBlock and get a result from the ActionBlock?如何发布到 BufferBlock 并从 ActionBlock 中获取结果?
【发布时间】:2021-06-22 11:59:05
【问题描述】:

有一个对象一次只能处理一个请求,处理它需要一点时间。任务完成后,它会引发一个返回结果的事件。下面代码中的对象是Computer,假设我不能改变这个类的行为。

现在,我想创建一个包装类,让客户觉得他们可以随时发送请求。该请求现在是一个异步方法,因此客户端可以简单地等待,直到返回结果。当然,底层对象一次只能处理一个请求,所以包装器需要对请求进行排队,当“处理完成”事件到来时,需要将结果返回给相应的客户端。这个包装类在下面的代码中是SharedComputer

我想我需要在“Place2”返回从“Place1”获得的值。推荐的做法是什么? BufferBlock/ActionBlock 没有返回值的机制吗?

    static void Main(string[] args)
    {
        SharedComputer pc = new SharedComputer();
        for(int i =0; i<10; i++)
        {
            Task.Factory.StartNew(async() =>
            {
                var r = new Random();
                int randomDelay = r.Next(500, 5000);
                Thread.Sleep(randomDelay);
                int random1 = r.Next(0, 10);
                int random2 = r.Next(0, 10);
                int sum = await pc.Add(random1, random2);
                if(random1 + random2 == sum)
                {
                    Debug.WriteLine($"Got correct answer: {random1} + {random2} = {sum}.");
                }
                else
                {
                    Debug.WriteLine($"Got incorrect answer: {random1} + {random2} = {sum}.");
                }
            });
        }
        System.Console.Read();
    }
}

class SharedComputer
{
    Computer Mainframe= Computer.GetInstance();
    BufferBlock<TwoNumbers> JobQueue = new BufferBlock<TwoNumbers>();
    TaskCompletionSource<int> TCS;

    public SharedComputer()
    {
        Mainframe.Calculated += Mainframe_Calculated;
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1
        };

        var jobProcessor = new ActionBlock<TwoNumbers>(async e =>
        {
            Debug.WriteLine("Starting an adding");
            TCS = new TaskCompletionSource<int>();
            Mainframe.StartAdding(e.A, e.B);
            var res = await TCS.Task; // Place1
            Debug.WriteLine("Got the result."); 
        }, options);

        JobQueue.LinkTo(jobProcessor);
    }

    private void Mainframe_Calculated(object sender, int e)
    {
        TCS.SetResult(e);
    }

    public async Task<int> Add(int a, int b)
    {
        var data = new TwoNumbers()
        {
            A = a,
            B = b
        };
        Debug.WriteLine("Queuing a new adding.");
        JobQueue.Post<TwoNumbers>(data);

        return 1;//Place2: Return the value received at Place1.
    }

    struct TwoNumbers
    {
        public int A;
        public int B;
    }
}

class Computer
{
    static Computer Instance;
    bool IsWorking = false;
    private Computer()
    {
    }

    public static Computer GetInstance()
    {
        if (Instance == null)
            Instance = new Computer();
        return Instance;
    }

    public event EventHandler<int> Calculated;
    public void StartAdding(int a, int b)
    {
        if (IsWorking)
        {
            throw new InvalidOperationException("Already working.");
        }
        IsWorking = true;
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(3000);
            IsWorking = false;
            Calculated(this, a + b);
        });
    }
}

【问题讨论】:

  • 附带说明一下,您的代码会快速连续创建 10 个 Random 实例,这使得其中一些实例可能会使用相同的种子进行播种。 BufferBlock 也可能是多余的。 ActionBlock 有自己的内部输入队列。您可以看到here 一种将工作发送到ActionBlock 的惯用方式,并在工作完成时收到通知(它使用嵌套任务而不是TaskCompletionSources)。
  • @TheodorZoulias 谢谢。关于冗余,您可能是对的。我看到的示例使用了 BufferBlock,所以我就这样使用它,但 ActionBlock 似乎也有 Post()。而 ActionBlock>> 似乎确实得到了我想要的行为;虽然很难理解有一个任务的任务。
  • 是的,使用任务的任务是一种相当先进的技术。您可以改用TaskCompletionSources 来实现相同的目的,但代码会更冗长。您需要将TwoNumbers TaskCompletionSource 发布到ActionBlock,从而需要包装器类型,或使用tuples 作为包装器。

标签: c# task-parallel-library


【解决方案1】:

底层对象一次只能处理一个请求,因此包装器需要对请求进行排队,当“处理完成”事件到达时,需要将结果返回给相应的客户端。

所以你需要的是互斥。虽然您可以从 TPL Dataflow 和 TaskCompletionSource&lt;T&gt; 构建互斥锁,但使用内置的 SemaphoreSlim 会容易得多。

IMO 首先编写异步抽象,然后添加互斥会更简洁。异步抽象would look like

public static class ComputerExtensions
{
  public static Task<int> AddAsync(this Computer computer, int a, int b)
  {
    var tcs = new TaskCompletionSource<int>();
    EventHandler<int> handler = null;
    handler = (_, result) =>
    {
      computer.Calculated -= handler;
      tcs.TrySetResult(result);
    };
    computer.Calculated += handler;
    computer.StartAdding(a, b);
  }
}

拥有异步 API 后,您可以通过 SemaphoreSlim 轻松应用异步限制(或互斥):

class SharedComputer
{
  Computer Mainframe = Computer.GetInstance();
  readonly SemaphoreSlim _mutex = new();

  public async Task<int> AddAsync(int a, int b)
  {
    await _mutex.WaitAsync();
    try { return Mainframe.AddAsync(a, b); }
    finally { _mutex.Release(); }
  }
}

顺便说一句,use Task.Run instead of Task.Factory.StartNew

【讨论】:

    猜你喜欢
    • 2016-05-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-04-06
    • 2016-12-20
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多