【发布时间】:2020-02-14 16:58:17
【问题描述】:
我是 Apache Beam/数据流的新手。我正在阅读 Apache Beam 中的 BigQuery 表,我想按两个不同的列进行分组,并比较两个不同键的所有值。我创建了一个包含两个不同列(ID、Date)的元组,它们充当键。以下是表格中的示例数据
ID Date P_id position
"abc" 2019-08-01 "rt56" 5
"abc" 2019-08-01 "rt57" 6
"abc" 2019-08-01 "rt58" 7
"abc" 2019-08-02 "rt56" 2
"abc" 2019-08-02 "rt57" 4
"abc" 2019-08-02 "rt58" 7
现在我想比较 P_id 对 ("abc", 2019-08-01) 和 ("abc", 2019-08-02) 的位置,看看是否有任何 P_id 位置发生变化然后添加表“状态”中的另一列为 True。所以我的新表应该如下所示
我正在用下面的代码尝试它
ID Date P_id position Status
"abc" 2019-08-01 "rt56" 5 False (as this is first date)
"abc" 2019-08-01 "rt57" 6
"abc" 2019-08-01 "rt58" 7
"abc" 2019-08-02 "rt56" 2 True
"abc" 2019-08-02 "rt57" 4
"abc" 2019-08-02 "rt58" 7
(
p
| "get_key_tuple" >> beam.ParDo(lambda element: tuple(element["Id"], element["Date]))
| "group_by" >> beam.GroupByKey()
| "compare_and_add_status" >> beam.ParDo(compare_pos)
)
但是我不知道我应该如何处理函数 compare_pos()
考虑到我有一个非常大的表和很多 ID,如果我能有效地比较位置并创建一个新列来了解状态,这将非常有帮助。
【问题讨论】:
标签: python google-cloud-dataflow apache-beam