【问题标题】:Multithreaded async pattern多线程异步模式
【发布时间】:2015-01-28 02:58:41
【问题描述】:

我有一个场景,多个线程通过单个套接字发送数据。唯一 ID 已插入到消息中,并且唯一 ID 在响应消息中回显。当套接字与单个客户端隔离时(如预期的那样),一切都很好。现在我正在为客户端等待特定响应的多个线程寻找异步/等待模式。

一些代码来演示:

using (var ns = new NetworkStream(_socket))
{
    byte[] requestBuffer = GetBuffer(request);
    await ns.WriteAsync(requestBuffer, 0, request.Length);
    byte[] responseBuffer = await ReceiveMessageAsync(ns);

    // process response message
}

上面的示例在多线程场景中不起作用,因为消息可以按任何顺序返回,因此下一条消息可能不属于当前客户端。我的想法是,客户端将使用其唯一 ID 注册一个委托或任务(将其存储在字典中),并且当一条带有该唯一 ID 的消息返回时,委托或任务将使用响应字节“完成”。我猜这使用 EventHandler 很容易实现,但我正在寻找一种等待响应的方法。

例如:

using (var ns = new CustomNetworkStream(_socket))
{
    Task waitTask = ns.RegisterTask(this.UniqueId);

    byte[] requestBuffer = GetBuffer(request);
    await ns.WriteAsync(requestBuffer, 0, request.Length);

    byte[] responseBuffer = await waitTask;

    // process response message
}

我不知道“RegisterTask”方法会是什么样子,如何存储任务,如何让任务“等待”,然后以任务为结果“完成”。

有什么想法吗?我研究了 Toub 的 Async Coordination Primitives 系列,但我不确定这是否是正确的方法。

【问题讨论】:

  • 我想你只需要TaskCompletionSource<T>。但是,对于套接字通信,多线程异步 API 非常复杂,远比您的示例代码显示的要复杂。我建议使用更高级别的抽象,例如 SignalR。
  • 我的库 Griffin.Framework 的网络部分内置了对它的支持(使用异步客户端)。 griffinframework.net
  • 谢谢@jgauffin,我去看看。
  • "stream" 表示 TCP,但 "multiple clients on single socket" 表示 UDP。无论哪种方式,我都会说,虽然你可以让它工作,但这里有一个更基本的设计问题。如果没有详细说明你是如何陷入这种境地的,这个问题很难完全理解。
  • @PeterDuniho - 我正在创建一个持久套接字并允许多个线程通过连接发送/接收数据。

标签: c# multithreading asynchronous task-parallel-library async-await


【解决方案1】:

因此,您需要将所有这些包装到一个新类中,因为您需要在阅读的地方和写作的地方之间共享状态。

每次写入流时,您都需要接受唯一 ID,并将条目添加到将 ID 映射到 TaskCompletionSource 的查找中。然后 write 方法可以从该 TCS 返回 Task

然后,您可以有一个单独的阅读器,它只是坐在那里从您的流中读取,找到与该响应的 ID 关联的字典条目,并设置其结果。

public class MyNetworkStream : IDisposable
{
    private NetworkStream stream;
    private ConcurrentDictionary<int, TaskCompletionSource<byte[]>> lookup =
        new ConcurrentDictionary<int, TaskCompletionSource<byte[]>>();
    private CancellationTokenSource disposalCTS = new CancellationTokenSource();
    public MyNetworkStream(Socket socket)
    {
        stream = new NetworkStream(socket);
        KeepReading();
    }
    public void Dispose()
    {
        disposalCTS.Cancel();
        stream.Dispose();
    }

    public Task<byte[]> WriteAndWaitAsync(byte[] buffer, int offset, 
        int count, int uniqueID)
    {
        var tcs = lookup.GetOrAdd(uniqueID, new TaskCompletionSource<byte[]>());
        stream.WriteAsync(buffer, offset, count);
        return tcs.Task;
    }

    private async void KeepReading()
    {
        try
        {
            //TODO figure out what you want for a buffer size so that you can 
            //read a block of the appropriate size.
            byte[] buffer = null;
            while (!disposalCTS.IsCancellationRequested)
            {
                //TODO edit offset and count appropriately 
                await stream.ReadAsync(buffer, 0, 0, disposalCTS.Token);
                int id = GetUniqueIdFromBlock(buffer);
                TaskCompletionSource<byte[]> tcs;
                if (lookup.TryRemove(id, out tcs))
                    tcs.TrySetResult(buffer);
                else
                {
                    //TODO figure out what to do here
                }
            }
        }
        catch (Exception e)
        {
            foreach (var tcs in lookup.Values)
                tcs.TrySetException(e);
            Dispose();
            //TODO consider any other necessary cleanup
        }
    }

    private int GetUniqueIdFromBlock(byte[] buffer)
    {
        throw new NotImplementedException();
    }
}

【讨论】:

    【解决方案2】:

    您需要一个 TaskCompletionSource&lt;byte[]&gt; 用作同步构造,一个 ConcurrentDictionary 用于在 id 和 TCS 和侦听器之间映射:

    ConcurrentDictionary<UniqueId, TaskCompletionSource<byte[]>> _dictionary;
    async Task Listen(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            using (var ns = new NetworkStream(_socket))
            {
                byte[] responseBuffer = await ReceiveMessageAsync(ns);
                var id = ExtractId(responseBuffer);
                TaskCompletionSource<byte[]> tcs;
                if (dictionary.TryRemove(id, out tcs))
                {
                    tcs.SetResult(responseBuffer);
                }
                else
                {
                    // error
                }
            }
        }
    }
    
    Task RegisterTask(UniqueId id)
    {
        var tcs = new TaskCompletionSource<byte[]>();
        if (!_dictionary.TryAdd(id, tcs))
        {
            // error
        }
        return tcs.Task;
    }
    

    但是,正如 Stephen Cleary 所建议的,您可能希望为此使用现有的解决方案。

    【讨论】:

    • 看起来是一个合理的解决方案。通常,我会使用 pub/sub 或 SignalR,但请求/响应都发生在服务器上,作为外部请求/响应的一部分,我需要尽可能多地消除服务器跃点。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多