【问题标题】:Kombu Consumer as Thrift ServiceKombu 消费者作为旧货服务
【发布时间】:2014-06-20 23:00:19
【问题描述】:

我需要构建一个可以以编程方式控制的 Kombu 消费者。我看到的所有示例都只是告诉您使用 ctrl-c 停止程序的琐碎示例。

我的主应用程序作为 Twisted Thrift 服务运行,我想我可以以某种方式使用 Twisted reactor 来处理我的消费者内部的事件循环,但我不知道如何。

这是我的消费类。 start_sumption() 部分很好,只是它是阻塞的,我不能从外部调用 stop_sumption()。

from kombu import BrokerConnection, Exchange, eventloop, Queue, Consumer


class DMS():
    __routing_key = None
    __is_consuming = None
    __message_counter = 0

    def __init__(self, routing_key):
        print 'server: __init__()'
        self.__routing_key = routing_key

    def __handle_message(self, body, message):
        self.__message_counter += 1

        # Print count every 10,000 messsages.
        if (self.__message_counter % 10000) == 0:
            print self.__message_counter

    def start_consuming(self):
        print 'server: start_consuming()'
        self.__is_consuming = True
        exchange = Exchange('raven-exchange', type='topic', durable=False)
        queue = Queue(self.__routing_key, exchange, routing_key=self.__routing_key)

        with BrokerConnection('amqp://guest:guest@10.1.1.121:5672//') as connection:
            with Consumer(connection, queue, callbacks=[self.__handle_message]) as consumer:
                for _ in eventloop(connection):

                    if self.__is_consuming:
                        pass
                    else:
                        break

                consumer.cancel()
            connection.close()

    def stop_consuming(self):
        print 'server: stop_consuming()'
        self.__is_consuming = False

【问题讨论】:

    标签: python rabbitmq thrift kombu


    【解决方案1】:

    通过 MQ 系统路由 Thrift 服务调用的推荐方法是通过 oneway 调用,因为这是通过 MQ 和 MessageBus 系统进行通信的最自然的方式。

    struct Foo {
      1: string whoa
      2: i32 counter
    }
    
    service Whatever {
        oneway void FooBar(1: Foo someData, 2:i32 moreData)
    }
    

    oneway 调用是 Thrift RPC 调用的一种特殊形式:顾名思义,调用只向一个方向进行。返回值和异常(实际上是返回值)都不能与oneway 一起使用。该调用确实只发送输入参数并且不等待任何值返回。

    为了建立双向通信,客户端需要实现类似的服务,旨在接收传入的应答消息。 Thrift /contrib folder 中有一些示例,包括 0MQ、Rebus 和 Stomp。虽然他们没有专门处理 Python,但主要思想应该很清楚。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-10-20
      • 2020-08-11
      • 2011-08-16
      相关资源
      最近更新 更多