stream是redis5.0新增的一个数据类型,完善了消息队列功能,可以通过rdb或aof持久化到硬盘上;
简单模式相关命令:
XADD key ID field string [field string ...]
XLEN key
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
XRANGE key start end [COUNT count]
XREVRANGE key end start [COUNT count]
XDEL key ID [ID ...]
XTRIM key MAXLEN [~] count
组模式相关命令:
XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER keyroupname consumername]
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
XPENDING key group [start end count] [consumer]
XACK key group ID [ID ...]
XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [force] [justid]
信息查询命令:
XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
命令解释:
- XADD key ID field string [field string ...]
添加stream消息,key不存在则新建stream;
ID可以为*,表示redis自动生成id,id由一个long数字+横线+一个long数字组成,自动生成的id第一个long是当前时间毫秒值,第二个long是在当前毫秒值上从0递增的数字;
ID也可以自定义,最小id是0-1(不能是0-0,但存在1-0,、2-0……),后续id的第一个数字不能小于已经存在的id的第一个数字,当第一个long相同时第二个long数字要大于已经存在的最大的第二个long的值;
自动生成id可以处理服务器时间改动问题,如果发现当前时间的long值比stream最大的long小,则保持最大的long不变,递增第二个long;
可以在key和ID之间加入 MAXLEN [~] n 参数在添加新数据的时候同时限制stream长度,会丢弃超过长度的旧数据,~表示非绝对长度限制;
- XLEN key
返回stream的长度;
- XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
读取stream消息;
[COUNT count]设置读取数量;
[BLOCK milliseconds]设置阻塞等待时间(ms),获取不到返回nil;
[key ...]可以读取多个stream;
[ID ...]表示读取该id之后的数据,可以为0表示从stream第一个数据读取,可以为$表示阻塞等待读取最新消息(需跟BLOCK同时使用);
读取多stream的时候COUNT会对每个stream读取count个数据;
- XRANGE key start end [COUNT count]
获取一段区间的消息;
start起始id,-表示无穷小;
end结束id,+表示无穷大;
start和end的id都包括在内;
- XREVRANGE key end start [COUNT count]
反向获取一段区间的消息;
- XDEL key ID [ID ...]
删除stream的消息;
- XTRIM key MAXLEN [~] count
设置队列长度;
丢弃超过长度的旧的消息;
~表示非绝对长度(旧数据可能会被丢弃也可能不丢弃,性能高);
- XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER keyroupname consumername]
消息组操作;
[CREATE key groupname id-or-$]创建消息消费组,id表示设置组的初始id(记录最后消费id,从该id之后开始读取消息),id为0表示组从stream的第一条数据开始读,id为$表示组从新的消息开始读取;
[SETID key id-or-$]设置消息组最后消费的id,用于读取该id之后的消息;
[DESTROY key groupname]销毁消息组;
[DELCONSUMER keyroupname consumername]清理消息组的消费者(组消费者从xreadgroup直接指定);
- XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
消费组消息(消费的消息会记录在pending列表里,等待xack确认,需要先重建消费组);
[COUNT count]设置获取消费消息数量;
[BLOCK milliseconds]设置阻塞等待时间(ms)(通常跟>一起使用获取新消息);
[ID ...]读取id之后的消息,id为0可获取已读但未确认的消息(可多次消费,不会获得未读取的消息),id为>表示读取组内未读取的消息(同组内成员会消费不同消息); - XPENDING key group [start end count] [consumer]
查询已消费但未确认的组消息;
[start end count]设置查询起始id,结束id,数量;
[consumer]可以指定消费者;
返回值4行:消息id;消费者;最后消费时间距离现在经过的ms时间;消费次数;
- XACK key group ID [ID ...]
确认pending列表里的消息;
- XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [force] [justid]
将pending里的消息转移给其他消费者;
consumer 新消费者;
min-idle-time 最小空闲时间,空闲超过该时间的消息才转移;
[ID ...]消息id;
[IDLE ms]设置转移后的空闲时间;
[TIME ms-unix-time]通过unix时间设置;
[RETRYCOUNT count]设置转移后消息的消费次数;
[force]强制转换(id需为stream里存在的id)(可强制添加给其他组的消费者,但本组pending列表不删去);
[justid]命令只返回id,消费次数不变,否则返回完整消息,消费次数+1(若设置RETRYCOUNT则以设置为准); - XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
查询相关的消息;
查询组消费者信息:
name:消费者名字;
pending:未确认消息列表长度;
idle:距离最后一次读取消息经过的时间ms;
读取组信息:
name:消费组的名字;
consumers:组内消费者的数量;
pending:未确认消息的数量;
last-delivered-id:最后消费消息的id;
读取stream信息:
length:stream的长度;
groups:消费组数量;
last-generated-id:保存了最后自动生成的id;
first-entry:第一个消息;
last-entry:最后一个消息;
(radix-tree-keys和radix-tree-nodes可能跟存储结构相关)