【问题标题】:PubSub acknowledge deadlinePubSub 确认截止日期
【发布时间】:2021-09-14 11:14:03
【问题描述】:

我有一个云功能,它向 PubSub 发布消息并触发云运行以执行存档文件过程。当有大文件时,我的云运行 python 代码需要一些时间来处理数据,看起来 PubSub 在 20 秒(默认确认截止时间)后重试消息,这会从我的云运行触发另一个实例。我已将确认截止日期增加到 600 秒并重新部署了所有内容,但它仍在 20 秒后重试消息。我错过了什么?

云函数发布消息代码:

# Publishes a message
   try:
      publish_future = publisher.publish(topic_path, data=message_bytes)
      publish_future.result()  # Verify the publish succeeded
      return 'Message published.'
   except Exception as e:
      print(e)
      return (e, 500)

这是 PubSub 订阅配置:

显示第二个实例在 20 秒后触发的日志记录:

云运行代码:

@app.route("/", methods=["POST"])
def index():
    envelope = request.get_json()
    if not envelope:
        msg = "no Pub/Sub message received"
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400        

    if not isinstance(envelope, dict) or "message" not in envelope:
        msg = "invalid Pub/Sub message format"
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400

    pubsub_message = envelope["message"]

    if isinstance(pubsub_message, dict) and "data" in pubsub_message:
        #Decode base64 event['data']
        event_data = base64.b64decode(pubsub_message['data']).decode('utf-8')
        message = json.loads(event_data)

        #logic to process data/archive
        return ("", 204)



【问题讨论】:

  • 您能分享一下您在 Cloud Run 中是如何处理 PubSub 消息的吗?
  • 嗨@guillaume,我已经编辑了我的主要帖子。这是一个非常基本的 Flask 应用程序,它将接受请求并在完成后返回 204(与 Google 文档相同的示例)。谢谢!
  • 您确定来自您的 PubSub 推送订阅吗?您是否有多个订阅向您的 Cloud Run 服务发送消息?您是否也可以尝试清除订阅并重试新消息?你确定这是同一条信息吗?
  • 是的,它来自 pubsub 但拉订阅。我从变量 request.get_json() 打印了请求,我看到消息进来了两次(相同的 messageId)。不过有一件事,我只是将重试策略更改为 60 秒分钟后退,我不再看到重复的消息,但如果它需要超过 60 秒,它可能会发生。
  • 您谈论的是拉取,但您的屏幕截图是推送订阅。你把我弄丢了!!

标签: google-cloud-platform google-cloud-pubsub google-cloud-run


【解决方案1】:

您应该能够通过设置 minimumBackoff retrypolicy 来控制重试。您可以将 minimumBackoff 时间设置为最大 600 秒,例如您的 ack 截止日期,这样重新传递的消息将超过 600 秒。这应该会减少您看到的出现次数。

为了处理重复,建议使您的订阅者具有幂等性。您需要应用某种代码检查来查看 messageId 之前是否被处理过。

您可以在以下文档中找到 at-least-once-delivery

通常,Pub/Sub 会按照发布的顺序交付每条消息一次。但是,有时可能会乱序或多次传递消息。通常,容纳多次传递要求您的订阅者在处理消息时是幂等的。您可以使用 Apache Beam 编程模型实现对 Pub/Sub 消息流的一次性处理。 Apache Beam I/O 连接器让您可以通过受控的源和接收器与 Cloud Dataflow 进行交互。您可以使用 Apache Beam PubSubIO 连接器(适用于 Java 和 Python)从 Cloud Pub/Sub 中读取数据。您还可以通过 Cloud Dataflow 使用服务的标准排序 API 来实现有序处理。或者,为了实现排序,您订阅的主题的发布者可以在消息中包含一个序列令牌。

【讨论】:

    猜你喜欢
    • 2021-04-20
    • 2017-07-07
    • 2018-03-10
    • 2017-06-22
    • 2019-05-01
    • 1970-01-01
    • 2014-11-20
    • 2019-10-14
    • 2012-12-04
    相关资源
    最近更新 更多