【发布时间】:2020-05-25 11:06:00
【问题描述】:
我正在浏览 PubSub pull 文档here
from google.cloud import pubsub_v1
# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"
# TODO timeout = 5.0 # "How long the subscriber should listen for
# messages in seconds"
subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_name}`
subscription_path = subscriber.subscription_path(
project_id, subscription_name
)
def callback(message):
print("Received message: {}".format(message))
message.ack()
streaming_pull_future = subscriber.subscribe(
subscription_path, callback=callback
)
print("Listening for messages on {}..\n".format(subscription_path))
# result() in a future will block indefinitely if `timeout` is not set,
# unless an exception is encountered first.
try:
streaming_pull_future.result(timeout=timeout)
except: # noqa
streaming_pull_future.cancel()
在上面的例子中,消息一收到就被确认。但我只想在我的本地 celery 工作人员完成处理消息时确认,以便 PubSub 可以在工作人员失败时重新传递消息。所以我把消息的ack_id,传给worker。
params["ack_id"] = message._ack_id
start_aggregation.delay(params)
我只是不知道如何使用工作人员中的 ack_id 来确认消息。我知道您可以使用 pubsub 端点来确认给定here 之类的消息。但我不知道如何使用服务帐户凭据来做同样的事情——他们在那个文档中使用 OAuth 来做。任何指针表示赞赏。谢谢。
【问题讨论】:
标签: python google-cloud-platform publish-subscribe google-cloud-pubsub