【问题标题】:Creating a dictionary type column in dataframe在数据框中创建字典类型列
【发布时间】:2016-07-13 00:45:34
【问题描述】:

考虑以下数据框:

------------+--------------------+
|id|          values
+------------+--------------------+
|          39|a,a,b,b,c,c,c,c,d
|         520|a,b,c
|         832|a,a

我想把它转换成如下DataFrame:

------------+--------------------+
|id|          values
+------------+--------------------+
|          39|{"a":2, "b": 2,"c": 4,"d": 1}
|         520|{"a": 1,"b": 1,"c": 1}
|         832|{"a": 2}

我尝试了两种方法:

  1. 将数据帧转换为 rdd。然后我将值列映射到频率计数器函数。但是我在将 rdd 转换回数据帧时出错

  2. 使用 udf 基本上做与上述相同的事情。

我想要一个字典列的原因是在我的一个 python 应用程序中将它作为 json 加载。

【问题讨论】:

    标签: python pyspark spark-dataframe


    【解决方案1】:

    您可以使用返回 MapType 列的 udf 来执行此操作。

    from pyspark.sql.types import MapType, StringType, IntegerType
    from collections import Counter
    
    my_udf = udf(lambda s: dict(Counter(s.split(','))), MapType(StringType(), IntegerType()))
    df = df.withColumn('values', my_udf('values'))
    df.collect()
    
    [Row(id=39, values={u'a': 2, u'c': 4, u'b': 2, u'd': 1}),
     Row(id=520, values={u'a': 1, u'c': 1, u'b': 1}),
     Row(id=832, values={u'a': 2})]
    

    【讨论】:

      【解决方案2】:

      我无法准确获得您需要的输出,但我真的很接近。这是我能得到的:

      from pyspark.sql.functions import explode, split
      counts = (df.select("id", explode(split("values", ",")).alias("value")).groupby("id", "value").count())
      counts.show()
      

      输出:

      +---+-----+-----+
      | id|value|count|
      +---+-----+-----+
      |520|    a|    1|
      |520|    b|    1|
      |520|    c|    1|
      | 39|    a|    2|
      | 39|    b|    2|
      | 39|    c|    4|
      | 39|    d|    1|
      |832|    a|    2|
      +---+-----+-----+
      

      也许有人可以添加它需要的东西来获得你需要的输出。希望对您有所帮助。

      【讨论】:

        【解决方案3】:

        我最终使用了这个;如果您觉得有更好的方法,请告诉我。

        def split_test(str_in):
          a = str_in.split(',')
          b = {}
          for i in a:
            if i not in b:
              b[i] = 1
            else:
              b[i] += 1
        
          return str(b)
        
        udf_value_count = udf(split_test, StringType() )
        
        value_count_df = value_df.withColumn('value_count', udf_value_count(value_df.values)).drop('values')
        

        【讨论】:

          猜你喜欢
          • 2019-08-06
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2018-05-08
          • 2020-11-12
          • 2021-08-18
          相关资源
          最近更新 更多