【问题标题】:epoll IO with worker threads in C在 C 中使用工作线程的 epoll IO
【发布时间】:2014-03-20 12:06:03
【问题描述】:

我正在编写一个小型服务器,它将接收来自多个来源的数据并处理这些数据。收到的来源和数据很重要,但不超过epoll应该能够很好地处理。但是,所有接收到的数据都必须进行解析并通过大量测试运行,这既耗时又会阻塞单个线程,尽管 epoll 多路复用。基本上,该模式应该如下所示:IO-loop 接收数据并将其捆绑到作业中,发送到池中的第一个可用线程,捆绑由作业处理并将结果传递到 IO 循环中写入文件。

我决定使用单个 IO 线程和 N 个工作线程。用于接受 tcp 连接和读取数据的 IO 线程使用以下提供的示例很容易实现: http://linux.die.net/man/7/epoll

线程通常也很容易处理,但我正在努力以一种优雅的方式将 epoll IO 循环与线程池结合起来。我也找不到任何将 epoll 与在线工作池一起使用的“最佳实践”,但是关于同一主题的问题很多。

因此我有一些问题希望有人能帮助我回答:

  1. 是否可以(并且应该)使用 eventfd 作为 IO 线程和所有工作线程之间双向同步的机制?例如,每个工作线程都有自己的 epoll 例程等待共享 eventfd(带有结构指针,包含有关作业的数据/信息),即以某种方式使用 eventfd 作为作业队列,这是一个好主意吗?或许还有另一个 eventfd 将结果从多个工作线程传回 IO 线程?
  2. 在 IO 线程收到有关套接字上更多数据的信号后,是否应该在 IO 线程上进行实际的接收,或者工作人员是否应该自己接收数据,以便在解析数据帧时不阻塞 IO 线程等.?在那种情况下,我怎样才能确保安全,例如如果 recv 在一个工作线程中读取 1.5 帧数据,而另一个工作线程从同一连接接收最后 0.5 帧数据?
  3. 如果工作线程池是通过互斥锁等实现的,如果N+1个线程尝试使用同一个锁,等待锁会阻塞IO线程吗?
  4. 对于如何通过双向通信(即从 IO 到 worker 和返回)围绕 epoll 构建工作线程池,是否有任何好的实践模式?

编辑:一个可能的解决方案是从 IO 循环更新环形缓冲区,更新后通过所有工作人员的共享管道将环形缓冲区索引发送给工作人员(从而将该索引的控制权交给第一个工作人员从管道中读取索引),让工作人员拥有该索引直到处理结束,然后再次通过管道将索引号发送回 IO 线程,从而交还控制权?

我的应用程序仅适用于 Linux,因此我可以使用仅适用于 Linux 的功能,以便以最优雅的方式实现这一目标。不需要跨平台支持,但需要性能和线程安全。

【问题讨论】:

  • 我想我可能有一个有用的解决方案,但需要知道,你多久知道单个帧/数据包的长度?它们是固定长度,是包含在数据包头中还是您只知道最后?如果你早点知道,那么在不忙于主线程的情况下完成工作要容易得多,但如果你不知道到最后,主线程不可避免地需要做大量的阅读。
  • 嗨,我知道 recv 之后和遍历 recv 缓冲区之后的长度。不幸的是,它们不是固定长度,并且长度不会出现在数据包中,而是基于换行帧。

标签: c linux multithreading posix epoll


【解决方案1】:

我在其他帖子中发布了相同的答案:I want to wait on both a file descriptor and a mutex, what's the recommended way to do this?

================================================ ===========

这是一个非常常见的问题,尤其是在您开发网络服务器端程序时。大多数 Linux 服务器端程序的主外观会像这样循环:

epoll_add(serv_sock);
while(1){
    ret = epoll_wait();
    foreach(ret as fd){
        req = fd.read();
        resp = proc(req);
        fd.send(resp);
    }
}

它是单线程(主线程),基于 epoll 的服务器框架。问题是,它是单线程的,而不是多线程的。它要求 proc() 永远不能阻塞或运行很长一段时间(比如常见情况下为 10 毫秒)。

如果 proc() 会运行很长时间,我们需要多线程,并在单独的线程(工作线程)中执行 proc()。

