【问题标题】:In Pika or RabbitMQ, How do I check if any consumers are currently consuming?在 Pika 或 RabbitMQ 中,如何检查当前是否有消费者正在消费?
【发布时间】:2012-10-13 18:42:05
【问题描述】:

我想检查是否存在 Consumer/Worker 来消费我即将发送的 Message

如果没有Worker,我会启动一些worker(消费者和发布者都在一台机器上),然后开始发布Messages。 p>

如果有像connection.check_if_has_consumers这样的函数,我会有点像这样实现它——

import pika
import workers

# code for publishing to worker queue
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# if there are no consumers running (would be nice to have such a function)
if not connection.check_if_has_consumers(queue="worker_queue", exchange=""):
    # start the workers in other processes, using python's `multiprocessing`
    workers.start_workers()

# now, publish with no fear of your queues getting filled up
channel.queue_declare(queue="worker_queue", auto_delete=False, durable=True)
channel.basic_publish(exchange="", routing_key="worker_queue", body="rockin",
                            properties=pika.BasicProperties(delivery_mode=2))
connection.close()

但我在 pika 中找不到任何具有 check_if_has_consumers 功能的函数。

有没有办法使用 pika 来完成这个任务?或者,直接兔子交谈

我不完全确定,但我真的认为 RabbitMQ 会知道订阅不同队列的消费者数量,因为它确实向他们发送 messages 并接受确认

我刚开始使用 RabbitMQ 3 小时前...欢迎任何帮助...

这是我写的 workers.py 代码,如果有帮助的话......

import multiprocessing
import pika


def start_workers(num=3):
    """start workers as non-daemon processes"""
    for i in xrange(num):    
        process = WorkerProcess()
        process.start()


class WorkerProcess(multiprocessing.Process):
    """
    worker process that waits infinitly for task msgs and calls
    the `callback` whenever it gets a msg
    """
    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.stop_working = multiprocessing.Event()

    def run(self):
        """
        worker method, open a channel through a pika connection and
        start consuming
        """
        connection = pika.BlockingConnection(
                              pika.ConnectionParameters(host='localhost')
                     )
        channel = connection.channel()
        channel.queue_declare(queue='worker_queue', auto_delete=False,
                                                    durable=True)

        # don't give work to one worker guy until he's finished
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(callback, queue='worker_queue')

        # do what `channel.start_consuming()` does but with stopping signal
        while len(channel._consumers) and not self.stop_working.is_set():
            channel.transport.connection.process_data_events()

        channel.stop_consuming()
        connection.close()
        return 0

    def signal_exit(self):
        """exit when finished with current loop"""
        self.stop_working.set()

    def exit(self):
        """exit worker, blocks until worker is finished and dead"""
        self.signal_exit()
        while self.is_alive(): # checking `is_alive()` on zombies kills them
            time.sleep(1)

    def kill(self):
        """kill now! should not use this, might create problems"""
        self.terminate()
        self.join()


def callback(channel, method, properties, body):
    """pika basic consume callback"""
    print 'GOT:', body
    # do some heavy lifting here
    result = save_to_database(body)
    print 'DONE:', result
    channel.basic_ack(delivery_tag=method.delivery_tag)

编辑:

我必须继续前进,所以这是我要采取的解决方法,除非有更好的方法出现,

所以,RabbitMQ 有这些 HTTP management apis,它们在你打开 management plugin 并且在 HTTP api 页面中间有

/api/connections - 所有打开的连接的列表。

/api/connections/name - 单个连接。删除它将关闭连接。

所以,如果我通过不同的 Connection 名称/用户连接我的 Workers 和我的 Produces,我将能够检查是否Worker Connection 已打开...(工人死亡时可能会出现问题...)

将等待更好的解决方案...

编辑:

刚刚在 rabbitmq 文档中找到了这个,但是在 python 中这样做会很麻烦:

shobhit@oracle:~$ sudo rabbitmqctl -p vhostname list_queues name consumers
Listing queues ...
worker_queue    0
...done.

所以我可以做类似的事情,

subprocess.call("echo password|sudo -S rabbitmqctl -p vhostname list_queues name consumers | grep 'worker_queue'")

hacky...仍然希望 pika 有一些 python 函数来执行此操作...

谢谢,

【问题讨论】:

    标签: python rabbitmq pika


    【解决方案1】:

    我也在研究这个。在阅读了源代码和文档后,我在 channel.py 中发现了以下内容:

    @property
    def consumer_tags(self):
        """Property method that returns a list of currently active consumers
    
        :rtype: list
    
        """
        return self._consumers.keys()
    

    我自己的测试成功了。我在我的频道对象是 self._channel 的地方使用了以下内容:

    if len(self._channel.consumer_tags) == 0:
            LOGGER.info("Nobody is listening.  I'll come back in a couple of minutes.")
            ...
    

    【讨论】:

      【解决方案2】:

      我实际上是在寻找其他问题时偶然发现的,但可能对您有所帮助的是 Basic_Publish 函数,有一个默认为 False 的参数“Immediate”。

      您可以做的一个想法是将立即标志设置为 True,这将要求消费者立即使用它,而不是坐在队列中。如果一个工作人员无法使用该消息,它将返回一个错误,告诉您启动另一个工作人员。

      根据系统的吞吐量,这可能会产生大量额外的工作人员,或者产生工作人员来替换死去的工作人员。对于前一个问题,您可以编写一个类似管理员的系统,通过控制队列简单地跟踪工作人员,您可以在其中告诉类似“Runner”的进程来杀死现在不再需要的工作人员的进程。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2014-07-23
        • 1970-01-01
        • 2017-07-31
        • 2011-04-15
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多