【问题标题】:How to write from multiple threads to a TcpListener?如何从多个线程写入 TcpListener?
【发布时间】:2019-02-15 08:25:33
【问题描述】:

假设我有一个静态列表 List<string> dataQueue,其中的数据会以随机的时间间隔不断添加,并且以不同的速率(1-1000 个条目/秒)

我的主要目标是将列表中的数据发送到服务器,我使用的是TcpClient 类。

到目前为止我所做的是,我在单线程中将数据同步发送到客户端

byte[] bytes = Encoding.ASCII.GetBytes(message);

tcpClient.GetStream().Write(bytes, 0, bytes.Length);
//The client is already connected at the start

一旦发送数据,我就会从列表中删除该条目。

这工作正常,但是发送数据的速度不够快,列表被填充并消耗更多内存,因为列表被迭代并一个一个发送。

我的问题是我可以使用相同的tcpClient 对象从另一个线程同时写入,还是可以使用另一个tcpClient 对象与另一个线程中的同一服务器的新连接?将此数据发送到服务器的最有效(最快)方式是什么?

PS:我不想使用 UDP

【问题讨论】:

  • TCP/IP 连接在逻辑上是有序数据的顺序流,因此它天生不支持多线程。但是,您可以在一个连接上挂起多个异步写入 - 这些将按任意顺序解决,这可能是您想要的,也可能不是您想要的。 (通常这不是您想要的。)最终,如果您尝试发送的速度快于客户端想要接收的速度,那么随着传输窗口的关闭,发送将会阻塞或失败,并且没有任何异步可以帮助解决这个问题。检查实际的瓶颈是什么。

标签: c# tcpclient


【解决方案1】:

对;这是一个有趣的话题,我想我可以发表意见。听起来您在多个线程之间共享一个套接字 - 只要您非常小心地操作,它就完全有效。 TCP 套接字是字节的逻辑流,因此您不能同时使用它,但是如果您的代码足够快,您可以非常有效地共享套接字,每条消息都是 连续

可能首先要考虑的事情是:您实际上是如何将数据写入套接字的?你的框架/编码代码是什么样的?如果这段代码很糟糕/效率低下:它可能会被改进。例如,它是否通过幼稚的Encode 调用间接地为每个string 创建一个新的byte[]?是否涉及多个缓冲区?是否在取景时多次调用Send?它是如何解决数据包碎片问题的?等等

作为第一个的尝试 - 你可以避免一些缓冲区分配:

var enc = Encoding.ASCII;
byte[] bytes = ArrayPool<byte>.Shared.Rent(enc.GetMaxByteCount(message.Length));
// note: leased buffers can be oversized; and in general, GetMaxByteCount will
// also be oversized; so it is *very* important to track how many bytes you've used
int byteCount = enc.GetBytes(message, 0, message.Length, bytes, 0);
tcpClient.GetStream().Write(bytes, 0, byteCount);
ArrayPool<byte>.Shared.Return(bytes);

这使用了一个租用的缓冲区来避免每次都创建一个byte[] - 这可以大大改善 GC 影响。如果是我,我也可能会使用原始的 Socket 而不是 TcpClientStream 抽象,坦率地说,这不会给你带来很多好处。注意:如果您有其他框架要做:将其包含在您租用的缓冲区的大小中,在写入每个片段时使用适当的偏移量,并且只写入一次 - 即准备整个缓冲区一次 - 避免多次致电Send


现在,听起来您有一个队列和专门的作家;即您的 app 代码附加到队列中,而您的 writer 代码将事物出列并将它们写入套接字。这是一种合理的实现方式,尽管我会添加一些注释:

  • List&lt;T&gt; 是一种实现队列的可怕方式 - 从一开始就删除东西需要重新洗牌(这很昂贵);如果可能,首选Queue&lt;T&gt;,它非常适合您的场景
  • 它需要同步,这意味着您需要确保一次只有一个线程更改队列 - 这通常通过简单的lock 完成,即lock(queue) {queue.Enqueue(newItem);}SomeItem next; lock(queue) { next = queue.Count == 0 ? null : queue.Dequeue(); } if (next != null) {...write it...}

这种方法简单,并且在避免数据包碎片方面具有一些优势 - 写入器可以使用暂存缓冲区,并且只有在缓冲某个阈值时才实际写入套接字,或者当例如,队列是空的 - 但它有可能在发生停顿时创建大量积压。

但是!发生积压的事实表明 某事 跟不上;这可能是网络(带宽)、远程服务器(CPU)——或者可能是本地出站网络硬件。如果这只发生在小信号中,然后自行解决 - 很好(特别是当它发生在一些出站消息很大时),但是:值得一看。

