【Question title】: Partitioning by column in Apache Spark to S3
【Posted】: 2018-03-22 17:49:21
【Question】:

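We have a use case where we want to read JSON files from S3. Then, based on the value of a particular JSON node, we want to group the data and write it back to S3.

I am able to read the data, but I cannot find a good example of how to partition the data by a JSON key and then upload it to S3. Can anyone provide an example, or point me to a tutorial that would help with this use case?

After creating the DataFrame, this is the schema of my data: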

root
 |-- customer: struct (nullable = true)
 |    |-- customerId: string (nullable = true)
 |-- experiment: string (nullable = true)
 |-- expiryTime: long (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- programId: string (nullable = true)
 |-- score: double (nullable = true)
 |-- startTime: long (nullable = true)
 |-- targetSets: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- featured: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- data: struct (nullable = true)
 |    |    |    |    |    |-- asinId: string (nullable = true)
 |    |    |    |    |-- pk: string (nullable = true)
 |    |    |    |    |-- type: string (nullable = true)
 |    |    |-- reason: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- recommended: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)

I want to partition the data by a random hash of the customerId column. But when I do this:

df.write.partitionBy("customerId").save("s3/bucket/location/to/save");

it fails with:

org.apache.spark.sql.AnalysisException: Partition column customerId not found in schema StructType(StructField(customer,StructType(StructField(customerId,StringType,true)),true), StructField(experiment,StringType,true), StructField(expiryTime,LongType,true), StructField(partitionKey,StringType,true), StructField(programId,StringType,true), StructField(score,DoubleType,true), StructField(startTime,LongType,true), StructField(targetSets,ArrayType(StructType(StructField(featured,ArrayType(StructType(StructField(data,StructType(StructField(asinId,StringType,true)),true), StructField(pk,StringType,true), StructField(type,StringType,true)),true),true), StructField(reason,ArrayType(StringType,true),true), StructField(recommended,ArrayType(StringType,true),true)),true),true));

Please let me know how I can access the customerId column.

【Question discussion】:

    Tags: scala hadoop apache-spark amazon-s3 mapreduce


    【Solution 1】:

    Let's take a sample dataset, sample.json:

    {"CUST_ID":"115734","CITY":"San Jose","STATE":"CA","ZIP":"95106"}
    {"CUST_ID":"115728","CITY":"Allentown","STATE":"PA","ZIP":"18101"}
    {"CUST_ID":"115730","CITY":"Allentown","STATE":"PA","ZIP":"18101"}
    {"CUST_ID":"114728","CITY":"San Mateo","STATE":"CA","ZIP":"94401"}
    {"CUST_ID":"114726","CITY":"Somerset","STATE":"NJ","ZIP":"8873"}
    

    Now let's work through it with Spark. First, read the JSON file into a DataFrame:

    val jsonDf = spark.read
      .format("json")
      .load("path/of/sample.json")
    
    jsonDf.show()
    
    +---------+-------+-----+-----+
    |     CITY|CUST_ID|STATE|  ZIP|
    +---------+-------+-----+-----+
    | San Jose| 115734|   CA|95106|
    |Allentown| 115728|   PA|18101|
    |Allentown| 115730|   PA|18101|
    |San Mateo| 114728|   CA|94401|
    | Somerset| 114726|   NJ| 8873|
    +---------+-------+-----+-----+
    

    Then partition the dataset by the "ZIP" column and write it to S3:

    jsonDf.write
      .partitionBy("ZIP")
      .save("s3/bucket/location/to/save")
      // one-liner authentication to S3 (credentials embedded in the URI)
      //.save(s"s3n://$accessKey:$secretKey@$bucketName/location/to/save")
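
    To make the effect of partitionBy concrete, here is a minimal local sketch. It assumes a local SparkSession and a temporary directory instead of S3, and it writes Parquet explicitly; the application name, the temp-directory prefix, and the variable names are illustrative only. Each distinct ZIP value should end up in its own ZIP=<value> subdirectory, and a filter on ZIP when reading back only needs to touch the matching subdirectory.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: local mode, for illustration only (in spark-shell `spark` already exists).
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("partition-by-sketch")
  .getOrCreate()
import spark.implicits._

// Same rows as sample.json above, built in memory.
val sample = Seq(
  ("115734", "San Jose", "CA", "95106"),
  ("115728", "Allentown", "PA", "18101"),
  ("115730", "Allentown", "PA", "18101"),
  ("114728", "San Mateo", "CA", "94401"),
  ("114726", "Somerset", "NJ", "8873")
).toDF("CUST_ID", "CITY", "STATE", "ZIP")

// Write to a fresh local directory (stand-in for the S3 location).
val out = Files.createTempDirectory("zip-partitioned").resolve("out").toString
sample.write.partitionBy("ZIP").parquet(out)

// One subdirectory per distinct ZIP value, named ZIP=<value>.
val partitionDirs = new File(out).list().filter(_.startsWith("ZIP=")).sorted
partitionDirs.foreach(println)

// Reading back: ZIP is recovered from the directory names. With Spark's default
// settings its type is inferred from those names, so it may come back as an integer.
val allentown = spark.read.parquet(out).where(col("ZIP") === 18101)
allentown.show()
```

    The ZIP column itself is not stored inside the data files; it is encoded in the directory names and reattached when the data is read back.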
    

    Note: for this code to succeed, the S3 access key and secret key must be configured correctly. See this answer for Spark/Hadoop integration with S3.
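
    One possible way to supply the credentials, sketched here under assumptions rather than taken from the linked answer: set them on the Hadoop configuration instead of embedding them in the URI. This assumes the s3a connector (the hadoop-aws module) is on the classpath and that the keys are available in the standard AWS environment variables; the bucket name in the final comment is a placeholder.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: local mode, for illustration only (in spark-shell `spark` already exists).
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("s3a-credentials-sketch")
  .getOrCreate()

// Assumption: credentials come from the environment; empty string if unset.
val accessKey = sys.env.getOrElse("AWS_ACCESS_KEY_ID", "")
val secretKey = sys.env.getOrElse("AWS_SECRET_ACCESS_KEY", "")

// Property names used by the Hadoop s3a connector.
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", accessKey)
hadoopConf.set("fs.s3a.secret.key", secretKey)

// With the connector available, the write would then target an s3a:// URI, e.g.
// jsonDf.write.partitionBy("ZIP").save("s3a://my-bucket/location/to/save")  // placeholder bucket
```

    Keeping the keys out of the path avoids leaking them into logs and error messages that print the URI.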

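    Edit: solution for "Partition column customerId not found in schema" (per the comments)

    customerId lives inside the customer struct, and partitionBy only accepts top-level columns. Extract customerId into a top-level column first, then partition by it.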

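    import spark.implicits._  // provides the $"..." column syntax (already in scope in spark-shell)

    // Promote the nested field to a top-level column, then partition by it
    df.withColumn("customerId", $"customer.customerId")
      .drop("customer")
      .write.partitionBy("customerId")
      .save("s3/bucket/location/to/save")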
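
    The question asks for partitioning by a hash of customerId rather than by the raw value. Partitioning by the raw id creates one directory per customer, which can become a very large number of small directories. A possible variation, not part of the original answer, is to derive a bucket column from a hash of the id and partition by that. The sketch below assumes 16 buckets and uses made-up customer ids; numBuckets and customerBucket are illustrative names. Note that Spark's built-in hash function is deterministic, not random, so the same id always maps to the same bucket.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, hash, lit, pmod}

// Assumption: local mode, for illustration only (in spark-shell `spark` already exists).
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("hash-bucket-sketch")
  .getOrCreate()
import spark.implicits._

// Assumption: 16 buckets; choose a value that suits the data volume.
val numBuckets = 16

// Made-up ids standing in for the values of customer.customerId.
val customers = Seq("c-001", "c-002", "c-003", "c-004").toDF("customerId")

// hash() can return negative numbers, so pmod keeps the bucket in [0, numBuckets).
val bucketed = customers.withColumn(
  "customerBucket",
  pmod(hash(col("customerId")), lit(numBuckets))
)
bucketed.show()

// Applied to the DataFrame from the question, this would look roughly like:
// df.withColumn("customerBucket", pmod(hash(col("customer.customerId")), lit(numBuckets)))
//   .write.partitionBy("customerBucket")
//   .save("s3/bucket/location/to/save")
```

    Because the bucket is a separate column, the customer struct can stay in the data unchanged.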
    

    【Discussion】:
