【问题标题】:Manage RabbitMQ auto-delete queues in different threads. Python管理不同线程中的 RabbitMQ 自动删除队列。 Python
【发布时间】:2018-05-18 09:52:59
【问题描述】:

我想知道这是否是在不同线程中管理 auto_delete 队列的正确方法(主要用于测试我不希望 RabbitMQ 队列在连接关闭时保留的问题)

import pika
from threading import Thread

class ConsumerThread(Thread):

    def __init__(self, callback, queue):
        Thread.__init__(self)
        self.setDaemon(True)

        self.callback = callback
        self.queue = queue

    def run(self):
        # stablish connection
        connection = pika.BlockingConnection(pika.ConnectionParameters(CONNECTION['address'], CONNECTION['port'], CONNECTION['vhost'], CONNECTION['credentials']))
        channel = connection.channel()

        # create the auto-delete queue
        channel.queue_declare(queue=self.queue, auto_delete=True)

        # start consuming
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(self.callback, queue=self.queue)
        channel.start_consuming()

class Factory:

    def __init__(self):
        self.queue_init = "init.queue"
        self.queue_start = "start.queue"

        threads = [ConsumerThread(self.init_callback, self.queue_init), ConsumerThread(self.start_callback, self.queue_start)]
        for t in threads:
            t.start()

    def init_callback(self, ch, method, properties, body):
        # doing something

    def start_callback(self, ch, method, properties, body):
        # doing something

【问题讨论】:

    标签: python multithreading python-3.x rabbitmq pika


    【解决方案1】:

    RabbitMQ 团队会监控 the rabbitmq-users mailing list,并且有时只会在 StackOverflow 上回答问题。


    Pika 不是线程安全的。您必须确保 BlockingConnection 方法调用发生在连接和通道正在运行的同一线程上。根据您的代码,我不确定这是否会发生,因为您在 Factory 类中调用回调,这看起来很奇怪。为什么不在ConsumerThread 中使用这些方法呢?

    Pika 0.12 及更高版本将包含一个 add_callback_threadsafe 方法,该方法将安排一个方法在 ioloop 线程上执行。

    【讨论】:

    • 感谢您的回答和链接。我是新来和 pika 一起工作的。我在工厂中调用回调,因为工厂是一个抽象类。其中我将有不同的实例(例如 2 个队列),它们将在回调中执行不同的事情。我在网上寻找,ConsumerThreads 是我发现在不同线程中使用队列的最佳解决方案,我想到了在工厂中执行回调的想法。我不确定是否正确。但是程序按它必须的方式工作。我更担心是否是解决问题的好方法
    • 由于add_callback_threadsafe没有阻塞,有没有办法等待回调的返回?
    • 不,您必须自己通过您想使用的任何线程同步来实现。
    猜你喜欢
    • 2022-01-20
    • 1970-01-01
    • 2018-08-04
    • 2021-03-28
    • 2011-10-08
    • 1970-01-01
    • 1970-01-01
    • 2018-06-27
    • 1970-01-01
    相关资源
    最近更新 更多