【问题标题】:Sockets: How to send data to the client without 'waiting' on them as they receive/parse it套接字:如何在接收/解析数据时向客户端发送数据而不“等待”它们
【发布时间】:2009-07-15 23:34:32
【问题描述】:

我有一个套接字服务器,使用 boost::asio 用 C++ 编写,我正在向客户端发送数据。

服务器以块的形式发送数据,客户端在接收到每个块时对其进行解析。两者现在几乎都是单线程的。

我应该在服务器上使用什么设计来确保服务器尽可能快地写出数据,而不是等待客户端解析它?我想我需要在服务器上做一些异步操作。

我想也可以在客户端上进行更改以实现此目的,但理想情况下,无论客户端是如何编写的,服务器都不应该等待客户端。

我正在像这样向套接字写入数据:

size_t bytesWritten = m_Socket.Write( boost::asio::buffer(buffer, bufferSize));

更新:

我将尝试使用 Boost 的机制来异步写入套接字。见http://www.boost.org/doc/libs/1_36_0/doc/html/boost_asio/tutorial/tutdaytime3/src.html

例如

 boost::asio::async_write(socket_, boost::asio::buffer(message_),
        boost::bind(&tcp_connection::handle_write, shared_from_this(),
          boost::asio::placeholders::error,
          boost::asio::placeholders::bytes_transferred));
  • 亚历克斯

