【问题标题】:redis subscriber can't work with redis publisherredis 订阅者无法与 redis 发布者一起使用
【发布时间】:2017-04-24 16:42:20
【问题描述】:

我现在使用java来设计redis pub/sub系统,遇到了问题。我会告诉你细节:

这里的发布者:

public class RedisMessagePublisher implements MessagePublisher {

public RedisMessagePublisher(StringRedisTemplate redisTemplate,ChannelTopic topic)
{
    this.redisTemplate = redisTemplate;
    this.topic = topic;
}

private StringRedisTemplate redisTemplate;

private ChannelTopic topic;

@Override
public void publish(String message) {
    redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}

发布者是正确的,可以正常工作。

然后让我们转到订阅者类:

public class RedisMessageSubscriber implements MessageListener {

//action inspect here
private Action2<Message, byte[]> action;

public void setAction(Action2<Message, byte[]> action) {
    logger.info("action set");
    this.action = action;
}

private static Logger logger = LogManager.getLogger(RedisMessageSubscriber.class);

@Override
public void onMessage(Message message, byte[] bytes) {

    logger.info("===> redis subscribe message in <===");

    if (action != null)
        action.call(message, bytes);
    else
        logger.info("===> action is null <===");
    }
}

在订阅者类中,我使用RxJava 来注入Action,以便我可以更轻松地使用它。

但是问题来了,我发布了发布者的消息后,我可以c,消息可以转移到onMessage方法,日志打印不是我期望的:

===> redis subscribe message in <===
===> action is null <===

我所期望的是,当我发布一条新消息时,订阅者会收到它并运行我创建的操作。

下面我用来触发发布者和订阅者的服务:

@RestController("redispubsubcontroller")
@RequestMapping(value = "/redis")
public class redispubsubcontroller {

@Autowired
private RedisMessagePublisher redisMessagePublisher;

@Autowired
private RedisMessageSubscriber redisMessageSubscriber;

private static Logger logger = LogManager.getLogger(redispubsubcontroller.class);

@RequestMapping(value = "/publisher", method = {RequestMethod.GET})
public ApiResponse getConfig(String message,HttpServletRequest request,
                                            HttpServletResponse response) {

    redisMessageSubscriber.setAction(new Action2<Message, byte[]>() {
        @Override
        public void call(Message message, byte[] bytes) {
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                String result = objectMapper.readValue(message.getBody(), String.class);
                logger.info("receive:"+result);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    });

    redisMessagePublisher.publish(message);

    return new ApiResponse("success","message sent");
    }
}

从上面的代码中,你可以知道我订阅了主题并为订阅者设置一个新的动作:

 redisMessageSubscriber.setAction(new Action2<Message, byte[]>() {
    @Override
    public void call(Message message, byte[] bytes) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            String result = objectMapper.readValue(message.getBody(), String.class);
            logger.info("receive:"+result);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
});

但是不知道为什么,在触发发布者后,订阅者可以得到消息但是却持有NULL Action,我创建的Action没有通过给它。

有人可以帮忙吗?这个机制有问题吗?

====编辑=====

RedisMessageConfig 代码如下:

@Configuration
public class RedisMessageConfig {

@Bean
ChannelTopic topic() {
    return new ChannelTopic("useraddresspubsub:queue");
}

@Bean
MessageListenerAdapter messageListener() {
    return new MessageListenerAdapter(new RedisMessageSubscriber());
}

@Autowired
private RedisConnectionFactory JedisConnectionFactory;

@Bean
RedisMessageListenerContainer redisContainer() {
    final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(JedisConnectionFactory);
    container.addMessageListener(messageListener(), topic());
    return container;
    }
}

====已解决====

最后我按照 mp 的想法解决了这个问题,将 myredismessagesubscriber 稍微更改为 myredismessageconfig,因为流程是从 redismessageconfig 到 redismessagesubscriber,所以在 redismessageconfig 中,我需要先向它注入动作,然后 redismessageconfig 将创建新的 redismessagesubscriber 并持有新的创建的动作。下面的代码:

@Component
public class MyRedisMessageConfig extends RedisMessageConfig {

private static Logger logger =LogManager.getLogger(MyRedisMessageConfig.class);

public MyRedisMessageConfig() {
    super.action = new Action2<Message, byte[]>() {
        @Override
        public void call(Message message, byte[] bytes) {
            String result = new String(message.getBody());
                logger.info("received:" + result);
            }
        };
    }
}

截图如下:

【问题讨论】:

    标签: java redis publisher subscriber


    【解决方案1】:

    MessageListener 的工作方式并非如此。此外,您创建共享的可变状态。两个并发调用同时更改RedisMessageSubscriber 的状态。

    我假设您在一个线程中设置 action 并且在另一个线程上接收消息时遇到了可见性问题。

    如果您需要每个 MessageListener 的不同行为,则创建多个实现该行为的侦听器。

    【讨论】:

    • 没有。多个线程使其对您可见。问题是由共享可变状态引起的。
    • 好的,你能分享一些可运行的想法来改进这个场景吗?
    • 当然。看看gist.github.com/mp911de/3d5f1d9fa1efa105044e51f2ceb9648f,它说明了这种方法。
    • 抱歉,我无法根据您的链接 url 运行代码。因为如果我使用你的场景,那么redis config会改变,我把redisMessageConfig的代码贴在上面。
    猜你喜欢
    • 2017-11-27
    • 2020-09-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-05-22
    • 2015-09-04
    相关资源
    最近更新 更多