【发布时间】:2018-09-25 03:00:34
【问题描述】:
我正在使用 Apache Beam 从 Pub/Sub 获取日志,其中包含浏览量信息。每个页面都包含唯一的 ID,当一个页面浏览流量日志来自 Pub/Sub 时,Cloud Dataflow 将以恒定的窗口方式收集它们并计算它们。在 combiner 结束时,我们会得到这样的结果:
12345, 2
12456, 1
15213, 1
...
据我所知,ParDo 是一种用于通用并行处理的 Beam 变换。合并后,我希望实现一个转换,将查询写入 Cloud Firestore 以获取现有的浏览量 ID,获取当前的浏览量,对其执行加法并执行写入操作以从组合输出中一一更新浏览量,如图所示多于。有什么建议吗?
以下是迄今为止我的 UpdateViewCount 代码。当我得到查询时,似乎不可能有一个 for 循环来获取查询(它只会是一行查询,因为页面浏览量是唯一的)
class UpdateIntoFireStore(beam.DoFn):
def process(self, element):
listingid, count = element
doc_ref = db.collection('listings').where('listingid', u'==', '12345')
try:
docs = doc_ref.get()
for doc in docs:
print doc
except NotFound:
print(u'No such document!')
【问题讨论】:
-
澄清:你想循环播放什么?是listingids 还是listingid 的计数或其他?
标签: google-cloud-firestore google-cloud-dataflow apache-beam