【问题标题】:group the data in pyspark对pyspark中的数据进行分组
【发布时间】:2021-08-17 13:37:49
【问题描述】:

我有如下数据框,

df.select(to_json(struct("items"))).show(1, False)

items
-----
[{ "id":"1","types":{ "0":"1", "price":{ "value":"1"}}},
{ "id":"1","types":{ "0":"2", "price":{ "value":"2"}}},
{ "id":"2","types":{ "0":"3", "price":{ "value":"1"}}}]

现在我想在pyspark中实现这样的数据框, 基本上,我想根据 id 对内容进行分组。

items
-----
[{ "id":"1","types": [ {"0":"1", "price":{ "value":"1"}}, {"0":"2", "price":{ "value":"2"}} ],
{ "id":"2","types": [ {"0":"3", "price":{ "value":"1"}} ]

复制它:

from pyspark.sql import Row

# Spark version: 2.4.4
df = spark.createDataFrame([
 Row(items=[Row(id='1',types=Row(o='1',price=Row(value="1"))),
            Row(id='1',types=Row(o='10',price=Row(value="1"))),
            Row(id='2',types=Row(o='13',price=Row(value="1")))]),
 Row(items=[Row(id='3',types=Row(o='1',price=Row(value="1"))),
            Row(id='4',types=Row(o='10',price=Row(value="1"))),
            Row(id='3',types=Row(o='13',price=Row(value="1")))])
], schema='items:array<struct<id:string,types:struct<`0`:string,price:struct<value:string>>>>')

【问题讨论】:

  • struct >>> @Kafels
  • @Kafels.. 更新了问题.. 感谢您抽出宝贵时间:)
  • 火花 2.4.4 python2. @Kafels
  • 我相信这会解决你的问题stackoverflow.com/questions/37440373/… 只需使用函数 json_extract 获取 id 和 types 然后聚合

标签: python dataframe apache-spark pyspark apache-spark-sql


【解决方案1】:

首先我使用高阶函数AGGREGATEitems 列修改为map 列类型,其中键是您的id,值是types 值的数组。

为了完成它,我应用了另一个表达式,将map 转换为struct 类型,按照您想要的键。

import pyspark.sql.functions as f

# It's necessary to run using Spark 3 or later
spark.conf.set('spark.sql.mapKeyDedupPolicy', 'LAST_WIN')


agg_df = df.withColumn('items', f.expr('AGGREGATE(items, CAST(MAP() AS MAP<STRING, ARRAY<STRUCT<`0`:STRING, `price`:STRUCT<`value`:STRING>>>>), (acc, item) -> ' \
                                       'IF(acc[item.id] IS NULL, ' \
                                       'MAP_CONCAT(acc, MAP(item.id, ARRAY(item.types))), ' \
                                       'MAP_CONCAT(acc, MAP(item.id, ARRAY_UNION(acc[item.id], ARRAY(item.types))))))'))

transform_df = agg_df.withColumn('items', f.expr('TRANSFORM(MAP_KEYS(items), key -> STRUCT(key AS id, items[key] AS types))'))

# transform_df.printSchema()
# root
#  |-- items: array (nullable = true)
#  |    |-- element: struct (containsNull = false)
#  |    |    |-- id: string (nullable = true)
#  |    |    |-- types: array (nullable = true)
#  |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |-- 0: string (nullable = true)
#  |    |    |    |    |-- price: struct (nullable = true)
#  |    |    |    |    |    |-- value: string (nullable = true)

# Databricks only
display(transform_df)

输出

【讨论】:

    猜你喜欢
    • 2021-01-04
    • 1970-01-01
    • 1970-01-01
    • 2023-03-17
    • 2021-12-22
    • 1970-01-01
    • 2018-08-03
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多