【问题标题】:Why does StateFul DoFn require KV tuple in python?为什么 StateFul DoFn 需要 python 中的 KV 元组?
【发布时间】:2022-12-30 04:41:55
【问题描述】:

我正在尝试从 Pubsub 读取 Json 对象,并将其转换为 Python Dict 后,如果我尝试在我的有状态 DoFn 中传递 Dict,我会遇到此错误。 '有状态的 DoFn 或 KeyParam 的输入值必须是 KV 元组; ' 为什么需要为有状态的 DoFn 为我现有的字典分配一个密钥?

【问题讨论】:

    标签: google-cloud-dataflow apache-beam


    【解决方案1】:

    我不确定我是否完全理解你的问题,但我最近在 KV 对象方面有过类似的经历(也是从 PubSub 工作!)所以我会尝试回答你的问题......也许你 - 或其他人 - 会发现它有用! ;)

    Apache Beam 中的输入元素Stateful DoFn

    有状态的 DoFn 必须使用特定的数据结构:

    • 您可以使用简单类型(整数、字符串、“简单”列表...)
    • 当您必须使用更复杂的数据结构(例如可以从 PubSub 消息中获取的嵌套 json/dict)时,您必须实现它 KV数据结构。

    什么是 KV 元组?

    KVsJava data structures 已使用“键值对集合”在 Python 中实现(即 2 个元素的元组集合,其中一个元素扮演键的角色,另一个扮演值的角色)。 所以如果你想使用 Stateful DoFndict... 你将不得不翻译它。给你一个翻译器的例子(和反向操作):

    from typing import Dict, Any, Tuple
    from functools import reduce
    
    def kv_tpl2dic(tpl: Tuple[Tuple[str, Any]]):
        return reduce(lambda prev, tpl: {**prev, tpl[0]: tpl[1]}, tpl, {})
    
    # @beam.typehints.with_input_types(Optional[Dict[str, Any]])
    def dic2kv_tpl(dic: Dict[str, Dict[str, Any]]):
        return tuple(map(lambda key: (key, dic[key]), dic.keys()))
    

    最后,如果这是你的问题,你为什么需要这样做......

    • 一点逻辑:实现的某些部分需要一个不可变的键/值对。
    • 有点历史意外:整个机制首先是用 Java 实现的,他们需要将那些 Java KV 类转换为 Python 数据结构。使用仅为其键提供不变性的字典可能也可能使用过,但它会使 Jave 代码的翻译更加困难......但这只是我的猜测。

    最后但并非最不重要的是,请注意,根据我的经验,您甚至不能将字典放入 KV 实例中!

    #list of KVs without JSON values: OK
    input_good = [(("json", "val"), ("table_name", "one")),
                  (("json", "val"), ("table_name", "two"))]
    
    #list of KVs with JSON values: KO!
    input_bad = [(("json", {"test":"test"}), ("table_name", "one")),
                 (("json", {"test":"test"}), ("table_name", "two"))]
    

    我建议您将 PubSub 消息保存为字符串或字节,然后再进行翻译。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2011-03-16
      • 1970-01-01
      • 1970-01-01
      • 2013-01-16
      • 2015-07-04
      • 2011-02-13
      • 2020-11-06
      相关资源
      最近更新 更多