我们可以在不阻塞主线程的情况下向工作线程提交任务,使用基于互斥体的消息队列,速度足够快。

那么我们需要一种从工作线程中获取任务结果的方法。如何?如果我们只是直接检查消息队列,在 epoll_wait() 之前或之后,然而,检​​查动作将在 epoll_wait() 结束后执行,并且 epoll_wait() 如果它等待所有文件描述符,通常会阻塞 10 微秒(常见情况)不活跃。

对于服务器来说,10 毫秒是相当长的时间!任务结果生成后,是否可以立即通知 epoll_wait() 结束?

是的!我将在我的一个开源项目中描述它是如何完成的。

为所有工作线程创建一个管道,并且 epoll 也在该管道上等待。一旦生成任务结果,工作线程将一个字节写入管道,然后 epoll_wait() 将几乎同时结束! - Linux 管道有 5 us 到 20 us 的延迟。

在我的项目SSDB(一个兼容 Redis 协议的磁盘内 NoSQL 数据库)中,我创建了一个 SelectableQueue 用于在主线程和工作线程之间传递消息。就像它的名字一样,SelectableQueue 有一个文件描述符,可以通过 epoll 等待。

可选队列:https://github.com/ideawu/ssdb/blob/master/src/util/thread.h#L94

在主线程中的使用:

epoll_add(serv_sock);
epoll_add(queue->fd());
while(1){
    ret = epoll_wait();
    foreach(ret as fd){
        if(fd is worker_thread){
            sock, resp = worker->pop_result();
            sock.send(resp);
        }
        if(fd is client_socket){
            req = fd.read();
            worker->add_task(fd, req);
        }
    }
}

在工作线程中的使用:

fd, req = queue->pop_task();
resp = proc(req);
queue->add_result(fd, resp);

