【问题标题】:TCP Server workers with kqueue带有 kqueue 的 TCP 服务器工作者
【发布时间】:2014-08-10 13:13:37
【问题描述】:

我最近对内核事件进行了一些测试,结果如下:

  • 使用内核事件来接受套接字是否有意义?我的测试表明我一次只能处理一个接受(即使事件列表数组更大)(对我来说很有意义,因为 .ident == sockfd 仅适用于一个套接字)。

  • 我以为kevent的使用主要是一次从多个socket中读取。这是真的吗?

这就是 TCP 服务器使用 kqueue 实现的方式吗? :


  • 监听线程(无 kqueue)
    • 接受新连接并将 FD 添加到工作人员 kqueue。 问题:这可能吗?我的测试表明是的,但它是否保证工作线程会知道这些更改并且 kevent 真的是线程安全的?

  • 工作线程(带 kqueue)

    • 等待读取从侦听线程添加的文件描述符。

    问题:一次检查多少个套接字才有意义?


谢谢

【问题讨论】:

    标签: c sockets tcp kqueue kevent


    【解决方案1】:

    通常,您使用 kqueue 作为线程的替代方法。如果你要使用线程,你可以设置一个监听线程和一个工作线程池,每个接受的连接一个线程。这是一个更简单的编程模型。

    在事件驱动的框架中,您会将监听套接字和所有接受的套接字都放入 kqueue,然后在事件发生时处理它们。当你接受一个套接字时,你将它添加到 kqueue 中,当套接字处理程序完成它的工作时,它可以从 kqueue 中删除套接字。 (后者通常不是必需的,因为关闭 fd 会自动从任何 kqueue 中删除任何关联的事件。)

    请注意,使用 kqueue 注册的每个事件都有一个 void* 用户数据,可用于在事件触发时识别所需的操作。所以没有必要每个事件队列都有一个唯一的事件处理程序;事实上,拥有各种处理程序是很常见的。 (例如,您可能想要处理通过命名管道设置的控制通道。)

    混合事件/线程模型当然是可能的;否则,您将无法利用多核 CPU。一种可能的策略是将事件队列用作生产者-消费者模型中的调度程序。队列处理程序将直接处理侦听套接字上的事件,接受连接并将接受的 fd 添加到事件队列中。当客户端连接事件发生时,该事件将被发布到工作队列中以供以后处理。也可以有多个工作队列,每个线程一个,并让接受者猜测新连接应该放在哪个工作队列中,大概是根据该线程的当前负载。

    【讨论】:

    • 首先感谢您的解释。我明白,但我的测试表明服务器不能一次接受多个连接:(,而且我很不确定在同一个线程中读、写和听是否有意义。例如,在客户端读取我需要解析数据包,然后做一些mysql查询然后创建一个响应,此时没有客户端可以连接到我的服务器......所以真的是这样吗?
    • @d3l:那你做错了 :) 也许你需要用一个最小的可编译示例来问一个不同的问题。
    • 我做了一个小例子来解释这个问题。问候
    • @rici 我喜欢这个问题,我自己刚才也问了一个类似的问题。但是我认为您的答案解决的问题是 - 多个线程可以向 kqueue 添加文件描述符(kqueues 线程是否安全)?如果是这样,人们将如何去做呢?另外,如果 OP 能够找到一种方法来做到这一点,他/她可以与我分享吗?
    • @rici 对上述评论的澄清。我不想用 kqueue 模块和 TCP 逻辑创建循环依赖。我想以一种让它们都是独立实体的方式来做到这一点。如果我使用你所说的关于将文件描述符添加到 kqueue 的 accept 调用,那么该逻辑将必须添加到 kqueue 模块
    【解决方案2】:

    这不是一个真正的答案,但我用kqueue 制作了一个小服务器脚本来解释问题:

    #include <stdio.h>          // fprintf
    #include <sys/event.h>      // kqueue
    #include <netdb.h>          // addrinfo
    #include <arpa/inet.h>      // AF_INET
    #include <sys/socket.h>     // socket
    #include <assert.h>         // assert
    #include <string.h>         // bzero
    #include <stdbool.h>        // bool
    #include <unistd.h>         // close
    
    int main(int argc, const char * argv[])
    {
    
        /* Initialize server socket */
        struct addrinfo hints, *res;
        int sockfd;
    
        bzero(&hints, sizeof(hints));
        hints.ai_family     = AF_INET;
        hints.ai_socktype   = SOCK_STREAM;
    
        assert(getaddrinfo("localhost", "9090", &hints, &res) == 0);
    
        sockfd = socket(AF_INET, SOCK_STREAM, res->ai_protocol);
    
        assert(sockfd > 0);
    
        {
            unsigned opt = 1;
    
            assert(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == 0);
    
            #ifdef SO_REUSEPORT
            assert(setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) == 0);
            #endif
        }
    
        assert(bind(sockfd, res->ai_addr, res->ai_addrlen) == 0);
    
        freeaddrinfo(res);
    
        /* Start to listen */
        (void)listen(sockfd, 5);
    
        {
            /* kevent set */
            struct kevent kevSet;
            /* events */
            struct kevent events[20];
            /* nevents */
            unsigned nevents;
            /* kq */
            int kq;
            /* buffer */
            char buf[20];
            /* length */
            ssize_t readlen;
    
            kevSet.data     = 5;    // backlog is set to 5
            kevSet.fflags   = 0;
            kevSet.filter   = EVFILT_READ;
            kevSet.flags    = EV_ADD;
            kevSet.ident    = sockfd;
            kevSet.udata    = NULL;
    
            assert((kq = kqueue()) > 0);
    
            /* Update kqueue */
            assert(kevent(kq, &kevSet, 1, NULL, 0, NULL) == 0);
    
            /* Enter loop */
            while (true) {
                /* Wait for events to happen */
                nevents = kevent(kq, NULL, 0, events, 20, NULL);
    
                assert(nevents >= 0);
    
                fprintf(stderr, "Got %u events to handle...\n", nevents);
    
                for (unsigned i = 0; i < nevents; ++i) {
                    struct kevent event = events[i];
                    int clientfd        = (int)event.ident;
    
                    /* Handle disconnect */
                    if (event.flags & EV_EOF) {
    
                        /* Simply close socket */
                        close(clientfd);
    
                        fprintf(stderr, "A client has left the server...\n");
                    } else if (clientfd == sockfd) {
                        int nclientfd = accept(sockfd, NULL, NULL);
    
                        assert(nclientfd > 0);
    
                        /* Add to event list */
                        kevSet.data     = 0;
                        kevSet.fflags   = 0;
                        kevSet.filter   = EVFILT_READ;
                        kevSet.flags    = EV_ADD;
                        kevSet.ident    = nclientfd;
                        kevSet.udata    = NULL;
    
                        assert(kevent(kq, &kevSet, 1, NULL, 0, NULL) == 0);
    
                        fprintf(stderr, "A new client connected to the server...\n");
    
                        (void)write(nclientfd, "Welcome to this server!\n", 24);
                    } else if (event.flags & EVFILT_READ) {
    
                        /* sleep for "processing" time */
                        readlen = read(clientfd, buf, sizeof(buf));
    
                        buf[readlen - 1] = 0;
    
                        fprintf(stderr, "bytes %zu are available to read... %s \n", (size_t)event.data, buf);
    
                        sleep(4);
                    } else {
                        fprintf(stderr, "unknown event: %8.8X\n", event.flags);
                    }
                }
            }
        }
    
        return 0;
    }
    

    每次客户端发送内容时,服务器都会经历 4 秒的“延迟”。 (我夸大了一点,但对于测试来说相当合理)。那么如何解决这个问题呢?我认为具有自己的kqueue 的工作线程(池)作为可能的解决方案,那么不会发生连接延迟。 (每个工作线程读取某个“范围”的文件描述符)

    【讨论】:

    • 你应该让你的套接字是非阻塞的。但主要问题是你不能在事件循环中睡觉。事件循环的全部意义在于事件处理程序永远不会阻塞。如果它需要等待某事发生,它将等待该事件到事件队列中。事件处理程序也不应该做计算成本高昂的事情;如果有这样的任务,则需要将它们移交给另一个线程。这就是为什么事件驱动编程比线程编程更难的原因。
    • 哦,这真的应该是一个单独的问题。不要使用答案发布问题;提问是免费的,您应该这样做,以使有类似问题的人受益。
    • @rici 那么如何解决这个问题呢?在我的第一个版本中,我只是为每个新连接创建了一个线程,但是当连接数量增加时,这非常糟糕。而且由于事件循环中的处理也很糟糕,我不知道该怎么做。你能给我一个服务器的示例概念吗? (为了清楚起见:我正在尝试使用 persistent 连接实现即时通讯服务器)
    • 我不会再回答 cmets 的任何问题了。您可能需要使用您最喜欢的搜索引擎来查找 kqueue 教程。以防万一,这太具有挑战性了,这是我在几秒钟内用 google 找到的一个示例 kqueue 程序:markmail.org/thread/rvkieyevo5u2wbtz
    • 我已经这样做了,但没有发现像这个有用的东西。这些总是带来不好的结果。例如,在打开的连接过多时(释放缓冲区时),此服务器总是给我一个 BAD ACCESS。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2022-01-14
    • 2013-08-18
    • 2018-02-10
    • 1970-01-01
    • 2018-05-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多