【发布时间】:2022-12-30 04:41:55
【问题描述】:
我正在尝试从 Pubsub 读取 Json 对象,并将其转换为 Python Dict 后,如果我尝试在我的有状态 DoFn 中传递 Dict,我会遇到此错误。 '有状态的 DoFn 或 KeyParam 的输入值必须是 KV 元组; ' 为什么需要为有状态的 DoFn 为我现有的字典分配一个密钥?
【问题讨论】:
标签: google-cloud-dataflow apache-beam
我正在尝试从 Pubsub 读取 Json 对象,并将其转换为 Python Dict 后,如果我尝试在我的有状态 DoFn 中传递 Dict,我会遇到此错误。 '有状态的 DoFn 或 KeyParam 的输入值必须是 KV 元组; ' 为什么需要为有状态的 DoFn 为我现有的字典分配一个密钥?
【问题讨论】:
标签: google-cloud-dataflow apache-beam
我不确定我是否完全理解你的问题,但我最近在 KV 对象方面有过类似的经历(也是从 PubSub 工作!)所以我会尝试回答你的问题......也许你 - 或其他人 - 会发现它有用! ;)
Stateful DoFn
有状态的 DoFn 必须使用特定的数据结构:
KVs 是 Java data structures 已使用“键值对集合”在 Python 中实现(即 2 个元素的元组集合,其中一个元素扮演键的角色,另一个扮演值的角色)。
所以如果你想使用 Stateful DoFn 和 dict... 你将不得不翻译它。给你一个翻译器的例子(和反向操作):
from typing import Dict, Any, Tuple
from functools import reduce
def kv_tpl2dic(tpl: Tuple[Tuple[str, Any]]):
return reduce(lambda prev, tpl: {**prev, tpl[0]: tpl[1]}, tpl, {})
# @beam.typehints.with_input_types(Optional[Dict[str, Any]])
def dic2kv_tpl(dic: Dict[str, Dict[str, Any]]):
return tuple(map(lambda key: (key, dic[key]), dic.keys()))
最后,如果这是你的问题,你为什么需要这样做......
最后但并非最不重要的是,请注意,根据我的经验,您甚至不能将字典放入 KV 实例中!
#list of KVs without JSON values: OK
input_good = [(("json", "val"), ("table_name", "one")),
(("json", "val"), ("table_name", "two"))]
#list of KVs with JSON values: KO!
input_bad = [(("json", {"test":"test"}), ("table_name", "one")),
(("json", {"test":"test"}), ("table_name", "two"))]
我建议您将 PubSub 消息保存为字符串或字节,然后再进行翻译。
【讨论】: