【问题标题】:Does the hiredis Redis library create its own thread for async callbackshiredis Redis 库是否为异步回调创建自己的线程
【发布时间】:2023-03-12 00:05:02
【问题描述】:

我在多线程环境中使用 Redis,并且对它的运行方式有疑问。我在我的 c++ 应用程序中使用了hiredis c 库。

我的问题是:如果我在触发回调时使用异步模式,回调是否会在 Redis 客户端创建的另一个线程中处理?就像创建调用的线程不会受到回调处理的影响?谢谢!

【问题讨论】:

    标签: c++ c database redis


    【解决方案1】:

    Redis 客户端不会创建任何额外的 clent 线程,而是在现有线程中工作。

    Redis 在另一个(主)进程中工作。您使用的 redis API 在本地进程中工作,并使用进程间通信与主进程。异步请求意味着您的进程或线程将任务交给另一个,然后可以执行任何其他您的任务或等待事件。一段时间后,异步回复到达您的应用程序并可供使用。您的应用程序必须使用https://en.wikipedia.org/wiki/Event_loop 或任何通过调用回调来通知您的异步管理系统来处理事件(在本例中为 redis 应答)。

    异步架构意味着您运行事件循环,为每个事件调用回调处理程序。调用回调时,您可以创建多个异步任务。一旦创建了任务,它就会在任务完成或发生错误时调用回调事件处理程序。回调可能会分配给启动应用程序或出现新的网络连接。调用回调时可以创建redis任务,稍后会调用result事件回调。当前线程中的所有内容。我 - 你有多个线程,可以合理地期望每个线程都有一个事件循环。

    Redis 的单线程性质:http://redis.io/topics/latency#single-threaded-nature-of-redis

    客户端套接字处于非阻塞状态,因为 Redis 使用多路复用和非阻塞 I/O。 http://redis.io/topics/clients这意味着您的客户端永远不会被阻止。

    从 Redis 2.4 开始,Redis 中的线程仅用于在后台执行一些慢速 I/O 操作,主要与磁盘 I/O 相关,但这并没有改变 Redis 使用单个线程服务所有请求的事实.

    【讨论】:

    • 我了解 libuv 样式事件库的工作原理。那不是我的问题。我的问题是关于 redis 客户端库的内部工作动态,以及处理回调的位置。必须涉及另一个线程,否则它将被阻塞,除非数据库操作仅在线程由于某种原因阻塞时发生。在这种情况下,while(true){} 将导致异步操作永远不会发生。我的问题是哪个线程处理回调。
    • Server (main) threan 为任务服务并向您的线程发送回复。您的线程没有被阻塞,因为 nonblocked socked 用于传输。套接字传输完成后,您可以处理线程中需要的数据。你的意思是阻止你的线程的原因是什么?
    • 即使使用非阻塞套接字,也必须由线程检查套接字状态。如果主线程正在执行计算,那么即使套接字是非阻塞的,它也无法处理套接字。此外,该库不需要您调用函数来控制线程的调度,例如 libuv(一个事件库)。必须在某个地方涉及另一个线程。
    • 不要担心。这是操作系统任务。这是不同的,并且依赖于实现。更多信息请参考:en.wikipedia.org/wiki/Asynchronous_I/Oen.wikipedia.org/wiki/Epollen.wikipedia.org/wiki/Kqueue
    • 我写了一个epoll服务器,了解它是如何工作的。 EPoll 产生一个事件列表。必须检查此事件列表以了解数据何时到达。因为这个检查不是由主线程完成的(没有像libuv那样调用函数来启动它),所以另一个线程必须进行检查。没有其他办法。 EPoll 仍然需要一个线程来不断检查它。这只是意味着操作系统生成事件而不是 1000 个线程,每个线程检查一个套接字。
    【解决方案2】:

    当您调用 redisAsyncHandlerRead() 时,Hiredis 会调用回调。因此,回调会在您用来调用 redisAsyncHandlerRead() 的任何线程上调用。

    我相信一个最小的 Redis 异步示例,在 Linux-ish C 中,看起来像这样(为了清楚起见,删除了错误检查):

    #include "async.h"
    #include <unistd.h>
    #include <stdio.h>
    
    void myRedisCallback(redisAsyncContext *c, void *typeless_reply, void *priv_data) {
        redisReply *r = (redisReply *)typeless_reply;
        if (r->type == REDIS_REPLY_STRING)
            printf("foo is %s\n", r->str);
    }
    
    int main() {
        redisAsyncContext *c = redisAsyncConnect("localhost", 6379);
        redisAsyncCommand(c, myRedisCallback, NULL, "GET foo");
    
        for (int i = 0; i < 100; i++) {
            redisAsyncHandleWrite(c); // This sends the command.
            redisAsyncHandleRead(c); // This calls the callback if the reply has been received.
            usleep(10000); // A real app would do something here.
        }
    
        return 0;
    }
    

    【讨论】:

      【解决方案3】:

      redisAsyncHandleRead() 或 redisAsyncHandleWrite() 将调用您之前注册的任何回调。每次看到它都不会“开火”。文档和 async.h 文件都没有明确说明如何获取文件描述符 (ac->c.fd) 以实现例如 epoll 事件循环:

        #define MAX_EVENTS 10
      
        int epfd;
        if((epfd = epoll_create1(0)) == -1) {
           // handle error
        };
      
        redisAsyncContext *ac = redisAsyncConnect("127.0.0.1", 6379);
      
        if(epoll_ctl(epfd, EPOLL_CTL_ADD, ac->c.fd, &(struct epoll_event) { .events = EPOLLIN | EPOLLOUT | EPOLLET }) == -1) {
           // handle error
        }
      
        int nfds;
        struct epoll_event events[MAX_EVENTS];
        for(;;) {
           if((nfds = epoll_wait(epfd, events, MAX_EVENTS, -1) == -1) {
             // handle error
           }
           for(int i = 0; i < nfds; i++) {
               if(events[i].events & EPOLLIN) redisAsyncHandleRead(ac);
               if(events[i].events & EPOLLOUT) redisAsyncHandleWrite(ac);
           }
        }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2010-10-10
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-08-08
        • 1970-01-01
        相关资源
        最近更新 更多