【问题标题】:RabbitMQ pika.exceptions.ConnectionClosed (-1, "error(104, 'Connection reset by peer')")RabbitMQ pika.exceptions.ConnectionClosed (-1, "error(104, 'Connection reset by peer')")
【发布时间】:2019-03-29 02:53:18
【问题描述】:

我在 RabbitMQ 中有一个任务队列,其中有多个生产者 (12) 和一个消费者,用于 webapp 中的繁重任务。当我运行消费者时,它开始将一些消息出列,然后崩溃并出现此错误:

Traceback (most recent call last):
File "jobs.py", line 42, in <module> jobs[job](config)
File "/home/ec2-user/project/queue.py", line 100, in init_queue
channel.start_consuming()
File "/usr/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 1822, in start_consuming
self.connection.process_data_events(time_limit=None)
File "/usr/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 749, in process_data_events
self._flush_output(common_terminator)
File "/usr/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 477, in _flush_output
result.reason_text)
pika.exceptions.ConnectionClosed: (-1, "error(104, 'Connection reset by peer')")

生产者代码是:

message = {'image_url': image_url, 'image_name': image_name, 'notes': notes}

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='tasks_queue')
channel.basic_publish(exchange='', routing_key=queue_name, body=json.dumps(message))

connection.close()

而唯一的消费者代码(一个正在发生冲突):

def callback(self, ch, method, properties, body):
    """Callback when receive a message."""
    message = json.loads(body)
    try:
        image = _get_image(message['image_url'])
    except:
        sys.stderr.write('Error getting image in note %s' % note['id'])
   # Crop image with PIL. Not so expensive
   box_path = _crop(image, message['image_name'], box)

   # API call. Long time function
   result = long_api_call(box_path)

   if result is None:
       sys.stderr.write('Error in note %s' % note['id'])
       return
   # update the db
   db.update_record(result)


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='tasks_queue')
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback_obj.callback, queue='tasks_queue', no_ack=True)
channel.start_consuming()

如您所见,消息有 3 个昂贵的函数。一项裁剪任务、一项 API 调用和一项数据库更新。没有 API 调用,que consumer 运行流畅。

提前致谢

【问题讨论】:

  • 请提供有关您的环境的信息 - 您正在使用什么版本的软件,您使用的是 Docker,您使用的是负载均衡器,RabbitMQ 是否记录了任何内容。 Connection reset by peer 表示某些东西意外中断了您的 TCP 连接。我希望看到 RabbitMQ 记录的类似消息。
  • 你好。我在 Amazon Linux EC2 实例上运行 rabbitmq 3.7.0。没有 docker 或负载均衡器。另外,这段代码。 result = long_api_call(box_path) 在 try catch 块后面,因此应该是容错的。此 long_api_call 指向当前 Internet 连接不稳定的外部服务,因此某些回调调用不起作用并不罕见。但是这个错误不应该因为这个奇怪的错误而让消费者失望。我的 rabbitmq 日志文件:
  • 2018-10-25 06:04:54.854 [info] 关闭 AMQP 连接 (127.0.0.1:42882 -> 127.0.0.1:5672, vhost: '/', user: 'guest') 2018-10-25 06:05:14.740 [警告] 关闭 AMQP 连接 (127.0.0.1:32816 -> 127.0.0.1:5672) :错过来自客户端的心跳,超时:60 秒 2018-10-25 06:06:59.367 [信息] 接受 AMQP 连接 (127.0.0.1:43332 -> 127.0.0.1:5672) 2018 -10-25 06:06:59.370 [信息] 连接 (127.0.0.1:43332 -> 127.0.0.1:5672): 用户 'guest' 已通过身份验证和授予

标签: python python-2.7 rabbitmq pika python-pika


【解决方案1】:

从 RabbitMQ 3.5.5 开始,broker 的默认心跳超时 从 580 秒减少到 60 秒。

pika: Ensuring well-behaved connection with heartbeat and blocked-connection timeouts

最简单的解决方法是增加心跳超时:

rabbit_url = host + "?heartbeat=360"
conn = pika.BlockingConnection(pika.URLParameters(rabbit_url))

# or

params = pika.ConnectionParameters(host, heartbeat=360)
conn = pika.BlockingConnection(params)

【讨论】:

    【解决方案2】:

    您的 RabbitMQ 日志显示了一条我认为我们可能会看到的消息:

    missed heartbeats from client, timeout: 60s
    

    发生的事情是您的long_api_call 阻塞了 Pika 的 I/O 循环。 Pika 是一个非常轻量级 的库,并且不会在后台为您启动线程,因此您必须以这样的方式进行编码,以免阻塞 Pika 的 I/O 循环超过心跳间隔。 RabbitMQ 认为你的客户端已经死机或无响应,强行关闭连接。

    请参阅链接到this example codemy answer here,其中显示了如何在单独的线程中正确执行长时间运行的任务。你仍然可以使用no_ack=True,只是跳过ack_message 调用。


    注意:RabbitMQ 团队监控the rabbitmq-users mailing list,并且有时只回答 StackOverflow 上的问题。

    【讨论】:

    • 你是救生员。这在过去 3 天里一直困扰着我,现在它可以工作了。谢谢!
    猜你喜欢
    • 1970-01-01
    • 2016-04-18
    • 2019-02-02
    • 2013-03-13
    • 2019-04-30
    • 2016-06-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多