【发布时间】:2020-05-28 00:38:34
【问题描述】:
我有一个管道,它从 pub sub 接收一些数据,进行一些处理,并且需要根据该处理的结果处理 Bigtable 上的所有数据。
例如,我有一条 pub sub 消息,例如:{clientId: 10},所以我需要从 Bigtable 中读取 clientId 10 的所有数据(我知道如何根据 clientId 创建扫描)。问题是我们目前对 Bigtable(BigtableIO 和 CloudBigtableIO)的两个读取都是基于管道以 bigtable 开头的事实,所以我不能(或找不到方法)在中间使用它们管道。我怎样才能实现这种情况?
简单的类伪代码:
Pipeline p = Pipeline.create(...)
p.apply(PubsubIO.readMessagesWithAttributes ...)
.apply( PubsubMessageToScans()) // I know how to do this
.apply( ReadBigTable()) // How to do this?
【问题讨论】:
标签: google-cloud-dataflow apache-beam dataflow google-cloud-bigtable