【问题标题】:Async/Await while another method does the action异步/等待,而另一个方法执行操作
【发布时间】:2015-11-09 05:25:07
【问题描述】:

我想实现一个流程,当在我的班级中调用异步方法时,他们将注册到调度程序列表并等待。然后另一个线程运行并执行调度程序列表中的方法,当每个动作完成时,等待将完成,我将返回计算的值。

到目前为止,这是我的代码:

private List<Func<ClassA, ClassB, ResponseBase>> _actionsDict;
public ExecutingScheduler()
{
    _actionsDict = new List<Func<ClassA, ClassB, ResponseBase>>();
    Task.Factory.StartNew(ExecuteNextTask);
}

private void ExecuteNextTask()
{
    while (_actionsDict.Count > 0)
    {
        // Get first while removing it
        var next = _actionsDict[0];
        _actionsDict.RemoveAt(0);

        // Next line has an error now, how do I call it with the original params as I added it to the list
        next();

        Task.Delay(2000);
    }
}

public async Task<ResponseBase> StartStreamAsync(ClassA classA, ClassB classB)
{
    _actionsDict.Add((unit, guid) => StartStream(classA, classB));

    // I don't want the first each time, I want to await the same instance as I added
    var response = await Task.Run(() => _actionsDict[0](classA, classB)); 
    return response;
}

public async Task<ResponseBase> PrepareStreamAsync(ClassA classA, ClassB classB)
{
    _actionsDict.Add((unit, guid) => PrepareStream(classA, classB));

    // I don't want the first each time, I want to await the same instance as I added
    var response = await Task.Run(() => _actionsDict[0](classA, classB)); 
    return response;
}

所以有两个问题: 1) 如何正确添加到操作列表中,以便以后可以使用参数一个接一个地调用列表中的函数?

2) 如何正确地将执行器方法的返回值传递给原始请求的调用者?

随意提出不同的方法

【问题讨论】:

  • 你看过TPL Dataflow吗?
  • @AlexD:您需要一个极其复杂的解决方案,这不禁让我问:您为什么需要这个?您要解决什么实际问题?
  • @StephenCleary 我想他想排队一堆伪Tasks 然后异步运行它们,然后await WhenAll 完成。

标签: c# async-await


【解决方案1】:

我不太明白你的问题。但我假设您需要一个具有两个 async 方法的类,它们可以按照 actionDict 中的顺序执行操作。

不管怎样,代码如下:(新版本)

    List<Task<ResponseBase>> _actionsDict = new List<Task<ResponseBase>>();
    Timer _timer = new Timer(1000);

    public ExecutingScheduler()
    {
        _timer.Elapsed += (s, e) => {
            while (_actionsDict.Count > 0) {
                var a = _actionsDict[0];
                _actionsDict.RemoveAt(0);

                a.Start();
                a.Wait();
            }
        };
        _timer.Start();
    }

    public Task<ResponseBase> StartStreamAsync(ClassA classA, ClassB classB)
    {
        return QueueResponse(() => StartStream(classA, classB));
    }

    public Task<ResponseBase> PrepareStreamAsync(ClassA classA, ClassB classB)
    {
        return QueueResponse(() => PrepareStream(classA, classB));
    }

    public Task<ResponseBase> QueueResponse(Func<ResponseBase> action)
    {
        var t = new Task<ResponseBase>(action);
        _actionsDict.Add(t);

        return Task.Run(() => t.Result);
    }

【讨论】:

  • return Task.Run(() =&gt; task.Result) 到底是什么?!你不是说return task.ConfigureAwait(false)吗?
  • @Aron 当然不是,用户可能调用await StartStreamAsync 意味着该方法应该返回一个任务,但是ConfigureAwait(false) 会返回一个ConfigurableAwaiter,它不会编译。用Task 包裹t.Result 的原因是当用户等待这个任务运行时,它会阻塞它们(因为任务还没有被执行)。 timer 每 1 秒工作一次,以确保所有排队的任务都能正常运行。在每个任务运行后,t.Result 将可用并从该任务返回。然后用户可以做即将发生的事情。
【解决方案2】:

您似乎正在尝试重新实现任务并行库。特别是TaskSchedulerTask.WhenAll

试试这个。

var queries = new [] 
              { 
                  new { ClassA =..., ClassB = ...},
                  new { ClassA =..., ClassB = ...},
                  new { ClassA =..., ClassB = ...},
              };
var tasks = queries.Select(item => Task.Run(() => PrepareStream(item.ClassA, item.ClassB);
var results = await Task.WhenAll(tasks);

但是,像PrepareStream 这样的名字我怀疑你真的不想使用线程。

只有在 CPU 受限或有特殊 (UI) 线程(这是 CPU 受限的特殊情况)时才应使用线程。

【讨论】:

  • OP 说they (the ones called the asynchronous method) would sign up to a scheduler list and wait,根据他的原始代码,他的意思是ExecutingScheduler 类保存了异步方法调用的列表,并一个一个地执行它们,而不是一起执行。
【解决方案3】:

(已编辑) 您可以将任务添加到集合中,而不是将函数添加到操作集合中。

//Better use thread-safe ConcurrentQueue
private ConcurrentQueue<Task<ResponseBase>> _actions;

public ExecutingScheduler()
{
    _actions = new ConcurrentQueue<Task<ResponseBase>>();
    Task.Factory.StartNew(ExecuteNextTask);
}

private void ExecuteNextTask()
{
    while (true)
    {
        // Get first while removing it
        Task<ResponseBase> next;
        var containsElement = _actions.TryDequeue(out next);
        if (containsElement)
        {
            next.Start();
            next.Wait();
        }
        Task.Delay(2000);
    }
}

public async Task<ResponseBase> StartStreamAsync(ClassA classA, ClassB classB)
{
    var task = new Task<ResponseBase>(() => StartStream(classA, classB));

    //Add task to queue
    _actions.Enqueue(task);

    var result = await task;

    return result;
}

public async Task<ResponseBase> PrepareStreamAsync(ClassA classA, ClassB classB)
{
    var task = new Task<ResponseBase>(() => PrepareStream(classA, classB));

    //Add task to queue
    _actions.Enqueue(task);

    var result = await task;
    return result;
}

【讨论】:

  • 稍微解释一下你的代码怎么样?它与 OP 的代码有什么不同/正确?
  • 这不会运行。而且绝对不能异步工作。
  • @CliveDM 我认为它会异步运行,但它只会死锁,仅此而已。 #NeverUseTaskResult
  • @Aron 在他的原始帖子中,他没有await 让任务运行,所以它会同步运行。并且 ExecuteNextTask 方法(现在已修复)将在构造类时仅运行一次,那时 _actionsDict.Count &gt; 0 将为 false 并且整个方法将立即返回,无需调用该方法,所有任务将在列表中排队而不被执行。因此永远不会获得task.Result,这将导致整个线程死机。
  • @NikitaIlin 是的,这似乎运行。一个问题,一旦整个队列用完,这个while(true) 会消耗大量的CPU 时间(我的意思是全部),考虑改用Timer
猜你喜欢
  • 2014-05-15
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-02-15
相关资源
最近更新 更多