【问题标题】:Consuming rabbitmq queue from inside python threads从 python 线程内部使用 rabbitmq 队列
【发布时间】:2014-08-25 15:26:49
【问题描述】:

这是一篇很长的文章。

我有一个用户名和密码列表。对于每个我想登录帐户并做某事的人。我想用几台机器更快地做到这一点。我正在考虑这样做的方式是拥有一台主机,其工作只是有一个 cron,它不时检查 rabbitmq 队列是否为空。如果是,则从文件中读取用户名和密码列表并将其发送到 rabbitmq 队列。然后有一堆机器订阅了该队列,其工作正在接收用户/通行证,在上面做一些事情,确认它,然后继续下一个,直到队列为空,然后主机将其填满再次。到目前为止,我认为我已经完成了所有工作。

现在我的问题来了。我已经检查过每个用户/通行证要做的事情不是那么密集,所以我可以让每台机器使用 python 的线程同时执行其中三个。事实上,对于一台机器,我已经实现了这一点,我将用户/通行证加载到 python Queue() 中,然后让三个线程使用该 Queue()。现在我想做类似的事情,但不是从 python Queue() 消费,每台机器的每个线程都应该从 rabbitmq 队列消费。这就是我卡住的地方。为了运行测试,我开始使用 rabbitmq 的教程。

发送.py:

import pika, sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

message = ' '.join(sys.argv[1:])
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
connection.close()

worker.py

import time, pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print ' [x] received %r' % (body,)
    time.sleep( body.count('.') )
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='hello', no_ack=False)
channel.start_consuming()

对于上述情况,您可以运行两个 worker.py,它们将订阅 rabbitmq 队列并按预期消费。

我没有rabbitmq的线程是这样的:

runit.py

class Threaded_do_stuff(threading.Thread):
    def __init__(self, user_queue):
        threading.Thread.__init__(self)
        self.user_queue = user_queue

    def run(self):
        while True:
            login = self.user_queue.get()
            do_stuff(user=login[0], pass=login[1])
            self.user_queue.task_done()

user_queue = Queue.Queue()
for i in range(3):
    td = Threaded_do_stuff(user_queue)
    td.setDaemon(True)
    td.start()

## fill up the queue
for user in list_users:
    user_queue.put(user)

## go!
user_queue.join()

这也可以按预期工作:填满队列并有 3 个线程订阅它。现在我想做的是类似 runit.py 的东西,但不是使用 python Queue(),而是使用类似 worker.py 的东西,其中队列实际上是一个 rabbitmq 队列。

这是我尝试过但没有成功的方法(我不明白为什么)

rabbitmq_runit.py

import time, threading, pika

class Threaded_worker(threading.Thread):
    def callback(self, ch, method, properties, body):
        print ' [x] received %r' % (body,)
        time.sleep( body.count('.') )
        ch.basic_ack(delivery_tag = method.delivery_tag)

    def __init__(self):
        threading.Thread.__init__(self)
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='hello')
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(self.callback, queue='hello')

    def run(self):
        print 'start consuming'
        self.channel.start_consuming()

for _ in range(3):
    print 'launch thread'
    td = Threaded_worker()
    td.setDaemon(True)
    td.start()

我希望这会启动三个线程,每个线程都被 .start_sumption() 阻塞,它只是停留在那里等待 rabbitmq 队列向它们发送一些东西。相反,该程序启动、打印并退出。存在的模式也很奇怪:

launch thread
launch thread
start consuming
launch thread
start consuming

特别注意缺少一个“开始消费”。

发生了什么事?

编辑:我发现一个类似问题的答案是here Consuming a rabbitmq message queue with multiple threads (Python Kombu) 答案是“使用芹菜”,无论这意味着什么。我不买它,我不应该需要像芹菜那样复杂的东西。特别是,我没有尝试设置 RPC,也不需要读取来自 do_stuff 例程的回复。

编辑 2:我期望的打印模式如下。我愿意

python send.py first message......
python send.py second message.
python send.py third message.
python send.py fourth message.

打印图案是

launch thread
start consuming
 [x] received 'first message......'
launch thread
start consuming
 [x] received 'second message.'
launch thread
start consuming
 [x] received 'third message.'
 [x] received 'fourth message.'

【问题讨论】:

    标签: python multithreading rabbitmq


    【解决方案1】:

    问题是你让线程成为守护进程:

    td = Threaded_worker()
    td.setDaemon(True)  # Shouldn't do that.
    td.start()
    

    守护线程将被终止as soon as the main thread exits:

    可以将线程标记为“守护线程”。这件事的意义 flag 是当只有守护线程时整个 Python 程序退出 留下了。初始值继承自创建线程。这 flag 可以通过 daemon 属性设置。

    省略setDaemon(True),您应该会看到它的行为符合您的预期。

    另外,pika FAQ 有一个关于如何在线程中使用它的说明:

    Pika 在代码中没有任何线程概念。如果你想 使用带线程的 Pika,确保每个都有 Pika 连接 线程,在该线程中创建。分享一只鼠兔是不安全的 跨线程连接。

    这建议您应该将您在__init__() 中执行的所有操作移至run(),以便在您实际从队列中消费的同一线程中创建连接。

    【讨论】:

    • 就是这样!太神奇了,没想到有人会看这么长的帖子,还好回答。非常感谢!
    • 为了完整起见:rabbitmq.com/tutorials/amqp-concepts.html,章节“通道”和“连接”:最好每个线程有一个通道并共享连接,但 pika 不支持。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-09-03
    • 1970-01-01
    • 2013-10-10
    相关资源
    最近更新 更多