【问题标题】:Apache Beam Python: How to implement MapState in the python apache beam sdk?Apache Beam Python:如何在 python apache Beam sdk 中实现 MapState?
【发布时间】:2019-10-05 06:33:55
【问题描述】:

我正在尝试在 apache Beam 中创建一个有状态的 ParDo,它存储值的字典并使用来自后续窗口的数据更新该字典。 等价于 java 中的 MapState。 我尝试使用自定义的 CombineFn 来实现它

class DictCombineFn(beam.CombineFn):
    def create_accumulator(self):
        return {}

    def add_input(self, accumulator, element):
        accumulator[element["key"]] = element["value"]
        return accumulator

    def merge_accumulators(self, accumulators):
        return accumulators

    def extract_output(self, accumulator):
        return accumulator

在以下 ParDo 的 CombiningValueStateSpec 中使用:

class EnrichDoFn(beam.DoFn):

    DICT_STATE = CombiningValueStateSpec(
        'dict', 
        PickleCoder(), 
        DictCombineFn()
    )

    def process(
              self, 
              element,
              w=beam.DoFn.WindowParam,
              dict_state=beam.DoFn.StateParam(DICT_STATE)
        ):
           asks_state.add(element)

但是我在 :

期间收到以下错误:

TypeError: '_ConcatIterable' 对象不支持项目分配

我认为这可能是因为使用了错误的编码器? 实现上述逻辑的最佳策略是什么?

谢谢

【问题讨论】:

    标签: python streaming google-cloud-dataflow apache-beam stateful


    【解决方案1】:

    我不是 100% 确定这个错误意味着什么,但感觉在这个特定的过程中不支持 dict 类型。您是否尝试获取字符串列表,即“key:value”,然后一次性解析并将其转换为字典?

    【讨论】:

    • 使用 cmets 提出这样的问题; )
    【解决方案2】:

    合并累加器应返回一个元素,而不是像您的情况那样可迭代。您可以像添加元素一样进行类似的处理。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-10-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多