【问题标题】:Threading issue when try to consume from RabbitMQ尝试从 RabbitMQ 消费时出现线程问题
【发布时间】:2019-08-22 13:09:44
【问题描述】:

我有消费者代码:

class Consumer(threading.Thread):
    def __init__(self,rabbitMQUrl,dgraphUrl):
        super(JaqlConsumer, self).__init__()
        self.parameters = pika.URLParameters(rabbitMQUrl)

    def run(self):    
        self.connection = pika.BlockingConnection(self.parameters)
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange='publish', exchange_type='topic')
        result = self.channel.queue_declare('', exclusive=True)
        queue_name = result.method.queue
        self.channel.queue_bind(exchange='publish', queue=queue_name, routing_key='#')
        self.channel.basic_qos(prefetch_count=LIMIT)

    def process(values):
        print ("Process:" + str(len(values)))

    def on_message_callback(chan, method_frame, _header_frame, body, userdata=None):
        data = json.loads(body)
        self.values.append(data)
        if (len(self.values) >= LIMIT):
            process(self.values)
            self.values = []
            chan.basic_ack(delivery_tag=method_frame.delivery_tag,multiple=True)

    self.consumer_tag = self.channel.basic_consume(
        queue=queue_name, on_message_callback=on_message_callback)

    self.channel.start_consuming()

    def close(self):
        if hasattr(self, 'channel'):
            self.channel.basic_cancel(self.consumer_tag)
        if hasattr(self, 'connection'):
            if not self.connection.is_closed:
                self.connection.close()

现在这是我的 ma​​in.py。我正在尝试收听 ZK 节点,当值从 false 变为 true 时,我想从 RabbitMQ 消费,从 True 变为 false 我不想连接到 RabbitMQ:

    consumer = Consumer(brokerUrl)
consumer.setDaemon(True)
def toggleEnabled():
    # Get the enabled value from ZK and watch the next change
    isEnabled = config.get("enabled",enable_watch)
    print (isEnabled)
    if isEnabled:
        consumer = Consumer(brokerUrl,dgraphUrl)
        consumer.setDaemon(True)
        consumer.run()
    else:
        consumer.close()

def enable_watch(event):
    toggleEnabled()

toggleEnabled()

while True:
    time.sleep(1)

主要问题是在一次切换后,切换代码不会运行,我认为这是因为当前线程是 RabbitMQ 的消耗(这是我在暂停脚本时看到的)。从主线程切换到另一个线程的正确设计是什么?

【问题讨论】:

  • 你有一些打印语句,你能显示输出并描述你的期望吗?
  • hmm 基本上当从启用 -> 禁用 -> 启用时,我无法禁用(我将断点放在切换行中,即使 ZK 中的值更改也不会中断)。当我暂停时,我看到线路在 start_consume
  • 程序是否抛出异常?
  • 不,只是没有触发手表功能

标签: python-3.x multithreading rabbitmq


【解决方案1】:

以下代码应该是您的run() 方法的一部分:

self.consumer_tag = self.channel.basic_consume(
    queue=queue_name, on_message_callback=on_message_callback)

self.channel.start_consuming()

您在将代码粘贴到问题中时是否出错?

我建议将您的代码添加到 GitHub 存储库或 gist。然后,在pika-python 邮件列表中提出您的问题,我将继续在那里提供帮助。 Stack Overflow 不是来回协助的好地方。


注意:RabbitMQ 团队会监控 rabbitmq-users mailing list,并且有时只回答 StackOverflow 上的问题。

【讨论】:

    【解决方案2】:

    根据我的经验,您迟早会遇到使用 pika 和多线程的问题。我不确定你的实现细节以及你是如何使用多线程的,但我在使用 Pika 时也遇到了这个问题,我会试着告诉你是什么帮助我解决了这个问题。 Pika 不是线程安全的,因此如果您想在线程之间共享通道,则不能将多线程与 pika 一起使用。这是 github 上关于此主题的类似讨论的链接:here

    因此,如果您无论如何都想使用多线程和 pika,那么我对您的建议是使用多处理或为每个线程使用新的连接。这样每个Thread都会有一个独立的Connection。它效率不高,但我认为这是目前唯一的选择。

    【讨论】:

    • 我真的不想要多线程。一开始我的消费者代码在同一个线程上,但后来 ZK 监视功能不起作用,我认为这是因为一切都在同一个线程上 - 所以 pika 消费阻塞了线程。我错了吗?
    • @Shkolar 您的程序是否抛出错误?如果是,你能显示回溯
    • @Shkolar 如果它在同一个线程上,它会正常工作,但是一旦您开始使用多线程,这意味着当您为每个任务或每个订阅者分配一个独立线程时,您将开始拥有问题,因为你不能在线程之间共享频道
    • 没有错误,watch函数根本就没有调用。我不介意在同一个线程上,但同样,手表在同一个线程上时不工作
    • Pika is not thread safe so you cannot use Multithreading with pika. 这是不正确的说法。你必须知道你在做什么,但你可以使用带线程的 Pika。最好的方法是为每个线程创建一个连接。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多