【问题标题】:How to send RabbitMQ messages to Pykka actor?如何向 Pykka 演员发送 RabbitMQ 消息?
【发布时间】:2015-02-08 17:52:15
【问题描述】:

2015 年 8 月更新:对于想要使用消息传递的人,我目前推荐 zeromq。可以作为 pykka 的补充或完全替代使用。

如何监听 RabbitMQ 队列中的消息,然后将它们转发给 Pykka 中的参与者?

目前,当我尝试这样做时,我得到了奇怪的行为并且系统停止了停止。

以下是我实现我的演员的方式:

class EventListener(eventlet.EventletActor):
    def __init__(self, target):
        """
        :param pykka.ActorRef target: Where to send the queue messages.
        """
        super(EventListener, self).__init__()

        self.target = target

    def on_start(self):
        ApplicationService.listen_for_events(self.actor_ref)

这是我在 ApplicationService 类中的方法,它应该检查队列中的新消息:

@classmethod
def listen_for_events(cls, actor):
    """
    Subscribe to messages and forward them to the given actor.
    """    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='test')
    def callback(ch, method, properties, body):
        message = pickle.loads(body)
        actor.tell(message)

    channel.basic_consume(callback, queue='test', no_ack=True)
    channel.start_consuming()            

似乎start_consuming 无限期阻塞。有没有办法让我自己定期“轮询”队列?

【问题讨论】:

  • 您在一个程序中同时使用pikapykka 有什么特别的原因吗?似乎您最好单独使用pykka
  • 为了重现此行为,您能否在这里分享更多代码,例如 ApplicationServiceClass 和其他依赖代码?
  • @dano 我需要并发进程来运行以响应队列消息。 (想想某种密集的数据分析)。

标签: python rabbitmq actor pika pykka


【解决方案1】:

您的所有代码对我来说都是正确的。如果您想检查每个参与者使用的队列,您可以在从 Actor#start 返回的参与者引用中检查他们的 actor_inbox 属性。

我在从EventletActor 继承时遇到了类似的问题,所以为了测试我尝试了使用EventletActorThreadingActor 的相同代码。据我从源代码中可以看出,他们都使用eventlet 来工作。 ThreadingActor 非常适合我,但 EventletActor 不适用于 ActorRef#tell,它适用于 ActorRef#ask

我从同一个目录中的两个文件开始,如下所示。

my_actors.py:初始化两个actor,它们将通过打印以类名开头的消息内容来响应消息。

from pykka.eventlet import EventletActor
import pykka


class MyThreadingActor(pykka.ThreadingActor):
    def __init__(self):
        super(MyThreadingActor, self).__init__()

    def on_receive(self, message):
        print(
            "MyThreadingActor Received: {message}".format(
                message=message)
        )


class MyEventletActor(EventletActor):
    def __init__(self):
        super(MyEventletActor, self).__init__()

    def on_receive(self, message):
        print(
            "MyEventletActor Received: {message}".format(
                message=message)
        )


my_threading_actor_ref = MyThreadingActor.start()
my_eventlet_actor_ref = MyEventletActor.start()

my_queue.py:在 pika 中建立一个队列,向队列发送消息,然后转发给之前设置的两个actor。在告知每个参与者该消息后,他们当前的参与者收件箱会检查队列中的任何内容。

from my_actors import my_threading_actor_ref, my_eventlet_actor_ref
import pika


def on_message(channel, method_frame, header_frame, body):
    print "Received Message", body
    my_threading_actor_ref.tell({"msg": body})
    my_eventlet_actor_ref.tell({"msg": body})

    print "ThreadingActor Inbox", my_threading_actor_ref.actor_inbox
    print "EventletActor Inbox", my_eventlet_actor_ref.actor_inbox

    channel.basic_ack(delivery_tag=method_frame.delivery_tag)


queue_name = 'test'
connection = pika.BlockingConnection()

channel = connection.channel()
channel.queue_declare(queue=queue_name)
channel.basic_consume(on_message, queue_name)
channel.basic_publish(exchange='', routing_key=queue_name, body='A Message')

try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()

    # It is very important to stop these actors, otherwise you may lockup
    my_threading_actor_ref.stop()
    my_eventlet_actor_ref.stop()
connection.close()

当我运行my_queue.py 时,输出如下:

收到消息一条消息

ThreadingActor 收件箱<Queue.Queue instance at 0x10bf55878>

MyThreadingActor 收到:{'msg': 'A Message'}

EventletActor 收件箱<Queue maxsize=None queue=deque([{'msg': 'A Message'}]) tasks=1 _cond=<Event at 0x10bf53b50 result=NOT_USED _exc=None _waiters[0]>>

当我点击CTRL+C 停止队列时,我注意到EventletActor 终于收到了消息并打印出来:

^CMyEventletActor 收到:{'msg': 'A Message'}

所有这一切让我相信EventletActor 中可能存在错误,我认为您的代码很好,并且存在我在第一次检查时无法在代码中找到的错误。

我希望这些信息对您有所帮助。

【讨论】:

  • 有趣...我停止使用 ThreadingActors 因为我需要生成一大堆它们...但似乎 EventletActors 无法与 ThreadingActors 互操作,因此必须切换每个actor到一个线程。 Eventlet对我来说仍然是个谜。太糟糕了 pykka 没有更新。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-01-24
  • 2012-10-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多