【问题标题】:RabbitMQ - how to get queue size [duplicate]RabbitMQ - 如何获取队列大小
【发布时间】:2015-04-08 08:07:54
【问题描述】:

我希望我的 python 脚本在将任务发送到队列之前不断运行并检查队列大小是否为零。下面是一个sn-ps:

#!/usr/bin/env python
import pika
import sys

while True:

    # establish connection with RabbitMQ server
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # create queue
    # durable=True make sure RabbitMQ never lose our queue
    rabbit_queue = channel.queue_declare(queue="task_queue", durable=True)

    queue_size = rabbit_queue.method.message_count
    if queue_size <= 0:
        message = get_message()
        channel.basic_publish(exchange='',
                              routing_key="task_queue",
                              body=message,
                              properties=pika.BasicProperties(
                                  delivery_mode=2,  # make message persistent
                              ))

        print " [x] Sent %r" % (message,)
        time.sleep(3)

    # close connection to flush network buffers AND ensure our message was actually delivered to RabbitMQ
    connection.close()

不断地重新建立连接是一种好习惯吗?

【问题讨论】:

  • 它太宽泛了。最正确的答案:这取决于此脚本运行的频率、您的基础设施是什么、典型的 AMQP 代理负载是多少。
  • 此脚本作为后台脚本运行。它 nv 停止(即:while 循环)。我创建了大约 6 个队列。每个队列可能有大约 1500 个任务
  • 按原样尝试。如果您遇到性能问题,那么您一定会知道该怎么做。
  • 您好,8 多小时后,python 脚本冻结。我该怎么办?

标签: python rabbitmq


【解决方案1】:

我看不出有任何理由不将连接打开/关闭移动到while 循环之外:

#!/usr/bin/env python

import pika
import sys


# establish connection with RabbitMQ server
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

try:
    channel = connection.channel()

    # create queue
    # durable=True make sure RabbitMQ never lose our queue
    channel.queue_declare(queue="task_queue", durable=True)

    while True:
        rabbit_queue = channel.queue_declare(queue="task_queue", durable=True, passive=True)
        queue_size = rabbit_queue.method.message_count
        if queue_size <= 0:
            message = get_message()
            channel.basic_publish(exchange='',
                                  routing_key="task_queue",
                                  body=message,
                                  properties=pika.BasicProperties(
                                  delivery_mode=2,  # make message persistent
                                  ))

            print " [x] Sent %r" % (message,)
            time.sleep(3)
finally:
    connection.close()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-01-27
    • 2010-11-05
    • 2019-05-28
    • 2016-08-28
    相关资源
    最近更新 更多