【问题标题】:Camel Redis Component subscribe to a channel not workingCamel Redis 组件订阅频道不起作用
【发布时间】:2013-05-07 20:53:44
【问题描述】:

我有一个简单的路由来监听 Redis 频道。由于某种原因,它不起作用。 这是我的路线。我验证了数据正在发布到 Redis 频道,我可以使用普通的 Jedis 订阅者将其读回。我在 Jetty 内运行 Camel,它被部署为战争。

public class RedisSubscriberRoute extends RouteBuilder{

    @Override
public void configure() throws Exception {

    from("spring-redis://localhost:6379?command=SUBSCRIBE&channels=mychannel") 
    .process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                String res = exchange.getIn().getBody().toString();
                System.out.println("************ " + res); 
                exchange.getOut().setBody(res);
            }
        })
    .to("log:foo");
}

}

更新(2013 年 5 月 10 日上午 9:56 EST):添加版本信息

    <properties>
            <spring.version>3.2.2.RELEASE</spring.version>
            <camel.version>2.11.0</camel.version>
            <jetty.version>7.6.8.v20121106</jetty.version>
    </properties>

Redis服务器版本为2.6.11

示例 git 项目在这里。 https://github.com/soumyasd/camelredisdemo

2013 年 5 月 10 日更新(美国东部标准时间晚上 10:18):

按照下面 cmets 的建议,我将 spring-data 的版本更改为 1.0.0.RELEASE。看起来消息正在发送给订阅者,但我仍然遇到异常。

java.lang.RuntimeException: org.springframework.data.redis.serializer.SerializationException: Cannot deserialize; nested exception is org.springframework.core.serializer.support.SerializationFailedException: Failed to deserialize payload. Is the byte array a result of corresponding serialization for DefaultDeserializer?; nested exception is java.io.StreamCorruptedException: invalid stream header: 77686174
    at org.apache.camel.component.redis.RedisConsumer.onMessage(RedisConsumer.java:73)[camel-spring-redis-2.11.0.jar:2.11.0]
    at org.springframework.data.redis.listener.RedisMessageListenerContainer.executeListener(RedisMessageListenerContainer.java:242)[spring-data-redis-1.0.0.RELEASE.jar:]
    at org.springframework.data.redis.listener.RedisMessageListenerContainer.processMessage(RedisMessageListenerContainer.java:231)[spring-data-redis-1.0.0.RELEASE.jar:]
    at org.springframework.data.redis.listener.RedisMessageListenerContainer$DispatchMessageListener$1.run(RedisMessageListenerContainer.java:726)[spring-data-redis-1.0.0.RELEASE.jar:]
    at java.lang.Thread.run(Thread.java:680)[:1.6.0_45]

【问题讨论】:

  • 您使用的是哪个版本的 Camel 和 Redis?
  • 我已经用版本信息更新了问题,并提供了 github 上演示项目的链接。
  • 暂时可以试试在camel-spring-redis组件里改一下驱动版本如下: org.springframework.dataspring- data-redis1.0.0.RELEASE
  • 谢谢。我更改了版本,看起来有些东西正在工作。但是,我仍然遇到一个例外。请参阅我上面的“更新 2”。

标签: redis apache-camel publish-subscribe spring-data


【解决方案1】:

使用 v 1.0.3.RELEASE 的消费者有问题,请改用 1.0.0.RELEASE。

您得到的例外情况有所不同:Camel 生产者使用 Spring RedisTemplate,而后者又使用 JdkSerializationRedisSerializer。为了使其对称,消费者默认还使用 JdkSerializationRedisSerializer 来反序列化数据。因此,如果您使用 Camel 生产者发布数据,它应该可以正常工作而无需喧嚣。但是,如果您使用其他 redis 客户端(或在您的情况下使用其他一些库)将数据发布到 redis,则必须为使用者使用另一个序列化程序。解释很长,但要使它起作用实际上是两行:

from("spring-redis://localhost:6379?command=SUBSCRIBE&channels=mychannel&serializer=#serializer")

【讨论】:

  • 我正在使用 Jedis(和 Python (redis-py) 发布到 Redis 通道。所以我需要在我的 spring XML 中添加这个序列化程序。让我试试吧。
【解决方案2】:

这里总结了我为完成这项工作而必须进行的更改。

  1. 正如@Bilgin Ibryam 所指出的 - 您必须使用 spring-data-redis 的 1.0.0.RELEASE 版本(截至 2013 年 5 月 11 日)

    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-redis</artifactId>
                    <!-- IMPORTANT - as of 10-May-2013 the Redis Camel
                         component only works with version 1.0.0.RELASE -->
        <version>1.0.0.RELEASE</version>
    </dependency> 
    
  2. 我在pom.xml 中使用的其他版本是

    3.2.2.发布 2.11.0 7.6.8.v20121106

  3. 如果您使用 Camel Redis 组件发布和使用,则不必声明不同的序列化程序。在我的例子中,我使用 Jedis 从 python 以及普通的旧 Java 发布。我必须change as my route 包含序列化程序并在我的spring/camel config 中定义序列化程序。

    @覆盖 公共无效配置()抛出异常{

    from("spring-redis://localhost:6379?command=SUBSCRIBE&channels=mychannel&serializer=#redisserializer") 
    .process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                String res = exchange.getIn().getBody().toString();
                System.out.println("************ " + res); 
                exchange.getOut().setBody(res); 
            }
        })
    .to("log:foo");
    

    }

【讨论】:

  • spring-data-redis 有 1.0.4 版本。你介意测试那个版本吗?
  • 我用 spring-data-redis 的 1.0.4.RELEASE 试过了,还是不行。不知道问题出在哪里。
  • 感谢您的测试和反馈
  • btw 现在已经在主干上修复了
猜你喜欢
  • 1970-01-01
  • 2016-05-22
  • 1970-01-01
  • 2017-03-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-08-09
  • 1970-01-01
相关资源
最近更新 更多