【问题标题】:redis pub sub with jedis , sub crashes with errorredis pub sub 与 jedis , sub 崩溃并出现错误
【发布时间】:2012-08-20 14:41:50
【问题描述】:

全部

我已安装最新的 Redis 2.4.16 并尝试将其 Pub/Sub 系统与 java 一起使用。我每秒都在向频道发送一条消息。发布者没有问题,但订阅者因消息而崩溃

例外:

redis.clients.jedis.exceptions.JedisDataException: ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context
at redis.clients.jedis.Protocol.processError(Protocol.java:59)
at redis.clients.jedis.Protocol.process(Protocol.java:66)
at redis.clients.jedis.Protocol.read(Protocol.java:131)
at redis.clients.jedis.Connection.getObjectMultiBulkReply(Connection.java:206)
at redis.clients.jedis.JedisPubSub.process(JedisPubSub.java:88)
at redis.clients.jedis.JedisPubSub.proceed(JedisPubSub.java:83)
at redis.clients.jedis.Jedis.subscribe(Jedis.java:1971)
at com.jedis.test.JedisSub$1.run(JedisSub.java:22)
at java.lang.Thread.run(Thread.java:680)

这是我的代码:

出版商:

        final Jedis jedis = new Jedis("localhost");

        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
        newFixedThreadPool.submit(new Runnable() {

            @Override
            public void run() {
                while(true){
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    jedis.publish("CC", new Date().toString());
                }

            }
        });

订阅者:

        JedisPool jedisPool = new JedisPool(poolConfig,"localhost", 6379, 100);
        final Jedis subscriberJedis = jedisPool.getResource();


        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    subscriberJedis.subscribe(new JedisPubSub() …..,"CC");
                } catch (Exception e) {
                   e.printStackTrace();
                }
            }
        }).start();

        jedisPool.returnResource(subscriberJedis);

池配置:

    JedisPoolConfig poolConfig = new JedisPoolConfig();
    poolConfig.maxActive = 10;
    poolConfig.maxIdle = 5;
    poolConfig.minIdle = 1;
    poolConfig.testOnBorrow = true;
    poolConfig.numTestsPerEvictionRun = 10;
    poolConfig.timeBetweenEvictionRunsMillis = 60000;
    poolConfig.maxWait = 3000;
    poolConfig.whenExhaustedAction = org.apache.commons.pool.impl.GenericObjectPool.WHEN_EXHAUSTED_FAIL;

为了安装 Redis,我只是使用了命令

make PREFIX=/Users/ggg/dev/dist/redis/ install

在这之后我没有使用./install_server.sh

Jedis 版本为 2.1.0,平台为 Mac OS X。

注意:我注意到订阅者在大约 30 秒时崩溃 开始后。

【问题讨论】:

    标签: java redis publish-subscribe jedis


    【解决方案1】:

    发布者和订阅者的代码都有自己的错误。

    这个错误是因为 Redis 连接不能在发布者和订阅者之间共享。实际上,发布者需要一个连接(或连接池),而订阅者线程只需要一个专用连接。每个进程运行一个订阅者线程通常就足够了。

    这里,在订阅者线程完成之前,您过早将订阅者Jedis 连接返回到池中,因此连接是共享的。

    在发布者中:

    由于您有一个由 10 个线程组成的池,因此您不应在这些线程之间共享唯一的连接。这是使用连接池的最佳场所,连接必须在每个线程中被抓取和释放。

        // This should be a global singleton
        JedisPool jedisPool = new JedisPool(poolConfig,"localhost", 6379, 100);
    
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
        newFixedThreadPool.submit(new Runnable() {
    
            @Override
            public void run() {
                while(true){
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    Jedis jedis = jedisPool.getResource();
                    try {
                       jedis.publish("CC", new Date().toString());
                    } catch (Exception e) {
                       e.printStackTrace();
                    } finally {
                       jedisPool.returnResource(jedis);
                    }
                }
    
            }
        });
    

    在订阅者中:

    在订阅者中,您需要一个专用连接。

        new Thread(new Runnable() {
            @Override
            public void run() {
                Jedis subscriberJedis = new Jedis("localhost");
                try {
                    subscriberJedis.subscribe(new JedisPubSub() …..,"CC");
                } catch (Exception e) {
                   e.printStackTrace();
                }
            }
        }).start();
    

    如果您需要订阅不同的频道或模式,最好将其他订阅设置在同一个线程和同一个连接中。

    【讨论】:

    • jedis 池的默认大小是多少?我正在使用 Executor 固定线程池来处理 50 个线程。
    猜你喜欢
    • 1970-01-01
    • 2017-03-31
    • 1970-01-01
    • 2015-11-09
    • 1970-01-01
    • 2012-08-12
    • 2016-01-18
    • 2011-10-17
    • 1970-01-01
    相关资源
    最近更新 更多