【问题标题】:How to obtain the output of the pipeline and perform read&write to Cloud Firestore如何获取管道的输出并对 Cloud Firestore 进行读写
【发布时间】:2018-09-25 03:00:34
【问题描述】:

我正在使用 Apache Beam 从 Pub/Sub 获取日志,其中包含浏览量信息。每个页面都包含唯一的 ID,当一个页面浏览流量日志来自 Pub/Sub 时,Cloud Dataflow 将以恒定的窗口方式收集它们并计算它们。在 combiner 结束时,我们会得到这样的结果:

12345, 2
12456, 1
15213, 1
...

据我所知,ParDo 是一种用于通用并行处理的 Beam 变换。合并后,我希望实现一个转换,将查询​​写入 Cloud Firestore 以获取现有的浏览量 ID,获取当前的浏览量,对其执行加法并执行写入操作以从组合输出中一一更新浏览量,如图所示多于。有什么建议吗?

以下是迄今为止我的 UpdateViewCount 代码。当我得到查询时,似乎不可能有一个 for 循环来获取查询(它只会是一行查询,因为页面浏览量是唯一的)

class UpdateIntoFireStore(beam.DoFn):
    def process(self, element):
        listingid, count = element
        doc_ref = db.collection('listings').where('listingid', u'==', '12345')
        try:
            docs = doc_ref.get()
            for doc in docs:
                print doc
        except NotFound:
            print(u'No such document!')

【问题讨论】:

  • 澄清:你想循环播放什么?是listingids 还是listingid 的计数或其他?

标签: google-cloud-firestore google-cloud-dataflow apache-beam


【解决方案1】:

我解决了。无需循环来检索数据,我应该检索带有文档名称的特定 ID。

doc_ref = db.collection(u'listings').document(listingid)
try:
    doc = doc_ref.get()
    doc_dict = doc.to_dict()
    self.cur_count = doc_dict[u'count']
    doc_ref.update({
        u'count': self.cur_count + count
    })
except NotFound:
    doc_ref.set({'count': count})

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-11-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-11-25
    • 2021-05-22
    相关资源
    最近更新 更多