【讨论】:

    【解决方案2】:

    在执行此模型时,因为我们只有在完全接收到数据包后才知道数据包大小,不幸的是我们无法将接收本身卸载到工作线程。相反,我们仍然可以做的最好的事情是一个线程来接收数据,它必须将指针传递给完全接收的数据包。

    数据本身可能最好保存在循环缓冲区中,但是我们希望每个输入源都有一个单独的缓冲区(如果我们得到部分数据包,我们可以继续从其他源接收而不拆分数据。剩下的问题是如何通知工作人员新数据包何时准备好,并给他们一个指向所述数据包中数据的指针。因为这里的数据很少,所以只有一些指针,最优雅的方法是使用 posix 消息队列。这些为多个发送者和多个接收者提供了写入和读取消息的能力,始终确保每条消息都被精确地接收到 1 个线程。

    对于每个数据源,您将需要一个类似于以下结构的结构,我现在将介绍字段用途。

    struct DataSource
    {
        int SourceFD;
        char DataBuffer[MAX_PACKET_SIZE * (THREAD_COUNT + 1)];
        char *LatestPacket;
        char *CurrentLocation
        int SizeLeft;
    };
    

    SourceFD 显然是相关数据流的文件描述符,DataBuffer 是处理数据包内容的地方,它是一个循环缓冲区。 LatestPacket 指针用于临时保存指向最近发送的数据包的指针,以防我们收到部分数据包并在传递数据包之前移动到另一个源。 CurrentLocation 存储最新数据包的结束位置,以便我们知道在哪里放置下一个数据包,或者在部分接收的情况下在哪里继续。剩余的大小是缓冲区中剩余的空间,这将用于判断我们是否可以容纳数据包或需要绕回起点。

    接收函数将因此有效

    • 将数据包的内容复制到缓冲区中
    • 将 CurrentLocation 移动到数据包的末尾
    • 更新 SizeLeft 以解决现在减少的缓冲区
    • 如果我们无法将数据包放入缓冲区的末尾,我们会循环播放
    • 如果那里没有空间,我们稍后再试一次,同时转到另一个来源
    • 如果我们有部分接收存储 LatestPacket 指针以指向数据包的开头并转到另一个流直到我们得到其余部分
    • 使用posix message queue 向工作线程发送消息以便它可以处理数据,消息将包含指向DataSource 结构的指针以便它可以处理它,它还需要一个指向它正在工作的数据包的指针on,还有它的大小,这些可以在我们收到包的时候计算出来

    工作线程将使用接收到的指针进行处理,然后增加 SizeLeft 以便接收线程知道它可以继续填充缓冲区。将需要原子函数来处理结构中的大小值,因此我们不会使用 size 属性获得竞争条件(因为它可能由工作线程和 IO 线程同时写入,导致写入丢失,请参阅我的下面的评论),它们被列出 here 并且简单且非常有用。

    现在,我已经给出了一些一般性的背景知识,但将具体说明几点:

    1. 使用 EventFD 作为同步机制在很大程度上是一个坏主意,您会发现自己使用了大量不必要的 CPU 时间,并且很难执行任何同步。特别是如果您有多个线程获取相同的文件描述符,您可能会遇到重大问题。这实际上是一种令人讨厌的 hack,有时会起作用,但不能真正替代正确的同步。
    2. 如上所述尝试卸载接收也是一个坏主意,您可以解决复杂 IPC 的问题,但坦率地说,接收 IO 不太可能需要足够的时间来停止您的应用程序,您的 IO 也可能很多比 CPU 慢,因此使用多个线程接收将获得很少。 (这里假设你没有说,有几个 10 千兆网卡)。
    3. 在这里使用互斥锁或锁是一个愚蠢的想法,考虑到(同时)共享数据量很少,它更适合无锁编码,您实际上只是在交接工作和数据。这也将提高接收线程的性能并使您的应用程序更具可扩展性。使用这里提到的函数http://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html,您可以轻松轻松地做到这一点。如果你确实这样做了,你需要的是一个信号量,每次接收到一个数据包并被每个线程锁定时都可以解锁它,如果有更多的数据包准备好,这将有与使用互斥锁的自制解决方案相比,开销要少得多。
    4. 这里与任何线程池都没有太大区别,您生成大量线程,然后将它们全部阻塞在数据消息队列上的 mq_receive 中以等待消息。完成后,他们将结果发送回主线程,主线程将结果消息队列添加到其 epoll 列表中。然后它可以通过这种方式接收结果,对于像指针这样的小数据有效负载来说,它既简单又非常有效。这也将使用很少的 CPU,并且不会强制主线程浪费时间管理工作线程。

    最后你的编辑是相当明智的,除了我建议的事实,消息队列比这里的管道好得多,因为它们非常有效地发出事件信号,保证完整的消息读取并提供自动框架。

    我希望这会有所帮助,但是为时已晚,所以如果我遗漏了任何内容或您有任何疑问,请随时发表评论以获得澄清或更多解释。

    【讨论】:

    • 感谢您的冗长回答。只是几个问题:我可以假设多个线程可能会阻塞在同一个队列上以等待新任务吗?多个线程也可以写入另一个队列以将完成的工作传回吗?在这样的设计中,我真的需要如上所述的内置函数吗?
    • @invictus 消息队列确实是多对多的关系,它们非常强大,因为任意数量的发送者和任意数量的接收者都可以使用队列,并且消息总是会准确地传递给其中一个监听者线程。绝大多数代码不需要上面的内置函数,唯一的用途是确保自动更新 SizeLeft 以确保接收线程和工作线程不会同时更新它并导致它损坏例如:thread1加载value,thread2加载value,thread1写入,thread2写入,thread1的写入丢失。
    • 听起来这是我一直在寻找的解决方案。你知道 mq_* 的表现如何(即性能开销)吗?
    • @invictus 虽然性能取决于您的特定应用程序和您拥有的确切内核版本,但您可以预期它们将提供比套接字等更高的性能,但比定制的共享内存队列稍慢系统,但是它们是完全线程安全的,将为您节省大量设计时间。更重要的是,它们变慢的唯一时间是非常大的消息(因为消息在内部存储了两次,一次在发送者的内存中,一次在接收者的内存中),所以如果你只传递指针并且小消息的开销可以忽略不计。
    • 另外,一个我忘了提的说明,posix消息队列保证在尝试发送时永远不会阻塞发送者,数据传输都是异步完成的。这意味着任何开销都不会减慢 IO 线程。
    【解决方案3】:

    在我的测试中,到目前为止,每个线程一个 epoll 实例的性能优于复杂的线程模型。如果将侦听器套接字添加到所有 epoll 实例,则工作人员只需 accept(2),获胜者将获得连接并在其生命周期内处理它。

    您的工人可能看起来像这样:

    for (;;) {
        nfds = epoll_wait(worker->efd, &evs, 1024, -1);
    
        for (i = 0; i < nfds; i++)
            ((struct socket_context*)evs[i].data.ptr)->handler(
                evs[i].data.ptr,
                evs[i].events);
    }
    

    添加到 epoll 实例的每个文件描述符都可以有一个 struct socket_context 与之关联:

    void listener_handler(struct socket_context* ctx, int ev)
    {
        struct socket_context* conn;
    
        conn->fd = accept(ctx->fd, NULL, NULL);
        conn->handler = conn_handler;
    
        /* add to calling worker's epoll instance or implement some form
         * of load balancing */
    }
    
    void conn_handler(struct socket_context* ctx, int ev)
    {
        /* read all available data and process. if incomplete, stash
         * data in ctx and continue next time handler is called */
    }
    
    void dummy_handler(struct socket_context* ctx, int ev)
    {
        /* handle exit condition async by adding a pipe with its
         * own handler */
    }
    

    我喜欢这个策略,因为:

    • 非常简单的设计;
    • 所有线程都是相同的;
    • worker 和连接是隔离的——不要踩对方的脚趾或在错误的 worker 中调用 read(2)
    • 不需要锁(内核开始担心accept(2) 上的同步);
    • 有点自然的负载平衡,因为没有忙碌的工作人员会积极参与accept(2)

    还有一些关于epoll的注意事项:

    • 使用边缘触发模式、非阻塞套接字并始终读取到EAGAIN
    • 避免dup(2) 家族的调用以避免一些意外(epoll 注册文件描述符,但实际上监视文件描述);
    • 你可以安全地epoll_ctl(2)其他线程的epoll实例;
    • epoll_wait(2) 使用较大的struct epoll_event 缓冲区以避免饥饿。

    其他一些注意事项:

    • 使用accept4(2)保存系统调用;
    • 每个内核使用一个线程(如果 CPU 受限,每个物理线程 1 个,如果 I/O 受限,每个逻辑线程 1);
    • 如果连接数较少,poll(2)/select(2) 可能会更快。

    我希望这会有所帮助。

    【讨论】:

    • 我喜欢这个想法,但是,我担心每次recv后我繁重的工作量会阻塞其他连接。另外,如果一个线程“幸运”到可以先选择下一个接受,这不会导致每个线程的工作负载不平衡吗?此外,如果我只有 4-5 个连接,我可能仍然希望 30 个工作线程来处理它们产生的内容。
    • @invictus 是的,除非您自己在侦听器处理程序中平均分配连接,否则工作负载不会完全平衡,这可能会增加一些复杂性。您的工作是否受 CPU 或 I/O 限制?如果受 CPU 限制,多于处理器内核的线程只会引入更多的上下文切换。
    • @pindumb 对于低线程数(每个物理内核一个)不会有问题。在负载下,可能只有一小部分线程会见证可读的侦听器。如果这是一个问题,侦听器可以在线程之间轮换。如果有成百上千个线程,情况就不同了。
    • 我相信,虽然是一个有趣的想法,但这种特殊的设计并不适合,因为我相信单个线程应该能够处理所有 IO,而实际处理是高度 CPU 绑定的。我将在 14 小时内添加赏金,因为多年来我一直在寻找一个好的答案,而我所发现的只是其他人也有同样的疑惑。获得对其他人可用的“教科书答案”会很高兴:)
    • @invictus 太酷了。我一直在做类似的事情,但工作受 I/O 限制。我尝试了 1 个侦听器线程 + N 个带有单个和单独的无锁 FIFO 工作队列的工作人员。扼杀性能的是缓存线弹跳和弄清楚何时/如何睡觉和唤醒工作人员。隔离确实是 10-100 倍加速的关键。但是,如果处理时间很长,在您的情况下共享可写数据可能可以忽略不计。祝你好运。 :)
    猜你喜欢
    • 2013-03-15
    • 1970-01-01
    • 2014-08-03
    • 2017-03-18
    • 1970-01-01
    • 2016-06-27
    • 1970-01-01
    • 2020-12-09
    • 2013-01-13
    相关资源
    最近更新 更多