【问题讨论】:

    标签: c++ sockets asynchronous boost-asio


    【解决方案1】:

    如果您将套接字设置为非阻塞,那么如果它们会阻塞,写入应该会失败。然后,您可以根据自己的喜好对数据进行排队,并安排稍后再次尝试写入数据。我不知道如何在 boost socket API 中设置套接字选项,但这正是您要找的。​​p>

    但这可能比它的价值更麻烦。您需要选择一个准备好写入的套接字,大概从同时打开的几个套接字中,将更多数据推入其中直到它已满,然后重复。我不知道 boost sockets API 是否有 select 的等价物,这样你就可以一次等待多个套接字,直到它们中的任何一个准备好写入。

    服务器通常为每个客户端连接启动一个线程(或生成一个进程)的原因正是为了在等待 I/O 的同时继续为其他客户端提供服务,同时避免实现自己的队列。 “安排稍后再尝试”的最简单方法就是在专用线程中阻塞 I/O。

    除非 boost 在其套接字 API 中做了一些不寻常的事情,否则你不能做的是要求操作系统或套接字库为你排队任意数量的数据而不会阻塞。可能有一个异步 API 会在数据写入时回调您。

    【讨论】:

    • 感谢您提供的信息,这听起来像是我正在寻找的。​​span>
    • 啊,我看到你确实找到了一个异步 API。从 BSD 风格的 API 的角度来看,这通常是在进程中(而不是在 TCP 堆栈中)将写入操作排队,直到可以写入为止。如果幸运的话,您可以同时在同一个套接字上进行任意多次写入,这已经非常好。通常你只会被允许一个。
    【解决方案2】:

    您可以通过不通过 TCP 而是通过 UDP 传输数据来确保异步通信。 但是,如果您需要使用 TCP,请让客户端快速将数据存储起来,然后在不同的线程中或与 cron 作业异步处理。

    【讨论】:

    • 服务器没有办法只抽出数据并将其排队到某个地方吗?例如。在服务器 TCP 堆栈中排队,还是在客户端 TCP 堆栈中排队?
    • 当然有......但没有操作系统(至少我在 Windows 中不知道)实用程序可以为您执行此操作,因此您必须实现它。这基本上就是stefanw的意思,客户端快速存储数据就相当于你的客户端在排队。
    • @DeusAduro:知道了,但如果可能的话,我想在 server 上这样做。我可以异步写入服务器上的套接字吗?还是我需要在服务器上启动一个新线程来写出数据?
    • @Alex 看看下面我的回答。
    • 让客户端尽快转移数据是个好主意,但还不够。延迟不一定与客户端有关-可能是介于两者之间的某个路由器是瓶颈,或者(等效地)客户端的带宽无法像服务器生成数据那样快地处理数据。然后客户端无能为力,但您仍然不希望服务器阻塞(除非每个套接字都有一个专用线程,但 OP 显然希望避免这种情况)
    【解决方案3】:

    当您将数据传递给套接字时,它不会等待接收者处理它。它甚至不等待数据传输。数据被放入由操作系统在后台处理的出站队列。写入函数返回排队等待传输的字节数,而不是实际传输的字节数。

    【讨论】:

    • 操作系统在排队之前是否首先检查连接性?因为我们的读/写函数告诉我们客户端/服务器是否已断开连接。如果要直接进入队列并返回这些函数永远不会给我们这些信息?
    • 嗯,这似乎不是我所看到的。如果我注释掉我的客户端解析代码(所以它只是盲目地读取所有数据),那么我的服务器请求完成得更快。
    • @Remy:我上面说过,你说的不是我看到的,但是,那是我想看到的。
    • Remy 所说的是真实的,直到客户端和服务器之间的所有缓冲区都已满。最终,服务器的出站缓冲区也已满,然后在 BSD 样式的套接字 API 中,写入将阻塞或失败 EAGAIN,具体取决于套接字是阻塞还是非阻塞。
    • @onebyone:很有趣,谢谢。我将不得不更仔细地查看我的服务器和客户端之间发生的事情。
    【解决方案4】:

    继续 Stefan 帖子中的 cmets:

    绝对可以在客户端或服务器端进行缓冲。但请务必考虑尼尔写的内容。如果我们只是开始盲目地缓冲数据,并且如果处理永远跟不上发送,那么我们的缓冲区将以我们可能不想要的方式增长。

    现在我最近实现了一个简单的“NetworkPipe”,它旨在充当单个客户端/服务器、服务器/客户端之间的连接,外部用户不知道/关心管道是客户端还是服务器。我实现了类似于您所询问的缓冲情况,如何?好吧,这个类是线程化的,这是我能想出干净地缓冲数据的唯一方法。这是我遵循的基本过程,请注意我在管道上设置了最大尺寸:

    1. 进程 1 启动管道,默认为服务器。现在内部线程等待客户端。
    2. 进程 2 启动管道,已经是服务器,默认为客户端。
    3. 我们现在已连接,首先要做的是交换最大缓冲区大小。
    4. 进程 1 写入数据(它注意到另一端有一个空缓冲区 [参见 #3])
    5. 进程 2 的内部线程(现在正在等待套接字的 select())看到数据已发送并读取它,并对其进行缓冲。进程 2 现在将新的缓冲大小发送回 P1。

    所以这是一个非常简化的版本,但基本上通过线程化,我总是可以等待阻塞选择调用,一旦数据到达,我可以读取并缓冲它,我会发回新的缓冲大小。你可以做类似的事情,盲目地缓冲数据,它实际上相当简单,因为你不必交换缓冲区大小,但这可能是个坏主意。所以上面的例子允许外部用户在不阻塞线程的情况下读取/写入数据(除非另一端的缓冲区已满)。

    【讨论】:

    • 在 30 分钟内,这个问题变得如此复杂,我无法理解。在我看来,您仍然在 TCP 之上重新发明了 TCP 的一部分。除非我遗漏了什么,否则您所做的正是 TCP 对其缓冲区和窗口大小所做的事情。
    • 也许我有,你能否指导我看一篇关于如何完成缓冲的文章。我真正感兴趣的是操作系统将为我缓冲多少数据?我上面的方法允许我说“我想缓冲最多 1000 个字节,但不能再缓冲了。”,有没有办法从内置缓冲中获得这种明确定义的行为?
    • 每个 tcp 套接字在内核中都有一个发送和接收缓冲区,可以通过带有 SO_RCVBUF 和 SO_SNDBUF 选项的 getsockopt()/setsockopt() 进行管理。它们有多空是 tcp 告诉它的对等方它可以接受的(即窗口大小)。不知道有什么好文章,但是搜索上面和/或 tcp 缓冲区、流控制和窗口大小应该会发现一些东西。没有什么可以替代好的 tcp 或套接字书籍(例如 Stevens)。
    • 至于定义有多好,可能不是但你不需要。只需让 tcp 管理其缓冲区,您的应用程序只需读取它认为可以一次性管理的字节数。如果它的缓冲区无法管理,TCP 会告诉对端放慢速度。
    【解决方案5】:

    我使用 boost::asio::async_write 方法实现了一个解决方案。

    基本上:

    • 每个客户端都有一个线程(我的线程正在执行 CPU 密集型工作)
    • 随着每个线程积累一定量的数据,它使用 async_write 将其写入套接字,而不关心之前的写入是否已完成
    • 代码小心管理套接字的生命周期和正在写出的数据缓冲区,因为 CPU 处理在所有数据写出之前完成

    这对我很有效。这使服务器线程能够在其 CPU 工作完成后立即完成。

    总体而言,客户端接收和解析其所有数据的时间减少了。同样,服务器花费在每个客户端上的时间(墙上的时钟)也会减少。

    代码sn-p:

    void SocketStream::Write(const char* data, unsigned int dataLength)
    {
        // Make a copy of the data
        // we'll delete it when we get called back via HandleWrite
        char* dataCopy = new char[dataLength];
        memcpy( dataCopy,  data, dataLength );
    
        boost::asio::async_write
            (
            *m_pSocket,
            boost::asio::buffer(dataCopy, dataLength),
            boost::bind
                (
                &SocketStream::HandleWrite,                     // the address of the method to callback when the write is done
                shared_from_this(),                             // a pointer to this, using shared_from_this to keep us alive
                dataCopy,                                       // first parameter to the HandleWrite method
                boost::asio::placeholders::error,               // placeholder so that async_write can pass us values
                boost::asio::placeholders::bytes_transferred
                )
            );
    }
    
    void SocketStream::HandleWrite(const char* data, const boost::system::error_code& error, size_t bytes_transferred)
    {
        // Deallocate the buffer now that its been written out
        delete data;
    
        if ( !error )
        {
            m_BytesWritten += bytes_transferred;
        }
        else
        {
            cout << "SocketStream::HandleWrite received error: " << error.message().c_str() << endl;
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2011-05-02
      • 2021-06-25
      • 2016-12-16
      • 2013-09-22
      • 1970-01-01
      • 1970-01-01
      • 2020-03-06
      • 1970-01-01
      相关资源
      最近更新 更多