【问题标题】:How to Acknowledge a Google PubSub message using AckID in Python如何在 Python 中使用 AckID 确认 Google PubSub 消息
【发布时间】:2020-05-25 11:06:00
【问题描述】:

我正在浏览 PubSub pull 文档here

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"
# TODO timeout = 5.0  # "How long the subscriber should listen for
# messages in seconds"

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_name}`
subscription_path = subscriber.subscription_path(
    project_id, subscription_name
)

def callback(message):
    print("Received message: {}".format(message))
    message.ack()

streaming_pull_future = subscriber.subscribe(
    subscription_path, callback=callback
)
print("Listening for messages on {}..\n".format(subscription_path))

# result() in a future will block indefinitely if `timeout` is not set,
# unless an exception is encountered first.
try:
    streaming_pull_future.result(timeout=timeout)
except:  # noqa
    streaming_pull_future.cancel()

在上面的例子中,消息一收到就被确认。但我只想在我的本地 celery 工作人员完成处理消息时确认,以便 PubSub 可以在工作人员失败时重新传递消息。所以我把消息的ack_id,传给worker。

params["ack_id"] = message._ack_id
start_aggregation.delay(params)

我只是不知道如何使用工作人员中的 ack_id 来确认消息。我知道您可以使用 pubsub 端点来确认给定here 之类的消息。但我不知道如何使用服务帐户凭据来做同样的事情——他们在那个文档中使用 OAuth 来做。任何指针表示赞赏。谢谢。

【问题讨论】:

    标签: python google-cloud-platform publish-subscribe google-cloud-pubsub


    【解决方案1】:

    通过直接调用acknowledge API 确认从客户端库接收到的消息会导致客户端出现问题。客户端有flow control limits,它决定了可以未完成(已交付,但未确认)的最大消息数。当有人调用message.ack()message.nack() 时,就会从计数中删除消息。如果您直接调用acknowledge API,那么这个计数不会改变,导致一旦达到限制消息就不再流动。

    如果您尝试使用 celery 在处理中获得更多并行性,您可以直接执行此操作而无需此中间步骤。一种选择是在不同进程中启动具有相同订阅的订阅客户端实例。消息将在订阅者之间分发。或者,您可以将scheduler 替换为基于进程而不是基于线程的,尽管这需要更多的工作。

    【讨论】:

    • 谢谢,我觉得自己像个白痴。删除了双重排队,现在工作得更好了。
    • 嗨!我有同样的问题,在我看来,解决方案有利于更好的并行性,但不适用于在处理(事件后消费)失败时重新传递消息..
    猜你喜欢
    • 2021-06-18
    • 2019-07-02
    • 1970-01-01
    • 2018-06-22
    • 2020-08-08
    • 2020-05-05
    • 2018-01-01
    • 2021-10-17
    • 2022-11-16
    相关资源
    最近更新 更多