【问题标题】:Redis Pub/Sub with Spring Data Redis: Messages arrive in wrong order使用 Spring Data Redis 的 Redis Pub/Sub:消息以错误的顺序到达
【发布时间】:2015-11-20 20:04:42
【问题描述】:

我正在尝试使用带有 Spring Data Redis 的 Redis 发布/订阅来实现聊天。

我使用RedisTemplate发布消息,如下图:

public class RedisPublisher {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public void publish(ChannelTopic channelTopic, Object channelMessage) {
        redisTemplate.convertAndSend(channelTopic.getTopic(), channelMessage);
    }
}

为了接收消息,我有一个 MessageListener,如下所示:

public class RedisConsumer implements MessageListener {

MessageSerializer serializer = new MessageSerializer();
AtomicInteger atomicInteger = new AtomicInteger(0);

@Override
public void onMessage(Message message, byte[] pattern) {

    Object obj = serializer.deserialize(message.getBody());
    if(obj != null && obj instanceof RedisMessage) {
        System.err.println("Received message(" + atomicInteger.incrementAndGet() + ") " + obj.toString());
    }

}

消息是这样发布的:

final ChannelTopic channelTopic=connectionManager.subscribe("topic");
    new Thread(new Runnable() {
        public void run() {                
            Thread.sleep(5000);                
            for (int i = 0; i < 10; i++) {
                redisPublisher.publish(channelTopic, new RedisMessage(i + 1));
            }
        }
    }).run();

但是,收到的消息似乎以错误的顺序传递:

Received message(1) message id: 3
Received message(2) message id: 2
Received message(3) message id: 1
Received message(4) message id: 4
Received message(5) message id: 5
Received message(6) message id: 6
Received message(7) message id: 7
Received message(8) message id: 8
Received message(9) message id: 9
Received message(10) message id: 10

是否可以使用Spring提供的RedisTemplate/MessageListener同步发送/接收消息?

目前的代码库很小,可以在GitHub查看。

【问题讨论】:

    标签: java spring redis spring-data-redis


    【解决方案1】:

    众所周知,Redis PubSub 会按顺序传递消息(至少在您使用一个连接并触发 PUBLISH 时保证。PUBLISH 命令返回被通知的客户端数量)。乱序的原因是 Spring Data Redis 默认调度消息的方式。通知是在不同的线程上处理的,这就是原因。感谢您的代码,它帮助我快速重现了该行为。

    我可以想到两种可能的策略来解决这个问题:

    1. 但是,您可以提供一个执行 RedisMessageListenerContainer 中的订单的执行程序。目前,我想到的任何形式的同步都会损害性能。

    2. BinaryJedisPubSub 之上实现自己的消息侦听器。您可以控制消息,并且可以省略执行程序问题。

    HTH,马克

    【讨论】:

    • 关于strategy #2,我不完全确定我将如何去做——每当我将compile("redis.clients:jedis:2.7.2") 包含到我的gradle.build 时,我似乎都会遇到Bean 初始化错误。但是,它似乎在修改MessageListener 之后工作,以便它将收到的消息放入LinkedBlockingQueue,而不是在接收时直接反序列化它。我不完全确定这是否能保证它一直有效?如果您有兴趣,新代码仍可在link 获得。谢谢!
    • jedis 2.7.x 和 Spring Data Redis 1.5.x 之间存在兼容性问题(请参阅here)。我想到的一种解决方案是使用单线程执行器(分叉线程,而不是主线程)。 Redis 主要工作在单线程,因此消息不会同时发布。然后,单线程消费者可以将消息传递给进行进一步处理的事物。
    • 好的!我尝试对RedisMessageListenerContainer 进行以下修改:ExecutorService executorService = Executors.newSingleThreadExecutor(); redisContainer.setTaskExecutor(executorService); 但是,消费者仅在程序“关闭”后才开始工作,即仅在Intellij 中的停止应用程序按钮已被按下。
    • 别在意上述情况——从SingleThreadExecutor 更改为ExecutorService executorService = Executors.newFixedThreadPool(2); 似乎可以解决问题。我的猜测是发布者和订阅者共享同一个线程池,活动发布者因此阻止订阅者消费消息。感谢您的帮助!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-02-07
    • 1970-01-01
    • 2013-07-28
    • 2015-11-09
    • 1970-01-01
    相关资源
    最近更新 更多