【发布时间】:2019-01-16 14:26:10
【问题描述】:
背景
我需要向大约 100 万台设备发送大量通知,并且我正在使用 Google Cloud Functions 构建它。
在当前设置中,我将每个设备令牌作为 PubSub 消息排入队列:
- 在 DataStore 中存储待处理通知,用于跟踪重试和成功状态
- 尝试发送通知
- 将通知标记为成功或失败(如果重试次数足够多但未通过)
这或多或少都很好,我从中获得了不错的性能,每秒处理 1.5K 个令牌。
问题
我想跟踪整个工作的当前进度。鉴于我知道我希望处理多少个通知,我能够报告类似 x/1_000_000 已处理的内容,然后当失败+成功的总和与我想要处理的一样多时认为它已完成。
DataStore 文档建议不要对实体本身进行计数,因为它不会是高性能的,我可以确认这一点。我在他们的 sharded counter 示例文档之后实现了一个计数器,我将其包含在最后。
我看到的问题是它既很慢又很容易返回409 Contention errors,这使得我的函数调用重试,这并不理想,因为计数本身对进程不是必需的,而且只有一个每个通知的重试预算有限。在实践中,最失败的事情是增加在进程结束时发生的计数器,这会增加通知读取的负载以检查它们在重试时的状态,这意味着我最终得到的计数器小于实际成功的通知.
我使用 wrk 运行了一个快速基准测试,似乎通过增加计数器获得了大约 400 RPS,平均延迟为 250 毫秒。与每个通知执行大约 3 个 DataStore 查询的通知逻辑本身相比,这相当慢,并且可能比递增计数器更复杂。当添加到争用错误中时,我最终会得到一个我认为不稳定的实现。我知道 Datastore 通常会在持续大量使用的情况下自动扩展,但使用此服务的模式非常罕见,而且对于整批令牌而言,因此不会有任何以前的流量来扩展它。
问题
- 我对计数器实现有什么遗漏可以改进以降低速度吗?
- 我应该考虑采用其他方法来获得我想要的东西吗?
代码
与数据存储交互的代码
DATASTORE_READ_BATCH_SIZE = 100
class Counter():
kind = "counter"
shards = 2000
@staticmethod
def _key(namespace, shard):
return hashlib.sha1(":".join([str(namespace), str(shard)]).encode('utf-8')).hexdigest()
@staticmethod
def count(namespace):
keys = []
total = 0
for shard in range(Counter.shards):
if len(keys) == DATASTORE_READ_BATCH_SIZE:
counters = client.get_multi(keys)
total = total + sum([int(c["count"]) for c in counters])
keys = []
keys.append(client.key(Counter.kind, Counter._key(namespace, shard)))
if len(keys) != 0:
counters = client.get_multi(keys)
total = total + sum([int(c["count"]) for c in counters])
return total
@staticmethod
def increment(namespace):
key = client.key(Counter.kind, Counter._key(namespace, random.randint(0, Counter.shards - 1)))
with client.transaction():
entity = client.get(key)
if entity is None:
entity = datastore.Entity(key=key)
entity.update({
"count": 0,
})
entity.update({
"count": entity["count"] + 1,
})
client.put(entity)
这是从谷歌云函数中调用的
from flask import abort, jsonify, make_response
from src.notify import FCM, APNS
from src.lib.datastore import Counter
def counter(request):
args = request.args
if args.get("platform"):
Counter.increment(args["platform"])
return
return jsonify({
FCM: Counter.count(FCM),
APNS: Counter.count(APNS)
})
这用于增加和读取计数,并按 iOS 和 Android 平台分开。
【问题讨论】:
标签: python google-cloud-datastore google-cloud-functions