【Question title】: Unsure why PySpark is treating my list as a string
【Posted】: 2021-05-06 04:33:19
【Question】:

I have the following DataFrame:

| year | id  | area | visitor
| 2007 | 001 | GFD  | [{'id':'AA1' 'age':20}, {'id':'AA2' 'age':24},{'id':'AA3' 'age':4}]
| 2009 | 045 | TGH  | [{'id':'AA1' 'age':20}, {'id':'AA2' 'age':24},{'id':'AA3' 'age':5}]
| 2009 | 019 | GFD  | [{'id':'AA1' 'age':14}, {'id':'AA2' 'age':24},{'id':'AA3' 'age':55}]
| 2007 | 002 | GFD  | [{'id':'AA1' 'age':15}, {'id':'AA2' 'age':35},{'id':'AA3' 'age':58}]
| 2007 | 003 | GFD  | [{'id':'AA1' 'age':16}, {'id':'AA2' 'age':24},{'id':'AA3' 'age':23}]
| 2007 | 006 | TGH  | [{'id':'AA1' 'age':16}, {'id':'AA2' 'age':14},{'id':'AA3' 'age':60}]    
| 2007 | 008 | TGH  | [{'id':'AA1' 'age':17}, {'id':'AA2' 'age':24},{'id':'AA3' 'age':12}]
| 2008 | 010 | TGH  | [{'id':'AA1' 'age':18}, {'id':'AA2' 'age':16},{'id':'AA3' 'age':23}]    
| 2007 | 044 | GFD  | [{'id':'AA1' 'age':25}, {'id':'AA2' 'age':17},{'id':'AA3' 'age':52}]
| 2008 | 055 | TGH  | [{'id':'AA1' 'age':25}, {'id':'AA2' 'age':24},{'id':'AA3' 'age':43}]
| 2007 | 032 | TGH  | [{'id':'AA1' 'age':22}, {'id':'AA2' 'age':24},{'id':'AA3' 'age':77}]
| 2007 | 034 | TGH  | [{'id':'AA1' 'age':34}, {'id':'AA2' 'age':10},{'id':'AA3' 'age':51}]
| 2009 | 077 | GFD  | [{'id':'AA1' 'age':34}, {'id':'AA2' 'age':10},{'id':'AA3' 'age':12}]
| 2007 | 025 | GFD  | [{'id':'AA1' 'age':34}, {'id':'AA2' 'age':24},{'id':'AA3' 'age':10}]

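I am trying to use PySpark to group the data by `area`, then find the average age of the visitors in each area, as well as the most common visitor age in that area.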
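For reference, the two target statistics can be computed in plain Python on the table's data. This is a minimal sketch, not Spark code: it assumes every `age` is an integer and takes "most common" to mean the mode, with ties broken by whichever age appears first. The `rows` list is transcribed from the table above and is only meant as a check for whatever the Spark job produces.

```python
from collections import Counter

# (area, [visitor ages]) for each row, transcribed from the table above.
rows = [
    ("GFD", [20, 24, 4]), ("TGH", [20, 24, 5]), ("GFD", [14, 24, 55]),
    ("GFD", [15, 35, 58]), ("GFD", [16, 24, 23]), ("TGH", [16, 14, 60]),
    ("TGH", [17, 24, 12]), ("TGH", [18, 16, 23]), ("GFD", [25, 17, 52]),
    ("TGH", [25, 24, 43]), ("TGH", [22, 24, 77]), ("TGH", [34, 10, 51]),
    ("GFD", [34, 10, 12]), ("GFD", [34, 24, 10]),
]

# Pool every visitor age by area (the plain-Python analogue of groupBy + flatten).
ages_by_area = {}
for area, ages in rows:
    ages_by_area.setdefault(area, []).extend(ages)

# Mean age per area.
avg_age = {a: sum(v) / len(v) for a, v in ages_by_area.items()}

# Mode per area as (age, count); Counter breaks ties by first occurrence.
most_common_age = {a: Counter(v).most_common(1)[0] for a, v in ages_by_area.items()}

print(avg_age)          # GFD = 530/21 (about 25.24), TGH = 559/21 (about 26.62)
print(most_common_age)  # {'GFD': (24, 4), 'TGH': (24, 4)}
```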

So first, I used `groupBy` to put them together:

df.groupBy("area").agg(collect_list("visitor").alias("visitor_flatten"))

+-----------+---------------------+
|     area  |      visitor_flatten|
+-----------+---------------------+
|     GFD   |  [[{id=AA1, age=2...|
|     TGH   |  [[{id=AA1, age=2...|

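But when I try `.withColumn("test", explode("visitor_flatten"))`, I get an expanded list for each area with one visitor entry per row (e.g. `{'id': 'AA1', 'age'=22}`), but that data is treated as a string. So I can't seem to use a UDF or any API function to extract the age from it and work with the data, for example to find the average visitor age for the area, or the most common visitor age in the area.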

Any ideas/help would be greatly appreciated!

【Discussion】:

    Tags: python pandas apache-spark pyspark


    【Answer 1】:

    You can use this to get the average age grouped by area. With a few modifications, this should work for you.

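    import pandas as pd
    import pyspark.sql.functions as F
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.getOrCreate()
    
    # Sample row: `visitor` is a plain string in the question's format
    pdf = pd.DataFrame([["2007", "001", "GFD", "{'id':'AA1' 'age':20}, {'id':'AA2' 'age':24},{'id':'AA3' 'age':4}"]],
                       columns=['year', 'id', 'area', 'visitor'])
    
    df = (spark.createDataFrame(pdf)
        # one row per "{...}" entry
        .withColumn("visitor", F.explode(F.split(F.col("visitor"), ",")))
        .withColumn("visitor", F.trim(F.col("visitor")))
        # strip braces and quotes: "id:AA1 age:20"
        .withColumn("visitor", F.regexp_replace(F.col("visitor"), r"[{}']", ""))
        # pairs separated by ' ', key and value by ':' -> map<string,string>
        .withColumn("visitor", F.expr("str_to_map(visitor, ' ', ':')"))
        # alias the map's id as visitor_id so it does not clash with the existing `id` column
        .select("*",
                F.col("visitor")["id"].alias("visitor_id"),
                F.col("visitor")["age"].cast("int").alias("age"))
        .groupBy("area")
        .agg(F.avg("age").alias("avg_age")))
    
    df.show()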
    


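      【Answer 2】:

      Your aggregation code looks fine and should do the job, unless there is a problem with your raw data (i.e. the data arrives as strings rather than as structured JSON). Here is my test code with your data, loaded so that Spark infers a proper schema: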

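      import pyspark.sql.functions as F
      from pyspark.sql import SparkSession
      
      spark = SparkSession.builder.getOrCreate()
      
      raw = [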
          {'year':2007, 'id': '001', 'area': 'GFD', 'visitor': [{'id':'AA1', 'age':20}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':4}]},
          {'year':2009, 'id': '045', 'area': 'TGH', 'visitor': [{'id':'AA1', 'age':20}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':5}]},
          {'year':2009, 'id': '019', 'area': 'GFD', 'visitor': [{'id':'AA1', 'age':14}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':55}]},
          {'year':2007, 'id': '002', 'area': 'GFD', 'visitor': [{'id':'AA1', 'age':15}, {'id':'AA2', 'age':35},{'id':'AA3', 'age':58}]},
          {'year':2007, 'id': '003', 'area': 'GFD', 'visitor': [{'id':'AA1', 'age':16}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':23}]},
          {'year':2007, 'id': '006', 'area': 'TGH', 'visitor': [{'id':'AA1', 'age':16}, {'id':'AA2', 'age':14},{'id':'AA3', 'age':60}]},
          {'year':2007, 'id': '008', 'area': 'TGH', 'visitor': [{'id':'AA1', 'age':17}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':12}]},
          {'year':2008, 'id': '010', 'area': 'TGH', 'visitor': [{'id':'AA1', 'age':18}, {'id':'AA2', 'age':16},{'id':'AA3', 'age':23}]},
          {'year':2007, 'id': '044', 'area': 'GFD', 'visitor': [{'id':'AA1', 'age':25}, {'id':'AA2', 'age':17},{'id':'AA3', 'age':52}]},
          {'year':2008, 'id': '055', 'area': 'TGH', 'visitor': [{'id':'AA1', 'age':25}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':43}]},
          {'year':2007, 'id': '032', 'area': 'TGH', 'visitor': [{'id':'AA1', 'age':22}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':77}]},
          {'year':2007, 'id': '034', 'area': 'TGH', 'visitor': [{'id':'AA1', 'age':34}, {'id':'AA2', 'age':10},{'id':'AA3', 'age':51}]},
          {'year':2009, 'id': '077', 'area': 'GFD', 'visitor': [{'id':'AA1', 'age':34}, {'id':'AA2', 'age':10},{'id':'AA3', 'age':12}]},
          {'year':2007, 'id': '025', 'area': 'GFD', 'visitor': [{'id':'AA1', 'age':34}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':10}]},
      ]
      
      df = (spark
          .createDataFrame(raw)
          .select('year', 'id', 'area', 'visitor')
      )
      
      df.printSchema()
      # root
      #  |-- year: long (nullable = true)
      #  |-- id: string (nullable = true)
      #  |-- area: string (nullable = true)
      #  |-- visitor: array (nullable = true)
      #  |    |-- element: map (containsNull = true)
      #  |    |    |-- key: string
      #  |    |    |-- value: string (valueContainsNull = true)
      
      agg = df.groupBy("area").agg(F.collect_list("visitor").alias("visitor_flatten"))
      
      agg.show(10, False)
      # +----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      # |area|visitor_flatten                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
      # +----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      # |GFD |[[{age -> 20, id -> AA1}, {age -> 24, id -> AA2}, {age -> 4, id -> AA3}], [{age -> 14, id -> AA1}, {age -> 24, id -> AA2}, {age -> 55, id -> AA3}], [{age -> 15, id -> AA1}, {age -> 35, id -> AA2}, {age -> 58, id -> AA3}], [{age -> 16, id -> AA1}, {age -> 24, id -> AA2}, {age -> 23, id -> AA3}], [{age -> 25, id -> AA1}, {age -> 17, id -> AA2}, {age -> 52, id -> AA3}], [{age -> 34, id -> AA1}, {age -> 10, id -> AA2}, {age -> 12, id -> AA3}], [{age -> 34, id -> AA1}, {age -> 24, id -> AA2}, {age -> 10, id -> AA3}]]|
      # |TGH |[[{age -> 20, id -> AA1}, {age -> 24, id -> AA2}, {age -> 5, id -> AA3}], [{age -> 16, id -> AA1}, {age -> 14, id -> AA2}, {age -> 60, id -> AA3}], [{age -> 17, id -> AA1}, {age -> 24, id -> AA2}, {age -> 12, id -> AA3}], [{age -> 18, id -> AA1}, {age -> 16, id -> AA2}, {age -> 23, id -> AA3}], [{age -> 25, id -> AA1}, {age -> 24, id -> AA2}, {age -> 43, id -> AA3}], [{age -> 22, id -> AA1}, {age -> 24, id -> AA2}, {age -> 77, id -> AA3}], [{age -> 34, id -> AA1}, {age -> 10, id -> AA2}, {age -> 51, id -> AA3}]]|
      # +----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      
      agg.printSchema()
      # root
      #  |-- area: string (nullable = true)
      #  |-- visitor_flatten: array (nullable = false)
      #  |    |-- element: array (containsNull = false)
      #  |    |    |-- element: map (containsNull = true)
      #  |    |    |    |-- key: string
      #  |    |    |    |-- value: string (valueContainsNull = true)
      
      agg.withColumn("test", F.explode("visitor_flatten")).select('test').show(10, False)
      # +------------------------------------------------------------------------+
      # |test                                                                    |
      # +------------------------------------------------------------------------+
      # |[{age -> 20, id -> AA1}, {age -> 24, id -> AA2}, {age -> 4, id -> AA3}] |
      # |[{age -> 14, id -> AA1}, {age -> 24, id -> AA2}, {age -> 55, id -> AA3}]|
      # |[{age -> 15, id -> AA1}, {age -> 35, id -> AA2}, {age -> 58, id -> AA3}]|
      # |[{age -> 16, id -> AA1}, {age -> 24, id -> AA2}, {age -> 23, id -> AA3}]|
      # |[{age -> 25, id -> AA1}, {age -> 17, id -> AA2}, {age -> 52, id -> AA3}]|
      # |[{age -> 34, id -> AA1}, {age -> 10, id -> AA2}, {age -> 12, id -> AA3}]|
      # |[{age -> 34, id -> AA1}, {age -> 24, id -> AA2}, {age -> 10, id -> AA3}]|
      # |[{age -> 20, id -> AA1}, {age -> 24, id -> AA2}, {age -> 5, id -> AA3}] |
      # |[{age -> 16, id -> AA1}, {age -> 14, id -> AA2}, {age -> 60, id -> AA3}]|
      # |[{age -> 17, id -> AA1}, {age -> 24, id -> AA2}, {age -> 12, id -> AA3}]|
      # +------------------------------------------------------------------------+
      # only showing top 10 rows
      
      agg.withColumn("test", F.explode("visitor_flatten")).select('test').printSchema()
      # root
      #  |-- test: array (nullable = false)
      #  |    |-- element: map (containsNull = true)
      #  |    |    |-- key: string
      #  |    |    |-- value: string (valueContainsNull = true)
      

