【问题标题】:Google Cloud Datastore: Counting all entities in a certain stateGoogle Cloud Datastore:计算处于特定状态的所有实体
【发布时间】:2019-01-16 14:26:10
【问题描述】:

背景

我需要向大约 100 万台设备发送大量通知,并且我正在使用 Google Cloud Functions 构建它。

在当前设置中,我将每个设备令牌作为 PubSub 消息排入队列:

  • 在 DataStore 中存储待处理通知,用于跟踪重试和成功状态
  • 尝试发送通知
  • 将通知标记为成功或失败(如果重试次数足够多但未通过)

这或多或少都很好,我从中获得了不错的性能,每秒处理 1.5K 个令牌。

问题

我想跟踪整个工作的当前进度。鉴于我知道我希望处理多少个通知,我能够报告类似 x/1_000_000 已处理的内容,然后当失败+成功的总和与我想要处理的一样多时认为它已完成。

DataStore 文档建议不要对实体本身进行计数,因为它不会是高性能的,我可以确认这一点。我在他们的 sharded counter 示例文档之后实现了一个计数器,我将其包含在最后。

我看到的问题是它既很慢又很容易返回409 Contention errors,这使得我的函数调用重试,这并不理想,因为计数本身对进程不是必需的,而且只有一个每个通知的重试预算有限。在实践中,最失败的事情是增加在进程结束时发生的计数器,这会增加通知读取的负载以检查它们在重试时的状态,这意味着我最终得到的计数器小于实际成功的通知.

我使用 wrk 运行了一个快速基准测试,似乎通过增加计数器获得了大约 400 RPS,平均延迟为 250 毫秒。与每个通知执行大约 3 个 DataStore 查询的通知逻辑本身相比,这相当慢,并且可能比递增计数器更复杂。当添加到争用错误中时,我最终会得到一个我认为不稳定的实现。我知道 Datastore 通常会在持续大量使用的情况下自动扩展,但使用此服务的模式非常罕见,而且对于整批令牌而言,因此不会有任何以前的流量来扩展它。

问题

  • 我对计数器实现有什么遗漏可以改进以降低速度吗?
  • 我应该考虑采用其他方法来获得我想要的东西吗?

代码

与数据存储交互的代码

DATASTORE_READ_BATCH_SIZE = 100

class Counter():
    kind = "counter"
    shards = 2000

    @staticmethod
    def _key(namespace, shard):
        return hashlib.sha1(":".join([str(namespace), str(shard)]).encode('utf-8')).hexdigest()

    @staticmethod
    def count(namespace):
        keys = []
        total = 0
        for shard in range(Counter.shards):
            if len(keys) == DATASTORE_READ_BATCH_SIZE:
                counters = client.get_multi(keys)
                total = total + sum([int(c["count"]) for c in counters])
                keys = []
            keys.append(client.key(Counter.kind, Counter._key(namespace, shard)))

        if len(keys) != 0:
            counters = client.get_multi(keys)
            total = total + sum([int(c["count"]) for c in counters])

        return total

    @staticmethod
    def increment(namespace):
        key = client.key(Counter.kind, Counter._key(namespace, random.randint(0, Counter.shards - 1)))
        with client.transaction():
            entity = client.get(key)
            if entity is None:
                entity = datastore.Entity(key=key)
                entity.update({
                    "count": 0,
                })
            entity.update({
                "count": entity["count"] + 1,
            })
            client.put(entity)

这是从谷歌云函数中调用的

from flask import abort, jsonify, make_response
from src.notify import FCM, APNS
from src.lib.datastore import Counter

def counter(request):
    args = request.args

    if args.get("platform"):
        Counter.increment(args["platform"])
        return

    return jsonify({
        FCM: Counter.count(FCM),
        APNS: Counter.count(APNS)
    })

这用于增加和读取计数,并按 iOS 和 Android 平台分开。

【问题讨论】:

    标签: python google-cloud-datastore google-cloud-functions


    【解决方案1】:

    最后我放弃了柜台,开始在 BigQuery 中保存通知的状态。定价仍然是合理的,因为它仍然是按次使用的,而且数据插入的流版本似乎足够快,在实践中不会给我带来任何问题。

    有了这个,我可以使用一个简单的 sql 查询来计算与批处理作业匹配的所有实体。对于所有实体,这最终需要大约 3 秒的时间,与替代方案相比,这对我来说是可以接受的性能,因为这仅供内部使用。

    【讨论】:

      猜你喜欢
      • 2021-04-05
      • 1970-01-01
      • 2018-09-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-07-27
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多