【发布时间】:2021-12-02 00:04:45
【问题描述】:
我在一个项目中工作是为了好玩,但过去两周我一直在同一个问题上卡住。场景如下:
- 一个消费者。每次收到消息时,它都会向 API 发送一个 GET 请求。此请求需要不到一秒钟的时间。
- 出版商。此脚本在循环中寻找更改,当它检测到更改时它会发送并向消费者发送消息。
问题:这工作了几分钟.. 然后下次发布者想要发送消息时它会崩溃并出现以下错误:
pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')
我进行了很多故障排除,但无法找到问题并修复它。我读了很多关于心跳的文章,但是我在消费者中的 do_work 函数非常快(不到一秒),所以应该不是问题。发布者,在它开始连接后,它会进入一个无限循环,检查 api 以检测更改,所以也许这就是问题所在? (心跳不在发布者中发送?)。我真的很迷茫,所以任何帮助都会受到欢迎和赞赏。提前致谢!
publisher.py
class BasicPikaClient:
def __init__(self, rabbitmq_broker_id, rabbitmq_user, rabbitmq_password, region):
# SSL Context for TLS configuration of Amazon MQ for RabbitMQ
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_context.set_ciphers('ECDHE+AESGCM:!ECDSA')
url = f"amqps://{rabbitmq_user}:{rabbitmq_password}@{rabbitmq_broker_id}.mq.{region}.amazonaws.com:5671"
parameters = pika.URLParameters(url)
parameters.ssl_options = pika.SSLOptions(context=ssl_context)
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange='bot2', exchange_type='topic', durable=True, auto_delete=False)
class BasicMessageSender(BasicPikaClient):
def declare_queue(self, queue_name):
print(f"Trying to declare queue({queue_name})...")
self.channel.queue_declare(queue=queue_name)
def send_message(self, exchange, routing_key, body):
channel = self.connection.channel()
channel.basic_publish(exchange=exchange,
routing_key=routing_key,
body=body)
print(f"Sent message. Exchange: {exchange}, Routing Key: {routing_key}, Body: {body}")
def close(self):
self.channel.close()
self.connection.close()
def pika_sleep(self,num):
self.connection.sleep(num)
def check_stuff():
print("do stuff")
# here I check some api if I detect changes I send a message:
if changes:
basic_message_sender.send_message(exchange="bot2", routing_key="test.test", body=json_data)
def main():
global basic_message_sender
# Initialize Basic Message Sender which creates a connection
# and channel for sending messages.
basic_message_sender = BasicMessageSender(
"host",
"admin",
"password",
"region"
)
i = 0
while True:
time.sleep(0.2)
check_stuff()
# Close connections.
basic_message_sender.close()
if __name__ == "__main__":
main()
consumer.py
class BasicPikaClient:
def __init__(self, rabbitmq_broker_id, rabbitmq_user, rabbitmq_password, region):
# SSL Context for TLS configuration of Amazon MQ for RabbitMQ
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_context.set_ciphers('ECDHE+AESGCM:!ECDSA')
url = f"amqps://{rabbitmq_user}:{rabbitmq_password}@{rabbitmq_broker_id}.mq.{region}.amazonaws.com:5671"
parameters = pika.URLParameters(url)
parameters.ssl_options = pika.SSLOptions(context=ssl_context)
parameters.heartbeat = 0
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange='bot2', exchange_type='topic', durable=True, auto_delete=False)
result = self.channel.queue_declare('', exclusive=True)
self.queue_name = result.method.queue
self.channel.queue_bind(exchange="bot2", queue=self.queue_name, routing_key="test.test")
def process(json_data):
print("do GET requests")
class BasicMessageReceiver(BasicPikaClient):
def get_message(self, queue):
method_frame, header_frame, body = self.channel.basic_get(queue)
if method_frame:
print(method_frame, header_frame, body)
self.channel.basic_ack(method_frame.delivery_tag)
return method_frame, header_frame, body
else:
print('No message returned')
def close(self):
self.channel.close()
self.connection.close()
def consume_messages(self):
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
process(body)
self.channel.basic_consume(queue=self.queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
self.channel.start_consuming()
if __name__ == "__main__":
# Create Basic Message Receiver which creates a connection
# and channel for consuming messages.
basic_message_receiver = BasicMessageReceiver(
"host",
"user",
"password",
"region"
)
# Consume the message that was sent.
basic_message_receiver.consume_messages()
# Close connections.
basic_message_receiver.close()
【问题讨论】: