【问题标题】:Apache Beam - JSON groupingApache Beam - JSON 分组
【发布时间】:2021-11-10 23:12:43
【问题描述】:

我是 python3 中 apache beam 的新手,我必须用它构建某些管道,而且我还有最后一步,我不知道如何执行。

我已经转换并清理了每行的 JSON 元素,我希望它们按键分组,并且我希望将所有元素存储在其中(其余元素将被删除)。

例如线条

{"Name":"Mark", "age":23, "transaction_no": "001", "price":59.99, "someflag" : True}
{"Name":"Mark", "age":23, "transaction_no": "002", "price":10.00, "someflag" : False}

转换为单个 JSON 对象:

{"Mark" : [{"age":23, "transaction_no": "001", "price":59.99}, {"age":23, "transaction_no": "002", "price":10.00}

列表中的元素只会是我选择的那些(例如,一些标志被丢弃)

在 Apache Beam 中进行此类分组最有效的方法是什么?

感谢任何帮助!!!

【问题讨论】:

    标签: python json python-3.x etl apache-beam


    【解决方案1】:

    这是您需要的示例代码。根据我的理解,您只需要映射器每次都返回 json 而不是字典列表。

    所以在这种情况下,当您只需要一个 json 时,您可以编写如下映射器。

    import apache_beam as beam
    
    
    def  map_as_json(item,key_col,cols_to_exclude):
        row = {
            item[key_col]: [
            {
                key: val for  key,val in item.items() if key not in cols_to_exclude and key not in key_col
            }
        ] 
        }
        return row
        
    with beam.Pipeline() as p:
        group_stocks_by_date_name = (
            p
            | 'create'>>beam.Create(
                [
                    {"Name":"Mark", "age":23, "transaction_no": "001", "price":59.99, "someflag" : True},
                    {"Name":"Mark", "age":23, "transaction_no": "002", "price":10.00, "someflag" : False}
                ]
            )
            | 'selective details'>> beam.Map(map_as_json,key_col='Name',cols_to_exclude=['someflag'])
            | 'print'>>beam.Map(print)
        )
    

    如果有帮助,请标记为答案。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-06-04
      • 1970-01-01
      • 2018-08-28
      • 2023-04-01
      相关资源
      最近更新 更多