【发布时间】:2014-12-16 06:53:41
【问题描述】:
我想与阻塞同步消费一个队列(RabbitMQ)。
注意:下面是可以运行的完整代码。
系统设置使用 RabbitMQ 作为排队系统,但我们的模块之一不需要异步消费。
我尝试在 BlockingConnection 之上使用 basic_get,它不会阻塞(立即返回 (None, None, None)):
# declare queue
get_connection().channel().queue_declare(TEST_QUEUE)
def blocking_get_1():
channel = get_connection().channel()
# get from an empty queue (prints immediately)
print channel.basic_get(TEST_QUEUE)
我也尝试过使用consume generator,在长时间不使用后失败并显示“连接已关闭”。
def blocking_get_2():
channel = get_connection().channel()
# put messages in TEST_QUEUE
for i in range(4):
channel.basic_publish(
'',
TEST_QUEUE,
'body %d' % i
)
consume_generator = channel.consume(TEST_QUEUE)
print next(consume_generator)
time.sleep(14400)
print next(consume_generator)
有没有办法使用 pika 客户端来使用 RabbitMQ,就像我在 python 中使用 Queue.Queue 一样?或类似的东西?
我目前的选择是忙等待(使用 basic_get)——但如果可能的话,我宁愿使用现有系统不忙等待。
完整代码:
#!/usr/bin/env python
import pika
import time
TEST_QUEUE = 'test'
def get_connection():
# define connection
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=YOUR_IP,
port=YOUR_PORT,
credentials=pika.PlainCredentials(
username=YOUR_USER,
password=YOUR_PASSWORD,
)
)
)
return connection
# declare queue
get_connection().channel().queue_declare(TEST_QUEUE)
def blocking_get_1():
channel = get_connection().channel()
# get from an empty queue (prints immediately)
print channel.basic_get(TEST_QUEUE)
def blocking_get_2():
channel = get_connection().channel()
# put messages in TEST_QUEUE
for i in range(4):
channel.basic_publish(
'',
TEST_QUEUE,
'body %d' % i
)
consume_generator = channel.consume(TEST_QUEUE)
print next(consume_generator)
time.sleep(14400)
print next(consume_generator)
print "blocking_get_1"
blocking_get_1()
print "blocking_get_2"
blocking_get_2()
get_connection().channel().queue_delete(TEST_QUEUE)
【问题讨论】:
-
我认为这也与不发送心跳有关(
consume可能会阻止它们?)如下所示:stackoverflow.com/questions/14572020/… -
我发表了我对此的看法,但如果我误解了你的问题,请告诉我。 :)
标签: python python-2.7 rabbitmq pika