【Question title】: How to use consumer groups with Spring Data Redis for Redis Streams (keep getting NOGROUP)?
【Posted】: 2020-08-11 09:10:40
【Question description】:

I am trying to consume a Redis Stream through a consumer group using Spring Data Redis, but I keep getting the following exception:

Caused by: io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'event-stream' or consumer group 'my-group' in XREADGROUP with GROUP option

The message seems to imply that I need to create the consumer group first? But the documentation doesn't mention this anywhere: https://github.com/spring-projects/spring-data-redis/blob/master/src/main/asciidoc/reference/redis-streams.adoc

Framework versions:

  • Spring Boot 2.2.6
  • Lettuce 5.2.2
  • Redis 5.0.8

This is the code I use to consume the stream:

@Bean
@Autowired
public StreamMessageListenerContainer eventStreamPersistenceListenerContainerTwo(RedisConnectionFactory streamRedisConnectionFactory, RedisTemplate streamRedisTemplate) {

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder().pollTimeout(Duration.ofMillis(100)).build();

        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(streamRedisConnectionFactory,
                        containerOptions);

        container.receive(Consumer.from("my-group", "my-consumer"),
                        StreamOffset.create("event-stream", ReadOffset.latest()),
                        message -> {
                                System.out.println("MessageId: " + message.getId());
                                System.out.println("Stream: " + message.getStream());
                                System.out.println("Body: " + message.getValue());
                                streamRedisTemplate.opsForStream().acknowledge("my-group", message);
                        });

        /*Subscription subscription = container.receive(StreamOffset.fromStart("event-stream"), message -> {

                System.out.println("MessageId: " + message.getId());
                System.out.println("Stream: " + message.getStream());
                System.out.println("Body: " + message.getValue());
        });*/

        container.start();

        return container;
}

Full stack trace:

org.springframework.data.redis.RedisSystemException: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'event-stream' or consumer group 'my-group' in XREADGROUP with GROUP option
    at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:54) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:52) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:41) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.PassThroughExceptionTranslationStrategy.translate(PassThroughExceptionTranslationStrategy.java:44) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.FallbackExceptionTranslationStrategy.translate(FallbackExceptionTranslationStrategy.java:42) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.connection.lettuce.LettuceConnection.convertLettuceAccessException(LettuceConnection.java:270) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.convertLettuceAccessException(LettuceStreamCommands.java:471) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xReadGroup(LettuceStreamCommands.java:361) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.connection.DefaultedRedisConnection.xReadGroup(DefaultedRedisConnection.java:529) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.core.DefaultStreamOperations$4.inRedis(DefaultStreamOperations.java:239) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.core.DefaultStreamOperations$RecordDeserializingRedisCallback.doInRedis(DefaultStreamOperations.java:305) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.core.DefaultStreamOperations$RecordDeserializingRedisCallback.doInRedis(DefaultStreamOperations.java:300) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:228) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:188) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:96) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.core.DefaultStreamOperations.read(DefaultStreamOperations.java:234) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getReadFunction$3(DefaultStreamMessageListenerContainer.java:236) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:138) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:123) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'event-stream' or consumer group 'my-group' in XREADGROUP with GROUP option
    at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:135) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:108) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.lettuce.core.protocol.AsyncCommand.completeResult(AsyncCommand.java:120) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.lettuce.core.protocol.AsyncCommand.complete(AsyncCommand.java:111) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:59) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:654) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:614) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:565) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.48.Final.jar:4.1.48.Final]
    ... 1 common frames omitted

【Question comments】:

    Tags: java spring redis spring-data spring-data-redis


    【Solution 1】:

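    Answering my own question. It seems you do need to create the stream and the group explicitly first, even though this isn't mentioned anywhere in the documentation. That said, there really ought to be a better way to initialize an empty stream than publishing a message to it.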

    private void createConsumerGroup(String key, String group, RedisTemplate redisTemplate) {
            try {
                    // Use createGroup(key, ReadOffset.from("0-0"), group) instead to read the stream from the beginning.
                    redisTemplate.opsForStream().createGroup(key, group);
            } catch (RedisSystemException e) {
                    // getRootCause() may return null, so guard before inspecting the class.
                    Throwable cause = e.getRootCause();
                    if (cause != null && cause.getClass().equals(RedisBusyException.class)) {
                            log.info("STREAM - Redis group already exists, skipping Redis group creation: {}", group);
                    } else if (cause != null && cause.getClass().equals(RedisCommandExecutionException.class)) {
                            log.info("STREAM - Stream does not yet exist, creating empty stream: {}", key);
                            // TODO: There has to be a better way to create a stream than this!?
                            redisTemplate.opsForStream().add(key, Collections.singletonMap("", ""));
                            redisTemplate.opsForStream().createGroup(key, group);
                    } else {
                            throw e;
                    }
            }
    }
    
    

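    Edit: As @anstue mentioned in the comments below, spring-data-redis 2.3.1+ now creates the stream automatically when createGroup is called, if it does not already exist. However, if the group already exists, it still throws a RedisSystemException whose root cause is Lettuce's RedisBusyException. So I am updating the answer with the solution I currently use, which makes sure this exception is caught.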

    public class EventStreamUtils {
    
        public static void createConsumerGroup(String key, String group, RedisTemplate redisTemplate) {
            try {
                // ReadOffset.from("0-0") will start reading stream from the very beginning.  Otherwise,
                // it will pick up at the point in the stream where the new group was created.
                //redisTemplate.opsForStream().createGroup(key, ReadOffset.from("0-0"), group);
                redisTemplate.opsForStream().createGroup(key, group);
            } catch (RedisSystemException e) {
                var cause = e.getRootCause();
                if (cause != null && RedisBusyException.class.equals(cause.getClass())) {
                    log.info("STREAM - Redis group already exists, skipping Redis group creation: {}", group);
                } else throw e;
            }
        }
    }
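
The root-cause class check above ties the handling to Lettuce's exception types. One possible alternative, sketched here under the assumption that the Redis error text survives in the message of some exception in the cause chain (as it does in the stack trace in the question), is to match the error-code prefix of the reply, such as BUSYGROUP or NOGROUP. The names `RedisErrorPrefix` and `hasErrorPrefix` are hypothetical and not part of Spring Data Redis or Lettuce; the exceptions in `main` are plain stand-ins for the real wrapped exceptions.

```java
final class RedisErrorPrefix {

    // Upper bound on how many causes to inspect, guarding against cyclic cause chains.
    private static final int MAX_DEPTH = 16;

    private RedisErrorPrefix() {
    }

    /**
     * Returns true if any exception in the cause chain of {@code error} has a
     * message that starts with the given Redis error code, e.g. "BUSYGROUP".
     */
    static boolean hasErrorPrefix(Throwable error, String errorCode) {
        Throwable current = error;
        for (int depth = 0; current != null && depth < MAX_DEPTH; depth++) {
            String message = current.getMessage();
            if (message != null && message.startsWith(errorCode)) {
                return true;
            }
            current = current.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        // Stand-ins for the wrapped exceptions; real code would pass the caught RedisSystemException.
        Throwable busy = new RuntimeException("Error in execution",
                new RuntimeException("BUSYGROUP Consumer Group name already exists"));
        Throwable noGroup = new RuntimeException("Error in execution",
                new RuntimeException("NOGROUP No such key 'event-stream' or consumer group 'my-group'"));

        System.out.println(hasErrorPrefix(busy, "BUSYGROUP"));
        System.out.println(hasErrorPrefix(noGroup, "BUSYGROUP"));
    }
}
```

In the catch block, a call such as `hasErrorPrefix(e, "BUSYGROUP")` could then stand in for the `RedisBusyException` class comparison. Matching on message text is more brittle than matching on types, so treat this as a fallback rather than a replacement.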
    

    【Comments】:

    • In spring-data-redis 2.3.1.RELEASE and later, the stream is created automatically by createGroup if it does not exist.
    【Solution 2】:

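    I upgraded to spring-data-redis 2.4.6 (which I believe was the latest stable release at the time of writing), but I still got an exception when creating a group for an empty stream with opsForStream().createGroup().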

    The solution I came up with was to use RedisStreamCommands directly, as follows (continuing from the try-catch in the OP's original answer):

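    // getConnectionFactory() may return null, which would surface as a NullPointerException here.
    // try-with-resources closes the connection once the command has run.
    try (RedisConnection connection = redisTemplate.getConnectionFactory().getConnection()) {
      connection.xGroupCreate(
          "key".getBytes(StandardCharsets.UTF_8),
          "group",
          ReadOffset.from("0-0"),
          true // This is important: it sends the MKSTREAM option so Redis creates the stream if it is missing.
               // Only available from 2.3.0.RELEASE and above.
      );
    } catch (RedisSystemException e) {
      // your exception handling
    }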
    

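    I originally used Kotlin in my Spring Boot project, so the code may need some adjustment. I don't know how safe this approach is, but it is the only way I know of at the moment.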

    【Comments】:
