【发布时间】:2020-08-24 14:20:11
【问题描述】:
我正在尝试执行去规范化操作,我需要使用以下逻辑重新组织表:
| itemid | class | value |
+--------+-------+-------+
| 1 | A | 0.2 | | itemid | value A | value B | value C |
| 1 | B | 10.3 | ==> +--------+---------+---------+---------+
| 2 | A | 3.0 | ==> | 1 | 0.2 | 10.3 | |
| 2 | B | 0.2 | ==> | 2 | 3.0 | 0.2 | |
| 3 | A | 0.0 | | 3 | 0.0 | 1.2 | 5.4 |
| 3 | B | 1.2 |
| 3 | C | 5.4 |
我的方法是执行一个 for 循环,以便按 class 过滤,因为我知道先验的类列表,然后加入生成的 pcollections。
高级代码:
CLASSES = ["A", "B", "C"]
tables = [
(
data
| "Filter by Language" >> beam.Filter(lambda elem: elem["class"]==c)
| "Add id as key" >> beam.Map(lambda elem: (elem["itemid"], elem))
)
for cin CLASSES
]
和加入:
_ = (
tables
| "Flatten" >> beam.Flatten()
| "Join Collections" >> beam.GroupByKey()
| "Remove key" >> beam.MapTuple(lambda _, val: val)
| "Merge dicts" >> beam.ParDo(mergeDicts())
| "Write to GCS" >> beam.io.WriteToText(output_file)
)
with(根据 Peter Kim 的建议进行编辑):
class mergeDicts(beam.DoFn):
process(self, elements):
result = {}
for dictionary in elements:
if len(dictionary)>0:
result["itemid"] = dictionary["itemid"]
result["value {}".format(dictionary["class"])] = dictionary["value"]
yield result
我的问题是,当管道在 Apache Beam 计算引擎中执行时,我获得了由列表的最后一个元素(在本例中为 C)过滤的相同 pcollections。
[添加] Apache Beam 引擎似乎将迭代变量置于其最终状态,这意味着所有调用分支的迭代列表的最后一个元素。
我显然采用了错误的方法,但是执行此操作的最佳方法应该是什么?
【问题讨论】:
标签: python google-cloud-dataflow apache-beam