【发布时间】:2019-10-05 06:33:55
【问题描述】:
我正在尝试在 apache Beam 中创建一个有状态的 ParDo,它存储值的字典并使用来自后续窗口的数据更新该字典。 等价于 java 中的 MapState。 我尝试使用自定义的 CombineFn 来实现它
class DictCombineFn(beam.CombineFn):
def create_accumulator(self):
return {}
def add_input(self, accumulator, element):
accumulator[element["key"]] = element["value"]
return accumulator
def merge_accumulators(self, accumulators):
return accumulators
def extract_output(self, accumulator):
return accumulator
在以下 ParDo 的 CombiningValueStateSpec 中使用:
class EnrichDoFn(beam.DoFn):
DICT_STATE = CombiningValueStateSpec(
'dict',
PickleCoder(),
DictCombineFn()
)
def process(
self,
element,
w=beam.DoFn.WindowParam,
dict_state=beam.DoFn.StateParam(DICT_STATE)
):
asks_state.add(element)
但是我在 :
期间收到以下错误:TypeError: '_ConcatIterable' 对象不支持项目分配
我认为这可能是因为使用了错误的编码器? 实现上述逻辑的最佳策略是什么?
谢谢
【问题讨论】:
标签: python streaming google-cloud-dataflow apache-beam stateful