【问题标题】:how to have a nested structure with reduceByKey (pyspark)?如何使用reduceByKey(pyspark)拥有嵌套结构?
【发布时间】:2020-03-14 16:10:11
【问题描述】:

我正在使用 spark (pyspark) 处理我想根据 3 个值进行分区并写回 S3 的数据集。数据集如下所示 -

customerId、productId、createDate

我想按 customerId 然后 productId 然后 createDate 对这些数据进行分区。因此,当我将此分区数据写入 s3 时,它应该具有以下结构 -

customerId=1
  productId='A1'
    createDate=2019-10
    createDate=2019-11
    createDate=2019-12
  productId='A2'
    createDate=2019-10
    createDate=2019-11
    createDate=2019-12

下面是我用来创建分区的代码。

rdd = sc.textFile("data.json")  #sc is spark context
r1.map(lambda r: (r["customerId"], r["productId"],r["createDate"])).distinct().map(lambda r: (r[0], ([r[1]],[r[2]]))).reduceByKey(lambda a, b: (a[0] + b[0],a[1] + b[1])).collect()

[('1', ([A1,A2], ['2019-12', '2019-11', '2019-10', '2019-12', '2019-11', '2019-10']))]

这段代码确实给了我一个平面结构,而不是我提到的嵌套结构。是否有可能改变我描述的方式。任何指针都非常受欢迎。

【问题讨论】:

    标签: python pyspark rdd reduce


    【解决方案1】:

    首先将您的 JSON 文件读取到数据框。

    import json
    a=[json.dumps("/data.json")]
    jsonRDD = sc.parallelize(a)
    df = spark.read.json(jsonRDD)
    

    然后使用groupbycollectlist 获得所需的格式。

    import pyspark.sql.functions as func
    df.groupby('customerId','productId').agg(func.collectList('createDate')).collect()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-12-27
      • 2017-10-07
      • 2014-01-31
      • 1970-01-01
      • 1970-01-01
      • 2023-03-19
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多