如果这种积压问题反复出现,那么坦率地说,您需要考虑到您只是对当前设计感到饱和,因此您需要解开其中一个瓶颈:

  • 确保您的编码代码有效是零步骤
  • 您可以将 encode 步骤移动到应用程序代码中,即在获取锁之前准备一个帧,对消息进行编码,然后只将一个完全准备好的帧加入队列;这意味着写入器线程除了出列、写入、回收之外无需执行任何操作 - 但它使缓冲区管理更加复杂(显然,在缓冲区完全处理之前您无法回收缓冲区)
  • 如果您还没有采取措施实现这一目标,那么减少数据包碎片可能会有很大帮助
  • 否则,您可能需要(在调查阻塞后):
    • 更好的本地网络硬件 (NIC) 或物理机器硬件(CPU 等)
    • 多个套接字(和队列/工作者)之间循环,分配负载
    • 可能是多个服务器进程,每个服务器都有一个端口,因此您的多个套接字正在与不同的进程通信
    • 更好的服务器
    • 多台服务器

注意:在涉及多个套接字的任何场景中,您要小心不要发疯并有太多专用工作线程;如果该数字超过 10 个线程,您可能要考虑其他选项 - 可能涉及异步 IO 和/或管道(如下)。


为了完整起见,另一种基本方法是从应用程序代码编写;这种方法更简单,并且避免了未发送工作的积压,但是:这意味着现在您的应用程序代码线程它们自己将在负载下备份。如果您的应用程序代码线程实际上是工作线程,并且它们在同步/lock 上被阻塞,那么这可能真的很糟糕;您确实想要使线程池饱和,因为您最终可能会遇到没有线程池线程可用于满足 unblock 所需的 IO 工作的情况writer 处于活跃状态,这会使您陷入真正的 问题。这通常不是您想要用于高负载/容量的方案,因为它很快就会出现问题 - 并且很难避免数据包碎片,因为每个单独的消息都无法知道是否会有更多消息进入.


最近要考虑的另一个选项是“管道”;这是 .NET 中的一个新 IO 框架,专为大容量网络而设计,特别关注异步 IO、缓冲区重用以及实现良好的缓冲区/积压日志机制,使得使用简单的writer 方法(在写入时同步)并且不会将其转换为直接发送 - 它表现为可以访问 backlog 的异步写入器,这使得避免数据包碎片化变得简单而高效。这是一个相当先进的领域,但它可能非常有效。对您来说有问题的部分是:它是为整个异步使用而设计的,甚至用于写入 - 因此,如果您的应用程序代码当前是同步的,那么实现起来可能会很痛苦。但是:这是一个需要考虑的领域。我有很多关于这个主题的博客文章,以及一系列使用管道的 OSS 示例和实际库,我可以指出这些,但是:这不是“快速修复”——它是彻底改革整个 IO 层。它也不是灵丹妙药 - 它只能消除由于本地 IO 处理成本而产生的开销。

【讨论】:

  • 首先,非常感谢您的详细解释。字符串消息的大小可能约为 100 字节 - 最大为 2500 字节,在最坏的情况下,它可能达到 3000 字节,但我更喜欢将缓冲区大小保持在 2048 字节左右,我认为这是最佳的。不,到目前为止发送的每条消息没有超过 2048 范围。使用队列的好建议,我在写这篇文章时正在更改实现。
  • 第二,客户端和服务器是同一台机器,从不同的端口监听和发送,因此不存在网络问题。硬件,我相信没有问题(i7-8th Gen 和 16gb ram) 在应用程序部分进行编码,我相信它会减慢应用程序本身的速度,因为它必须快速完成所需的操作,所以我肯定有使用编写器进行编码...
  • @kowsikbabu 谈“闪电般的速度”......我想这很公平,但我认为你的机器可能有很多内核,如果应用程序代码进行编码,你就是很好地传播这项工作;如果 writer 这样做,则所有编码都在一个核心上;但是 - 这可能没问题。您是否尝试过数组池编码来减少缓冲区分配?这可能很有帮助。实际上,如果您使用 ASCII,则使用 SIMD 进行编码有一些技巧,因为 UTF16 到 ASCII 可以被认为是一种“狭义”操作 - 但也许这应该被搁置,直到您拥有......
  • @kowsikbabu ...更好的测量,坦率地说 ASCII 编码已经非常快了,可能不是你的瓶颈(另外:内置的 API 可能已经在幕后使用 SIMD/narrow!)
  • @kowsikbabu 哦,还有一个;如果您正在使用本地套接字通信,并且您可能在 Windows 上:您是否启用了“快速环回”套接字选项?见:github.com/mgravell/Pipelines.Sockets.Unofficial/blob/…
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-07-09
  • 1970-01-01
  • 2014-03-09
相关资源
最近更新 更多