【问题标题】:await async lambda in ActionBlock在 ActionBlock 中等待异步 lambda
【发布时间】:2012-12-04 13:30:27
【问题描述】:

我有一个带有 ActionBlock 的类 Receiver:

public class Receiver<T> : IReceiver<T>
{

  private ActionBlock<T> _receiver;

  public Task<bool> Send(T item) 
  {
     if(_receiver!=null)
        return _receiver.SendAsync(item);

     //Do some other stuff her
  }

  public void Register (Func<T, Task> receiver)
  {
    _receiver = new ActionBlock<T> (receiver);
  }

  //...
}

ActionBlock 的 Register-Action 是一个带有 await-Statement 的异步方法:

private static async Task Writer(int num)
{
   Console.WriteLine("start " + num);
   await Task.Delay(500);
   Console.WriteLine("end " + num);
}

现在我要做的是同步等待(如果设置了条件),直到操作方法完成以获得独占行为:

var receiver = new Receiver<int>();
receiver.Register((Func<int, Task) Writer);
receiver.Send(5).Wait(); //does not wait the action-await here!

问题在于“等待 Task.Delay(500);”语句被执行,“receiver.Post(5).Wait();”不再等待。

我尝试了几种变体(TaskCompletionSource、ContinueWith、...),但都不起作用。

有人知道如何解决这个问题吗?

【问题讨论】:

  • 您不能通过将_receiver 更改为TransformBlock 来更改您的代码,并将以下操作放入一个新的ActionBlock,链接到_receiver
  • 你能给我一个小的代码示例吗?我不明白重构应该如何解决“排他行为”问题。

标签: c# asynchronous task-parallel-library async-await tpl-dataflow


【解决方案1】:

ActionBlock 默认情况下将强制执行排他行为(一次只处理一项)。如果您所说的“排他行为”是其他意思,您可以在操作完成时使用TaskCompletionSource 通知您的发件人:

... use ActionBlock<Tuple<int, TaskCompletionSource<object>>> and Receiver<Tuple<int, TaskCompletionSource<object>>>
var receiver = new Receiver<Tuple<int, TaskCompletionSource<object>>>();
receiver.Register((Func<Tuple<int, TaskCompletionSource<object>>, Task) Writer);
var tcs = new TaskCompletionSource<object>();
receiver.Send(Tuple.Create(5, tcs));
tcs.Task.Wait(); // if you must

private static async Task Writer(int num, TaskCompletionSource<object> tcs)
{
  Console.WriteLine("start " + num);
  await Task.Delay(500);
  Console.WriteLine("end " + num);
  tcs.SetResult(null);
}

或者,您可以使用AsyncLock (included in my AsyncEx library):

private static AsyncLock mutex = new AsyncLock();

private static async Task Writer(int num)
{
  using (await mutex.LockAsync())
  {
    Console.WriteLine("start " + num);
    await Task.Delay(500);
    Console.WriteLine("end " + num);
  }
}

【讨论】:

  • 是的,ActionBlock 强制执行独占行为是正确的,但如果注册的操作是异步的,则它不再是“真正的独占”。是的,您的解决方案应该可以工作,但我不想添加 TaskCompletionSource 参数,因为该操作是专有逻辑的入口点 - 所以如果用户不调用 tcs.SetResult 它就不再工作了......
  • 在这种情况下,您可以使用AsyncLock。请参阅代码示例的更新答案。您不再知道一个项目何时完成处理,但每个项目将一次处理一个(包括async 处理)。
  • 好的,谢谢,我想这正是我需要的——我会试试的!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-12-30
  • 2021-10-22
  • 1970-01-01
  • 1970-01-01
  • 2015-10-16
  • 1970-01-01
相关资源
最近更新 更多