【问题标题】:Create async method and set task result from another method创建异步方法并从另一个方法设置任务结果
【发布时间】:2013-11-05 06:03:53
【问题描述】:

我正在使用使用 TCP 和 SSL 流的非常特定的 Web API。与 API 的连接是持久的(不应在每次写入/读取后关闭),因为此连接用于接收来自服务器的事件通知。

于是我实现了一个循环阅读的方法:

private void Listen()
{
    int length;
    do
    {
        var buffer = new byte[TcpClient.ReceiveBufferSize];
        length = SslStream.Read(buffer, 0, TcpClient.ReceiveBufferSize);

        if (length > 0)
        {
            // Process received data
            Listen();
        }
    } while (length > 0);

    //reconnect
}

现在我需要调用一些 API 方法。我希望他们支持 TPL(异步和等待)。问题是这些方法的异步部分实际上是在上面的读取方法(Listen)中实现的。例如:

public async Task<bool> Authenticate(string clientId, string clientSecret)
{
    await SendAuthMessageOverNetwork(clientId, clientSecret);
    return await ReceiveAuthResponseFromServer();
    // The problem is that there is no ReceiveAuthResponseFromServer method - 
    // the response is received in reading thread (Listen).
}

我对 TPL 还不是很熟悉。我通过使用一个任务解决了这个问题,它实际上什么都不做,只是等待来自读取线程的信号(AutoResetEvent):

private AutoResetEvent _loginEvent;
private bool _loginResult;

public async Task<bool> Authenticate(string clientId, string clientSecret)
{
    await SendAuthMessageOverNetwork(clientId, clientSecret);
    return await Task<bool>.Factory.StartNew(() => { _loginEvent.WaitOne(); return _loginResult; });
}

private void Listen()
{
    ...
    if (msg.Type == MessageTypes.AuthenticationResponse)
    {
        _loginResult = true;
        _loginEvent.Set();
    }
    ...
}

但我不太喜欢这种解决方案。也许有一种更简单的方法来实现我想要的?这可以通过仅使用任务功能而不使用 AutoResetEvent 和中间全局变量来完成吗?

【问题讨论】:

    标签: c# multithreading asynchronous task-parallel-library async-await


    【解决方案1】:

    您是对的,使用 AutoResetEvent 是错误的方法。你真正想做的是:

    • 假设每个 SendAuthMessageOverNetwork 都与 ReceiveAuthResponseFromServer 匹配,请将它们组合成一个方法。
    • 在该方法中,发送请求并将新的 TaskCompletionSource 放入读取循环可以看到的队列中
    • 返回任务完成源的 Task 属性作为结果
    • 在读取循环中,当您读取一条消息时,将下一个完成源从队列中出列并使用 taskSource.SetResult(responseFromServer)

    所以是这样的:

    private readonly Queue<ResponseMessageType> _responseQueue = new Queue<ResponseMessageType>();
    
    public async Task<bool> Authenticate(string clientId, string clientSecret) {
        var response = AsyncRequestAResponse(MakeAuthMessage(clientId, clientSecret));
        return (await response).Type == MessageTypes.AuthenticationResponse
    }
    
    public Task<bool> AsyncRequestAResponse(RequestMessageType request) {
        var responseSource = new TaskCompletionSource<ResponseMessageType>();
        _responseQueue.Enqueue(responseSource);
        Send(request);
        return responseSource.Task
    }
    
    private void Listen() {
        ...
        if (_responseQueue.Count == 0)
            throw new Exception("Erm, why are they responding before we requested anything?");
        _responseQueue.Dequeue().SetResult(msg);
    }
    

    换句话说,使用TaskCompletionSource&lt;T&gt; 将您在内部执行的网络读/写内容转换为您向调用者公开的异步内容。

    它可能看起来与上面的完全不一样...我假设示例中存在继承顺序和一对一的响应/请求。您可能必须将请求 ID 与响应 ID 或类似的东西相匹配,并且有将异常而不是结果放入队列任务的超时,等等。

    此外,如果响应可以与发送的请求同时到达,那么在涉及队列的操作周围放置锁很重要。

    【讨论】:

    • 谢谢!正是我需要的。
    • 这里有一个微妙的时刻@AlekseyShubin 应该注意。如果启动await Task 的线程是具有默认同步上下文的非UI 线程,则调用_responseQueue.Dequeue().SetResult(msg) 将立即在Listen() 正在运行的线程上调用延续回调(await 之后的代码)。因此,Listen 将被阻止,直到在消费者端的代码流中出现另一个 await。这可能会导致死锁,具体取决于逻辑工作流。更多信息:stackoverflow.com/q/19481964/1768303
    • @Noseratio 对。永远不要阻塞任务,除非您了解 TPL 的工作原理以及任务是如何从上到下完成的。
    猜你喜欢
    • 2014-09-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-04-27
    • 2014-01-19
    • 1970-01-01
    • 2015-09-11
    • 2013-08-24
    相关资源
    最近更新 更多