【问题标题】:How to reconnect consumer with twisted connection for pika?如何使用 pika 的扭曲连接重新连接消费者?
【发布时间】:2017-10-03 01:29:27
【问题描述】:

我使用 pika 扭曲连接作为 RabbitMQ 消费者,这是我的代码:

@defer.inlineCallbacks
def run(connection):
    queue_name = 'aaa'
    channel = yield connection.channel()
    queue = yield channel.queue_declare(queue=queue_name, auto_delete=False, exclusive=False)
    yield channel.queue_bind(exchange='amq.direct',queue=queue_name,routing_key=queue_name)
    yield channel.basic_qos(prefetch_count=1)
    queue_object, consumer_tag = yield channel.basic_consume(queue=queue_name,no_ack=False)
    logger.info('[room server]start consume queue %s', queue_name)

    l = task.LoopingCall(read, queue_object)
    l.start(0.1)


@defer.inlineCallbacks
def read(queue_object):
    ch,method,properties,body = yield queue_object.get()
    try:
        data = json.loads(body)
        head_code = data['head_code']
        openid = data['openid']
        message_content = data['message_content']
        conn_id = -1
        try:
            conn_id = data['conn_id']
        except:
            pass
        message_dispatcher(head_code, openid, message_content, conn_id)
        yield ch.basic_ack(delivery_tag=method.delivery_tag)
    except ValueError as e:
        logger.error('[error!]error body %s' % body)
        yield ch.basic_ack(delivery_tag=method.delivery_tag)

credentials = pika.PlainCredentials(config.RABBITMQ_USERNAME, config.RABBITMQ_PASSWD)
parameters = pika.ConnectionParameters(credentials=credentials)
cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters)

def got_error(failure, d):
    logger.error(failure)
    d = cc.connectTCP(config.RABBITMQ_HOST, config.RABBITMQ_PORT)


def start():
    d = cc.connectTCP(config.RABBITMQ_HOST, config.RABBITMQ_PORT)
    d.addCallback(lambda protocol: protocol.ready)
    d.addCallback(run)
    d.addErrback(got_error, d)

我的问题是连接中断时,重新连接过程不起作用:

如何重新连接?

【问题讨论】:

    标签: python rabbitmq twisted pika


    【解决方案1】:

    根据TwistedProtocolConnection docstring,可以提供on_close_callback 函数来处理连接终止。在此函数中,reason_code and reason_text 必须是 args。因此,创建另一个 on_close 回调来处理连接终止及其发生的原因,然后执行必要的逻辑以连接到 RabbitMQ:

    def connection_termination(reason_code, reason_text):
        """
        Log the reasons why the connection terminates and then reconnect
        """
        # put your connection code here
        # incrementally space out your reconnections, eg. 2 seconds, if fail, 5 seconds, if fail 10 seconds, etc...
    

    一旦您有了该代码,那么您的 ClientCreator 代码应遵循以下示例:

    cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters, connection_termination)
    

    很遗憾,我目前无法对此进行测试,但它应该可以工作。您已经掌握了大部分逻辑,所以剩下的留给您作为练习;)如果您有任何问题,请发表评论,如果您有解决方案,请向其他人展示最终结果。

    【讨论】:

    • 我尝试了您的代码,但引发了异常:在 start d = cc.connectTCP(config.RABBITMQ_HOST, config.RABBITMQ_PORT) 文件中“/Library/Python/2.7/site-packages/twisted/internet/protocol .py”,第 292 行,在 connectTCP bindAddress=bindAddress) 文件“/Library/Python/2.7/site-packages/twisted/internet/protocol.py”,第 274 行,在 _connect self.reactor, self.protocolClass(*self .args, **self.kwargs), d) TypeError: __init__() 正好需要 2 个参数(给定 3 个)
    猜你喜欢
    • 1970-01-01
    • 2019-09-19
    • 2017-06-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-01-29
    • 2018-03-14
    相关资源
    最近更新 更多