【发布时间】:2021-09-14 11:14:03
【问题描述】:
我有一个云功能,它向 PubSub 发布消息并触发云运行以执行存档文件过程。当有大文件时,我的云运行 python 代码需要一些时间来处理数据,看起来 PubSub 在 20 秒(默认确认截止时间)后重试消息,这会从我的云运行触发另一个实例。我已将确认截止日期增加到 600 秒并重新部署了所有内容,但它仍在 20 秒后重试消息。我错过了什么?
云函数发布消息代码:
# Publishes a message
try:
publish_future = publisher.publish(topic_path, data=message_bytes)
publish_future.result() # Verify the publish succeeded
return 'Message published.'
except Exception as e:
print(e)
return (e, 500)
云运行代码:
@app.route("/", methods=["POST"])
def index():
envelope = request.get_json()
if not envelope:
msg = "no Pub/Sub message received"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
if not isinstance(envelope, dict) or "message" not in envelope:
msg = "invalid Pub/Sub message format"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
pubsub_message = envelope["message"]
if isinstance(pubsub_message, dict) and "data" in pubsub_message:
#Decode base64 event['data']
event_data = base64.b64decode(pubsub_message['data']).decode('utf-8')
message = json.loads(event_data)
#logic to process data/archive
return ("", 204)
【问题讨论】:
-
您能分享一下您在 Cloud Run 中是如何处理 PubSub 消息的吗?
-
嗨@guillaume,我已经编辑了我的主要帖子。这是一个非常基本的 Flask 应用程序,它将接受请求并在完成后返回 204(与 Google 文档相同的示例)。谢谢!
-
您确定来自您的 PubSub 推送订阅吗?您是否有多个订阅向您的 Cloud Run 服务发送消息?您是否也可以尝试清除订阅并重试新消息?你确定这是同一条信息吗?
-
是的,它来自 pubsub 但拉订阅。我从变量 request.get_json() 打印了请求,我看到消息进来了两次(相同的 messageId)。不过有一件事,我只是将重试策略更改为 60 秒分钟后退,我不再看到重复的消息,但如果它需要超过 60 秒,它可能会发生。
-
您谈论的是拉取,但您的屏幕截图是推送订阅。你把我弄丢了!!
标签: google-cloud-platform google-cloud-pubsub google-cloud-run