【问题标题】:Branching Apache Beam pipelines with loops使用循环分支 Apache Beam 管道
【发布时间】:2020-08-24 14:20:11
【问题描述】:

我正在尝试执行去规范化操作,我需要使用以下逻辑重新组织表:

| itemid | class | value |
+--------+-------+-------+
| 1      | A     | 0.2   |       | itemid | value A | value B | value C |
| 1      | B     | 10.3  |  ==>  +--------+---------+---------+---------+
| 2      | A     | 3.0   |  ==>  | 1      |   0.2   |  10.3   |         |
| 2      | B     | 0.2   |  ==>  | 2      |   3.0   |   0.2   |         |
| 3      | A     | 0.0   |       | 3      |   0.0   |   1.2   |   5.4   | 
| 3      | B     | 1.2   |  
| 3      | C     | 5.4   |      

我的方法是执行一个 for 循环,以便按 class 过滤,因为我知道先验的类列表,然后加入生成的 pcollections。

高级代码:

CLASSES = ["A", "B", "C"]

tables = [  
    (
        data
        | "Filter by Language" >> beam.Filter(lambda elem: elem["class"]==c)
        | "Add id as key" >> beam.Map(lambda elem: (elem["itemid"], elem))
    )
    for cin CLASSES
]

和加入:

_ = ( 
    tables
    | "Flatten" >> beam.Flatten()
    | "Join Collections" >> beam.GroupByKey()
    | "Remove key" >> beam.MapTuple(lambda _, val: val)
    | "Merge dicts" >> beam.ParDo(mergeDicts())
    | "Write to GCS" >> beam.io.WriteToText(output_file)
)

with(根据 Peter Kim 的建议进行编辑):

class mergeDicts(beam.DoFn):
    process(self, elements):
        result = {}
        for dictionary in elements:
            if len(dictionary)>0:
                result["itemid"] = dictionary["itemid"]
                result["value {}".format(dictionary["class"])] = dictionary["value"]
        yield result

我的问题是,当管道在 Apache Beam 计算引擎中执行时,我获得了由列表的最后一个元素(在本例中为 C)过滤的相同 pcollections。

[添加] Apache Beam 引擎似乎将迭代变量置于其最终状态,这意味着所有调用分支的迭代列表的最后一个元素。

我显然采用了错误的方法,但是执行此操作的最佳方法应该是什么?

【问题讨论】:

    标签: python google-cloud-dataflow apache-beam


    【解决方案1】:

    您遇到的是关于closures, loops, and Python scoping 的令人惊讶的问题。您可以通过分配变量而不是将其从闭包中拉出来解决此问题。例如

    tables = [  
        (
            data
            # Pass it as a side input to Filter.
            | "Filter by Language" >> beam.Filter(lambda elem, cls: elem["class"], c)
            | "Add id as key" >> beam.Map(lambda elem: (elem["itemid"], elem))
        )
        for c in CLASSES
    ]
    

    tables = [  
        (
            data
            # Explicitly capture it as a default value in the lambda.
            | "Filter by Language" >> beam.Filter(lambda elem, cls=c: elem["class"])
            | "Add id as key" >> beam.Map(lambda elem: (elem["itemid"], elem))
        )
        for c in CLASSES
    ]
    

    分区在这里也很有效,既可以避免这个陷阱,也可以表达你的意图。

    【讨论】:

    • 这是我一直在寻找的答案。非常感谢!
    【解决方案2】:

    根据您显示的结果表,我假设您希望您的输出如下所示:

    {'itemid': '1', 'value B': 10.3, 'value A': 0.2}
    {'itemid': '2', 'value B': 0.2, 'value A': 3.0}
    {'itemid': '3', 'value B': 1.2, 'value A': 0.0, 'value C': 5.4}
    

    您的 mergeDicts 正在覆盖值,因为字典每个键只能保存一个值。将 mergeDicts 更新为类似这样以指定键:

    class mergeDicts(beam.DoFn):
        process(self, elements):
            result = {}
            for dictionary in elements:
                if len(dictionary)>0:
                    result["itemid"] = dictionary["itemid"]
                    result["value {}".format(dictionary["class"])] = dictionary["value"]
            yield result
    

    【讨论】:

    • 谢谢,我把它作为一个虚拟的例子,我忘了包括我在加入之前重命名了字典的字段这一事实。我很抱歉这个错误,但是,问题是即使在加入之前,获得的字典都是相同的。我正在编辑问题以使其更清楚。
    • @Tonca 不用担心。 for 循环肯定可以工作。实际上,我在答案中显示的结果是由您的代码生成的,只有 mergeDicts 和 for class in CLASSES 的变化(因为 class 是保留关键字)。我使用了 apache-beam 2.23。您如何验证获得的 dicts 都是相同的?
    • 谢谢彼得。实际上我的原始用例更复杂(更多字段和更多与其他数据帧的连接),但我确信导致行完全相同的操作是我在问题中描述的操作。显然不是。我正在使用稍微不同的操作(例如 apache_beam.Partition 类的情况)执行许多检查,在某些情况下会产生良好的结果,而在其他情况下会产生错误的结果,但我无法找出问题的原因。跨度>
    • @Tonca 我明白了。我不知道可以做什么,但我很高兴你发现partition 可以达到目的。
    【解决方案3】:

    我在这里发布一个我自己找到的解决方案,但我并没有将其作为正确答案进行检查,因为我想更好地了解 Beam 引擎的执行逻辑。 p>

    我没有通过循环中的项目来过滤表格,而是为了在条件上获得单独的 pcollections,我使用了beam.Partition 类。通过直接应用文档中的代码示例,我将 pcollection 划分为多个表,准备加入。

    这样可以避免问题,但是我不清楚为什么 for 循环不能按预期工作。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-11-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-11-25
      • 2018-01-05
      • 1970-01-01
      相关资源
      最近更新 更多