【发布时间】:2023-03-12 00:05:02
【问题描述】:
我在多线程环境中使用 Redis,并且对它的运行方式有疑问。我在我的 c++ 应用程序中使用了hiredis c 库。
我的问题是:如果我在触发回调时使用异步模式,回调是否会在 Redis 客户端创建的另一个线程中处理?就像创建调用的线程不会受到回调处理的影响?谢谢!
【问题讨论】:
我在多线程环境中使用 Redis,并且对它的运行方式有疑问。我在我的 c++ 应用程序中使用了hiredis c 库。
我的问题是:如果我在触发回调时使用异步模式,回调是否会在 Redis 客户端创建的另一个线程中处理?就像创建调用的线程不会受到回调处理的影响?谢谢!
【问题讨论】:
Redis 客户端不会创建任何额外的 clent 线程,而是在现有线程中工作。
Redis 在另一个(主)进程中工作。您使用的 redis API 在本地进程中工作,并使用进程间通信与主进程。异步请求意味着您的进程或线程将任务交给另一个,然后可以执行任何其他您的任务或等待事件。一段时间后,异步回复到达您的应用程序并可供使用。您的应用程序必须使用https://en.wikipedia.org/wiki/Event_loop 或任何通过调用回调来通知您的异步管理系统来处理事件(在本例中为 redis 应答)。
异步架构意味着您运行事件循环,为每个事件调用回调处理程序。调用回调时,您可以创建多个异步任务。一旦创建了任务,它就会在任务完成或发生错误时调用回调事件处理程序。回调可能会分配给启动应用程序或出现新的网络连接。调用回调时可以创建redis任务,稍后会调用result事件回调。当前线程中的所有内容。我 - 你有多个线程,可以合理地期望每个线程都有一个事件循环。
Redis 的单线程性质:http://redis.io/topics/latency#single-threaded-nature-of-redis
客户端套接字处于非阻塞状态,因为 Redis 使用多路复用和非阻塞 I/O。 http://redis.io/topics/clients这意味着您的客户端永远不会被阻止。
从 Redis 2.4 开始,Redis 中的线程仅用于在后台执行一些慢速 I/O 操作,主要与磁盘 I/O 相关,但这并没有改变 Redis 使用单个线程服务所有请求的事实.
【讨论】:
当您调用 redisAsyncHandlerRead() 时,Hiredis 会调用回调。因此,回调会在您用来调用 redisAsyncHandlerRead() 的任何线程上调用。
我相信一个最小的 Redis 异步示例,在 Linux-ish C 中,看起来像这样(为了清楚起见,删除了错误检查):
#include "async.h"
#include <unistd.h>
#include <stdio.h>
void myRedisCallback(redisAsyncContext *c, void *typeless_reply, void *priv_data) {
redisReply *r = (redisReply *)typeless_reply;
if (r->type == REDIS_REPLY_STRING)
printf("foo is %s\n", r->str);
}
int main() {
redisAsyncContext *c = redisAsyncConnect("localhost", 6379);
redisAsyncCommand(c, myRedisCallback, NULL, "GET foo");
for (int i = 0; i < 100; i++) {
redisAsyncHandleWrite(c); // This sends the command.
redisAsyncHandleRead(c); // This calls the callback if the reply has been received.
usleep(10000); // A real app would do something here.
}
return 0;
}
【讨论】:
redisAsyncHandleRead() 或 redisAsyncHandleWrite() 将调用您之前注册的任何回调。每次看到它都不会“开火”。文档和 async.h 文件都没有明确说明如何获取文件描述符 (ac->c.fd) 以实现例如 epoll 事件循环:
#define MAX_EVENTS 10
int epfd;
if((epfd = epoll_create1(0)) == -1) {
// handle error
};
redisAsyncContext *ac = redisAsyncConnect("127.0.0.1", 6379);
if(epoll_ctl(epfd, EPOLL_CTL_ADD, ac->c.fd, &(struct epoll_event) { .events = EPOLLIN | EPOLLOUT | EPOLLET }) == -1) {
// handle error
}
int nfds;
struct epoll_event events[MAX_EVENTS];
for(;;) {
if((nfds = epoll_wait(epfd, events, MAX_EVENTS, -1) == -1) {
// handle error
}
for(int i = 0; i < nfds; i++) {
if(events[i].events & EPOLLIN) redisAsyncHandleRead(ac);
if(events[i].events & EPOLLOUT) redisAsyncHandleWrite(ac);
}
}
【讨论】: