【问题标题】:Get Queue Size in Pika (AMQP Python)在 Pika 中获取队列大小(AMQP Python)
【发布时间】:2012-01-01 19:36:26
【问题描述】:

简单的问题,但谷歌或 Pika 开源代码没有帮助。有没有办法在 Pika 中查询当前队列大小(项目计数器)?

【问题讨论】:

    标签: python amqp


    【解决方案1】:

    在 AMQP 协议中有两种获取队列大小的方法。您可以使用 Queue.Declare 或 Basic.Get。

    如果您使用 Basic.Consume 在消息到达时使用它们,那么您无法获取此信息,除非您断开连接(超时)并重新声明队列,或者获取一条消息但不确认它。在较新版本的 AMQP 中,您可以主动重新排队消息。

    至于 Pika,我不知道具体情况,但 AMQP 的 Python 客户端一直是我的眼中钉。通常,您需要对类进行猴子补丁以获取所需的信息,或者允许队列使用者超时,以便您可以定期执行其他操作,例如记录统计信息或找出队列中有多少消息。

    解决此问题的另一种方法是放弃,并使用 Pipe 类运行 sudo rabbitmqctl list_queues -p my_vhost。然后解析输出以找到所有队列的大小。如果您这样做,您将需要配置 /etc/sudoers 以不询问通常的 sudo 密码。

    我祈祷其他有更多 Pika 经验的人回答这个问题,指出你可以如何做我提到的所有事情,在这种情况下,我将下载 Pika 并踢轮胎。但是,如果这没有发生并且您在修补 Pika 代码时遇到困难,那么请查看 haigha。我发现他们的代码比其他 Python AMQP 客户端库更简单,因为它们更接近 AMQP 协议。

    【讨论】:

    • 感谢 sudo rabbitmqctl list_queues -p my_vhost 技巧,我会试试的。
    【解决方案2】:

    你试过PyRabbit吗?它有一个get_queue_depth()method,听起来就像你要找的一样。

    【讨论】:

    • 我不知道 PyRabbit。看起来很有希望,我会试试看!
    【解决方案3】:

    我知道这个问题有点老了,但这里有一个用 pika 做这个的例子。

    关于AMQP和RabbitMQ,如果你已经声明了队列,你可以在passive flag开启的情况下重新声明队列,并保持所有其他队列参数相同。对此声明 declare-ok 的响应将包括队列中的消息数。

    以下是 pika 0.9.5 的示例:

    import pika
    
    def on_callback(msg):
        print msg
    
    params = pika.ConnectionParameters(
            host='localhost',
            port=5672,
            credentials=pika.credentials.PlainCredentials('guest', 'guest'),
        )
    
    # Open a connection to RabbitMQ on localhost using all default parameters
    connection = pika.BlockingConnection(parameters=params)
    
    # Open the channel
    channel = connection.channel()
    
    # Declare the queue
    channel.queue_declare(
            callback=on_callback,
            queue="test",
            durable=True,
            exclusive=False,
            auto_delete=False
        )
    
    # ...
    
    # Re-declare the queue with passive flag
    res = channel.queue_declare(
            callback=on_callback,
            queue="test",
            durable=True,
            exclusive=False,
            auto_delete=False,
            passive=True
        )
    print 'Messages in queue %d' % res.method.message_count
    

    这将打印以下内容:

    <Method(['frame_type=1', 'channel_number=1', "method=<Queue.DeclareOk(['queue=test', 'message_count=0', 'consumer_count=0'])>"])>
    <Method(['frame_type=1', 'channel_number=1', "method=<Queue.DeclareOk(['queue=test', 'message_count=0', 'consumer_count=0'])>"])>
    Messages in queue 0
    

    您会从message_count 成员那里获得消息数量。

    【讨论】:

    • 非常感谢。你的回答对我有帮助。
    • 把这个留在这里,因为它咬了我,花了我几个小时。如果消费者没有预取太多消息,这只是消息的“真实”数量,理解这一点很重要。有关prefetch_count,请参阅pika.readthedocs.io/en/stable/modules/…
    • 正如接受的答案提到的那样,当 basic_consume 已经启动时不起作用,它只会显示 message_count=0 即使可能有很多。
    【解决方案4】:

    这是使用 pika 获取队列长度的方法(考虑到您在 localhost 上使用默认用户和密码) 用您的队列名称替换 q_name。

    import pika
    connection = pika.BlockingConnection()
    channel = connection.channel()
    q = channel.queue_declare(q_name)
    q_len = q.method.message_count
    

    【讨论】:

      【解决方案5】:

      我迟到了,但这是一个使用 pyrabbitpyrabbit2 从 AWS AmazonMQ 和 HTTPS 获取队列计数的示例,它也应该适用于 RabbitMQ:

      from pyrabbit2.api import Client
      
      cl = Client('b-xxxxxx.mq.ap-southeast-1.amazonaws.com', 'user', 'password', scheme='https')
      if not cl.is_alive():
          raise Exception("Failed to connect to rabbitmq")
      
      for i in cl.get_all_vhosts():
          print(i['name'])
      
      queues = [q['name'] for q in cl.get_queues('/')]
      print(queues)    
      
      itemCount = cl.get_queue_depth('/', 'event.stream.my-api')
      print(itemCount)
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2012-07-11
        • 2014-08-22
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2014-01-05
        • 1970-01-01
        相关资源
        最近更新 更多