【问题标题】:Google Cloud Pub/Sub - Filter metrics by attributesGoogle Cloud Pub/Sub - 按属性过滤指标
【发布时间】:2022-01-04 11:05:31
【问题描述】:

在使用 GCP Pub/Sub 时,我需要密切关注我的主题并检索未传递消息的数量。它与 Google Query Monitoring 的这个 sn-p 配合得很好:Link

但我需要按属性对消息进行分组。每条消息都有一个带有参数的正文,例如:{'target':'A'},我真的需要这样的东西:

msg.target undelivered messages
A 34
B 42
C 42

如果不使用消息,我无法成功访问它。

这是我的第一次尝试:

import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

project_id = "xxxx"
subscription_id = "xxxx"

subscription_path = subscriber.subscription_path(project_id, subscription_id)
response = subscriber.pull(
    request={"subscription": subscription_path,"max_messages":9999}
)

ack_ids=[r.ack_id for r in response.received_messages]

subscriber.modify_ack_deadline(
    request={
        "subscription": subscription_path,
        "ack_ids": ack_ids,
        "ack_deadline_seconds": 0, ## The message will be immediatly 'pullable' again ?
    }
)

messages = [ json.loads(r.message.data.decode()) for r in response.received_messages ]
for m in messages :
    ## Parse all messages to get my needed counts

但它运行得不是很好。我每次都会收到随机数量的消息,因此无法确定我在看什么。

所以我在这里进行实验。

我看到了 3 种方式:

  1. 也许可以直接从 Google Query Monitoring 访问消息正文属性?
  2. 也许我使用/解析/释放所有消息的方法没有正确写入,这就是它无法正常工作的原因?
  3. 也许我都错了,创建多个主题而不是在消息正文中保留属性会更有效,或者还有另一种方法可以“标记”消息以在监控中对它们进行分组?

你知道怎么做吗? 非常感谢您的帮助!

【问题讨论】:

    标签: python google-cloud-platform google-cloud-pubsub google-cloud-monitoring


    【解决方案1】:

    首先要注意的是,未传递消息的数量是订阅的属性,而不是主题。如果同一主题有多个订阅,则未传递的消息数可能不同。 Google Query Monitoring 系统无法按属性分解消息;它没有对消息积压的内容进行任何自省,仅对作为消息数量的元数据进行自省。

    您所拥有的代码有几件事会导致尝试确定剩余消息数时出现问题:

    1. 同步拉取最多只能返回 1000 条消息,因此将 max_messages 设置为 9999 条消息永远不会给您那么多消息。
    2. 即使max_messages 设置为 1000,也不能保证返回 1000 条消息,即使有 1000 条消息尚未送达。您需要发出多个拉取请求才能获取所有消息。当然,由于您取消了消息(通过使用 0 执行 modify_ack_deadline),消息可能会被重新传递,因此会被重复计算。
    3. 即使您执行modify_ack_deadline 请求来nack 消息,但消息在此监视器上未完成,但它们无法传送给您的实际订阅者,这会延迟处理。此外,请考虑您的监视器在执行modify_ack_deadline 之前由于某种原因崩溃的情况。在这种情况下,在您在订阅中配置的确认截止日期过去之前,这些消息不会传递给您的实际订阅者。如果您的应用对延迟敏感,这可能是个问题。

    要考虑的另一种方法是创建第二个订阅并拥有一个接收所有消息的监控应用程序。对于每条消息,它都会查看属性并将其视为该属性的已接收消息,然后确认该消息。您可以通过custom metric 报告每个归因细分的此计数。在您的实际订阅者应用程序中,您还将创建一个自定义指标来计算每个属性接收和处理的消息数量。要计算每个属性要处理的剩余消息数,您可以取这两个数字的差。

    或者,您可以考虑将每个属性的消息分成不同的主题。但是,有几点需要考虑:

    1. 属性集是否已预先确定且已知?如果没有,您的订阅者如何知道要订阅哪些订阅?
    2. 要检查的属性集有多大? 10,000 topics per project 有一个限制,所以如果你有更多的属性,这种方法将不起作用。
    3. 您是否使用flow control 来限制订阅者同时处理的消息数量?如果是这样,每个属性的消息数量是否一致?如果没有,您可能需要考虑如何在不同订阅的订阅者之间划分流量控制。

    【讨论】:

    • 非常感谢这个包含这么多细节的答案。我会记住您关于拉取限制的警告,并尝试您的第二次订阅想法。似乎是一个非常聪明的主意!我会在测试后立即更新这个主题!
    猜你喜欢
    • 2022-01-13
    • 2022-01-09
    • 2021-12-12
    • 1970-01-01
    • 2021-02-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-03-13
    相关资源
    最近更新 更多