【问题标题】:RabbitMQ pika losses connectionRabbitMQ pika 失去连接
【发布时间】:2021-12-02 00:04:45
【问题描述】:

我在一个项目中工作是为了好玩,但过去两周我一直在同一个问题上卡住。场景如下:

  1. 一个消费者。每次收到消息时,它都会向 API 发送一个 GET 请求。此请求需要不到一秒钟的时间。
  2. 出版商。此脚本在循环中寻找更改,当它检测到更改时它会发送并向消费者发送消息。

问题:这工作了几分钟.. 然后下次发布者想要发送消息时它会崩溃并出现以下错误:

pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')

我进行了很多故障排除,但无法找到问题并修复它。我读了很多关于心跳的文章,但是我在消费者中的 do_work 函数非常快(不到一秒),所以应该不是问题。发布者,在它开始连接后,它会进入一个无限循环,检查 api 以检测更改,所以也许这就是问题所在? (心跳不在发布者中发送?)。我真的很迷茫,所以任何帮助都会受到欢迎和赞赏。提前致谢!

publisher.py

    class BasicPikaClient:

    def __init__(self, rabbitmq_broker_id, rabbitmq_user, rabbitmq_password, region):

        # SSL Context for TLS configuration of Amazon MQ for RabbitMQ
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        ssl_context.set_ciphers('ECDHE+AESGCM:!ECDSA')

        url = f"amqps://{rabbitmq_user}:{rabbitmq_password}@{rabbitmq_broker_id}.mq.{region}.amazonaws.com:5671"
        parameters = pika.URLParameters(url)
        parameters.ssl_options = pika.SSLOptions(context=ssl_context)

        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()

        self.channel.exchange_declare(exchange='bot2', exchange_type='topic', durable=True, auto_delete=False)


class BasicMessageSender(BasicPikaClient):

    def declare_queue(self, queue_name):
        print(f"Trying to declare queue({queue_name})...")
        self.channel.queue_declare(queue=queue_name)

    def send_message(self, exchange, routing_key, body):
        channel = self.connection.channel()
        channel.basic_publish(exchange=exchange,
                              routing_key=routing_key,
                              body=body)
        print(f"Sent message. Exchange: {exchange}, Routing Key: {routing_key}, Body: {body}")

    def close(self):
        self.channel.close()
        self.connection.close()

    def pika_sleep(self,num):
        self.connection.sleep(num)



def check_stuff():
    print("do stuff")
    # here I check some api if I detect changes I send a message:
    if changes:
        basic_message_sender.send_message(exchange="bot2", routing_key="test.test", body=json_data)


def main():
    global basic_message_sender
    # Initialize Basic Message Sender which creates a connection
    # and channel for sending messages.
    basic_message_sender = BasicMessageSender(
        "host",
        "admin",
        "password",
        "region"
    )

    i = 0
    while True:
        time.sleep(0.2)
        check_stuff()

    # Close connections.
    basic_message_sender.close()

if __name__ == "__main__":
    main()

consumer.py

class BasicPikaClient:

    def __init__(self, rabbitmq_broker_id, rabbitmq_user, rabbitmq_password, region):

        # SSL Context for TLS configuration of Amazon MQ for RabbitMQ
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        ssl_context.set_ciphers('ECDHE+AESGCM:!ECDSA')

        url = f"amqps://{rabbitmq_user}:{rabbitmq_password}@{rabbitmq_broker_id}.mq.{region}.amazonaws.com:5671"
        parameters = pika.URLParameters(url)
        parameters.ssl_options = pika.SSLOptions(context=ssl_context)
        parameters.heartbeat = 0

        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()

        self.channel.exchange_declare(exchange='bot2', exchange_type='topic', durable=True, auto_delete=False)

        result = self.channel.queue_declare('', exclusive=True)
        self.queue_name = result.method.queue
        self.channel.queue_bind(exchange="bot2", queue=self.queue_name, routing_key="test.test")

def process(json_data):
    print("do GET requests")



class BasicMessageReceiver(BasicPikaClient):

    def get_message(self, queue):
        method_frame, header_frame, body = self.channel.basic_get(queue)
        if method_frame:
            print(method_frame, header_frame, body)
            self.channel.basic_ack(method_frame.delivery_tag)
            return method_frame, header_frame, body
        else:
            print('No message returned')

    def close(self):
        self.channel.close()
        self.connection.close()

    def consume_messages(self):
        def callback(ch, method, properties, body):
            print(" [x] Received %r" % body)
            process(body)

        self.channel.basic_consume(queue=self.queue_name, on_message_callback=callback, auto_ack=True)

        print(' [*] Waiting for messages. To exit press CTRL+C')
        self.channel.start_consuming()


if __name__ == "__main__":
    # Create Basic Message Receiver which creates a connection
    # and channel for consuming messages.
    basic_message_receiver = BasicMessageReceiver(
        "host",
        "user",
        "password",
        "region"
    )

    # Consume the message that was sent.
    basic_message_receiver.consume_messages()

    # Close connections.
    basic_message_receiver.close()

【问题讨论】:

    标签: rabbitmq amazon-mq


    【解决方案1】:

    不知道您是否设法解决了这个问题,但这可能是超时问题。

    RabbitMQ 的默认值为 30 分钟。

    如果您的代码中有 time.sleep(),它也可能导致超时错误。见thread

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-09
      • 1970-01-01
      • 2012-03-19
      • 1970-01-01
      相关资源
      最近更新 更多