【问题标题】:Running several infinite loops with async/await使用 async/await 运行多个无限循环
【发布时间】:2013-08-10 14:09:32
【问题描述】:

我正在开发基于 xamarin 和 .net 5 async/awaits 的 android messanger 应用程序。

在我的应用程序中,我有生产者/消费者模式,用于处理在无限循环上生成的消息。

例如 ReadTcpClientAsync 生产者:

async Task ReadTcpClientAsync(CancellationToken cancellationToken)
{
    cde.Signal();
    while (!cancellationToken.IsCancellationRequested)
    {
        byte[] buffer = await atc.ReadAsync(cancellationToken);
        // queue message...
    }
}

或 SendStatementsAsync 消费者,它使消息队列化并等待 WriteAsync

private async Task SendStatementsAsync(CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        var nextItem = await _outputStatements.Take();
        cancellationToken.ThrowIfCancellationRequested();
        // misc ...
        await atc.WriteAsync(call.Serialize());
    }
}

有些消费者只是等待接听电话

 var update = await _inputUpdateStatements.Take();

这种结构在测试中效果很好,但有一种方法我认为我犯了一个巨大的错误。 此方法旨在运行整个客户端后端,同时启动 3 个 pro/con while (true) 循环。

这里是:

public async Task RunAsync()
{
   _isRunning = true;
   _progress.ProgressChanged += progress_ProgressChanged;
    await InitMTProto(_scheme).ConfigureAwait(false); // init smth...
    // various init stuf...     
    await atc.ConnectAsync().ConfigureAwait(false); // open connection async
    // IS IT WRONG?
    try
    {                                   
        await Task.WhenAny(SendStatementsAsync(_cts.Token),
                               ReadTcpClientAsync(_cts.Token),
                               ProcessUpdateAsync(_cts.Token, _progress)).ConfigureAwait(false);
    }
    catch (OperationCanceledException oce)
    {   

    }
    catch (Exception ex)
    {

    }
}

暂时忘掉 android,想想 UI 上下文中的任何 UI(WinForm、WPF 等)的 OnCreate 方法来调用 RunAsync

protected async override void OnCreate(Bundle bundle)
{
    // start RA 
    await client.RunAsync()
    // never gets here - BAD, but nonblock UI thread - good
    Debug.WriteLine("nevar");
}

所以,如您所见,有问题。在 RunAsync 等待调用之后我什么都做不了,因为它永远不会从 Task.WhenAny(...) 返回。我需要在那里执行状态检查,但我需要启动这个 pro/cons 方法,因为我的检查等待 ManualResetEvent :

if (!cde.Wait(15000))
{
    throw new TimeoutException("Init too long");
}

另外,我的支票也是异步的,它就像一个魅力:)

public async Task<TLCombinatorInstance> PerformRpcCall(string combinatorName, params object[] pars)
{
    // wait for init on cde ...
    // prepare call ...

    // Produce
    ProduceOutput(call);

    // wait for answer
    return await _inputRpcAnswersStatements.Take();
}

我认为我应该使用另一种方法来启动这个无限循环,但是我已经有一路异步任务方法 - 所以我真的不知道该怎么做。 有什么帮助吗?

【问题讨论】:

  • 看看 BlockingCollection
  • 只是不要awaitRunAsync()?
  • @Blam 我有自己的 IProducerConsumer 实现,它工作得很好,我不需要另一个。
  • @svick 这正是我所做的。但是 IL 中的 await RunAsync 会创建 ContinueWith 构造,其中 RunAsync 之后的所有语句都将位于该构造中。所以它们永远不会被执行,因为 RunAsync WhenAny 永远不会触发(取消除外,但它是无关紧要的)
  • 我的意思是你应该尝试调用RunAsync(),但没有await(假设你可以忽略它抛出的任何异常)。

标签: c# .net asynchronous async-await xamarin


【解决方案1】:

好的,经过大量阅读(没有找到)和@svick 的建议,我决定将这些方法称为不带“等待”的方法,作为单独的 Task.Run 方法。 Aso 我决定在 ThreadPool 中运行它。

我的最终代码是:

try
{                                   
    /*await Task.WhenAny(SendStatementsAsync(_cts.Token), 
           ReadTcpClientAsync(_cts.Token),
           ProcessUpdateAsync(_cts.Token, _progress)).ConfigureAwait(false);*/
    Task.Run(() => SendStatementsAsync(_cts.Token)).ConfigureAwait(false);
    Task.Run(() => ReadTcpClientAsync(_cts.Token)).ConfigureAwait(false);
    Task.Run(() => ProcessUpdateAsync(_cts.Token, _progress)).ConfigureAwait(false);
    Trace.WriteLineIf(clientSwitch.TraceInfo, "Worker threads started", "[Client.RunAsync]");
}

一切都按预期正常运行.. 我不确定它会在异常处理中引起什么问题,因为我知道它们会丢失

当然这样的调用会产生警告

因为没有等待这个调用,所以执行当前方法 在呼叫完成之前继续。考虑应用“等待” 调用结果的运算符。

这种方式很容易抑制

// just save task into variable
var send = Task.Run(() => SendStatementsAsync(_cts.Token)).ConfigureAwait(false); 

另外,如果有人知道更好的解决方案,我将不胜感激。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-09-26
    • 1970-01-01
    • 2016-12-30
    • 2020-04-07
    • 2019-03-19
    • 1970-01-01
    • 2019-07-16
    • 1970-01-01
    相关资源
    最近更新 更多