【问题标题】:Synchronous and blocking consumption in RabbitMQ using pika使用 pika 在 RabbitMQ 中同步和阻塞消费
【发布时间】:2014-12-16 06:53:41
【问题描述】:

我想与阻塞同步消费一个队列(RabbitMQ)。

注意:下面是可以运行的完整代码。

系统设置使用 RabbitMQ 作为排队系统,但我们的模块之一不需要异步消费。

我尝试在 BlockingConnection 之上使用 basic_get,它不会阻塞(立即返回 (None, None, None)):

# declare queue
get_connection().channel().queue_declare(TEST_QUEUE)
def blocking_get_1():

        channel = get_connection().channel()

        # get from an empty queue (prints immediately)
        print channel.basic_get(TEST_QUEUE)

我也尝试过使用consume generator,在长时间不使用后失败并显示“连接已关闭”。

def blocking_get_2():
        channel = get_connection().channel()
        # put messages in TEST_QUEUE
        for i in range(4):
                channel.basic_publish(
                        '',
                        TEST_QUEUE,
                        'body %d' % i
                )
        consume_generator = channel.consume(TEST_QUEUE)
        print next(consume_generator)
        time.sleep(14400)
        print next(consume_generator)

有没有办法使用 pika 客户端来使用 RabbitMQ,就像我在 python 中使用 Queue.Queue 一样?或类似的东西?

我目前的选择是忙等待(使用 basic_get)——但如果可能的话,我宁愿使用现有系统不忙等待。

完整代码:

#!/usr/bin/env python
import pika
import time

TEST_QUEUE = 'test'
def get_connection():
        # define connection
        connection = pika.BlockingConnection(
                pika.ConnectionParameters(
                        host=YOUR_IP,
                        port=YOUR_PORT,
                        credentials=pika.PlainCredentials(
                                username=YOUR_USER,
                                password=YOUR_PASSWORD,
                        )
                )
        )
        return connection

# declare queue
get_connection().channel().queue_declare(TEST_QUEUE)
def blocking_get_1():

        channel = get_connection().channel()

        # get from an empty queue (prints immediately)
        print channel.basic_get(TEST_QUEUE)

def blocking_get_2():
        channel = get_connection().channel()
        # put messages in TEST_QUEUE
        for i in range(4):
                channel.basic_publish(
                        '',
                        TEST_QUEUE,
                        'body %d' % i
                )
        consume_generator = channel.consume(TEST_QUEUE)
        print next(consume_generator)
        time.sleep(14400)
        print next(consume_generator)


print "blocking_get_1"
blocking_get_1()

print "blocking_get_2"
blocking_get_2()

get_connection().channel().queue_delete(TEST_QUEUE)

【问题讨论】:

  • 我认为这也与不发送心跳有关(consume 可能会阻止它们?)如下所示:stackoverflow.com/questions/14572020/…
  • 我发表了我对此的看法,但如果我误解了你的问题,请告诉我。 :)

标签: python python-2.7 rabbitmq pika


【解决方案1】:

Pika 的一个常见问题是它当前没有在后台处理传入事件。这基本上意味着在许多情况下,您需要定期调用connection.process_data_events() 以确保它不会错过心跳。

这也意味着,如果您长时间睡眠,鼠兔将不会处理传入的数据,最终会因为没有响应心跳而死亡。这里的一个选项是禁用心跳。

我通常通过在后台检查新事件的线程来解决此问题,如 this 示例所示。

如果你想完全阻止我会做这样的事情(基于我自己的库AMQPStorm)。

while True:
    result = channel.basic.get(queue='simple_queue', no_ack=False)
    if result:
        print("Message:", message.body)
        message.ack()
    else:
        print("Channel Empty.")
        sleep(1)

这是基于找到的示例here

【讨论】:

  • 我记得从两个线程访问连接时遇到了麻烦。线程间通信增加了开销,所以我将等待一种没有它的方法。稍后我会再试一次并在这里更新。
  • 是的,如果您使用 pika 可能会很困难。它不是为线程设计的,但我链接的示例可以处理大量同时消息。另一方面,我的库 amqp-storm 应该更容易,因为它是线程安全的。
  • 所提供的链接已过时...
猜你喜欢
  • 2014-03-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-08-19
  • 1970-01-01
相关资源
最近更新 更多