【Posted】:2018-03-22 17:49:21
【Question】:
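We have a use case where we want to read files containing JSON from S3. Then, based on the value of a particular JSON node, we want to group the data and write it back to S3.
I can read the data, but I haven't found a good example of how to partition the data by a JSON key and then upload it to S3. Can anyone provide an example, or point me to a tutorial that would help with this use case?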
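A minimal sketch of the read, partition, and write round trip in Scala, assuming Spark 2.x with S3 access already configured (for example `hadoop-aws` for `s3a://` paths, or EMR's `s3://`). The bucket paths and the object name are hypothetical placeholders, and `programId` is used only because it is a top-level field in the schema shown below. `partitionBy` writes one `programId=<value>/` directory per distinct value, and the partition column is dropped from the data files themselves.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object PartitionJsonByKey {
  def main(args: Array[String]): Unit = {
    // Hypothetical input/output locations; pass real ones as arguments.
    val inputPath  = if (args.length > 0) args(0) else "s3a://my-bucket/input/"
    val outputPath = if (args.length > 1) args(1) else "s3a://my-bucket/output/"

    val spark = SparkSession.builder().appName("PartitionJsonByKey").getOrCreate()

    // Spark infers the (possibly nested) schema from the JSON records.
    val df = spark.read.json(inputPath)

    // partitionBy only accepts top-level columns; programId is one of them.
    df.write
      .mode(SaveMode.Overwrite)
      .partitionBy("programId")
      .json(outputPath)

    spark.stop()
  }
}
```

By default `spark.read.json` expects one JSON object per line; if each file holds a single pretty-printed document, `spark.read.option("multiLine", true).json(...)` (Spark 2.2+) is the usual switch.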
After creating the DataFrame, this is the schema of my data:
root
|-- customer: struct (nullable = true)
| |-- customerId: string (nullable = true)
|-- experiment: string (nullable = true)
|-- expiryTime: long (nullable = true)
|-- partitionKey: string (nullable = true)
|-- programId: string (nullable = true)
|-- score: double (nullable = true)
|-- startTime: long (nullable = true)
|-- targetSets: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- featured: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- data: struct (nullable = true)
| | | | | |-- asinId: string (nullable = true)
| | | | |-- pk: string (nullable = true)
| | | | |-- type: string (nullable = true)
| | |-- reason: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- recommended: array (nullable = true)
| | | |-- element: string (containsNull = true)
I want to partition the data based on a random hash of the customerId column. But when I do this:
df.write.partitionBy("customerId").save("s3/bucket/location/to/save");
I get this error:
org.apache.spark.sql.AnalysisException: Partition column customerId not found in schema StructType(StructField(customer,StructType(StructField(customerId,StringType,true)),true), StructField(experiment,StringType,true), StructField(expiryTime,LongType,true), StructField(partitionKey,StringType,true), StructField(programId,StringType,true), StructField(score,DoubleType,true), StructField(startTime,LongType,true), StructField(targetSets,ArrayType(StructType(StructField(featured,ArrayType(StructType(StructField(data,StructType(StructField(asinId,StringType,true)),true), StructField(pk,StringType,true), StructField(type,StringType,true)),true),true), StructField(reason,ArrayType(StringType,true),true), StructField(recommended,ArrayType(StringType,true),true)),true),true));
Please let me know how I can access the customerId column.
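`DataFrameWriter.partitionBy` resolves names against top-level columns only, which is why `customerId`, nested inside the `customer` struct, is reported as not found. A minimal sketch of one common workaround, assuming Spark 2.x: copy the nested field into a top-level column with `withColumn` and the dot path `customer.customerId`, then partition on that. For the hash-based layout, the sketch also derives a bucket with `functions.hash` (a deterministic Murmur3 hash, so a given customer always lands in the same bucket) wrapped in `pmod` to keep it non-negative. The bucket count of 16, the column name `customerBucket`, the object name and the S3 paths are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, hash, lit, pmod}

object PartitionByNestedCustomerId {
  // Hypothetical number of hash buckets; tune it to the data volume.
  val NumBuckets = 16

  // Promotes customer.customerId to a top-level column so partitionBy can
  // see it, then derives a bucket in [0, NumBuckets) from its hash.
  def withCustomerColumns(df: DataFrame): DataFrame =
    df.withColumn("customerId", col("customer.customerId"))
      .withColumn("customerBucket", pmod(hash(col("customerId")), lit(NumBuckets)))

  def main(args: Array[String]): Unit = {
    // Hypothetical input/output locations; pass real ones as arguments.
    val inputPath  = if (args.length > 0) args(0) else "s3a://my-bucket/input/"
    val outputPath = if (args.length > 1) args(1) else "s3a://my-bucket/output/"

    val spark = SparkSession.builder().appName("PartitionByNestedCustomerId").getOrCreate()
    val df = withCustomerColumns(spark.read.json(inputPath))

    // Writes one customerBucket=<n>/ directory per bucket.
    df.write
      .mode(SaveMode.Overwrite)
      .partitionBy("customerBucket")
      .json(outputPath)

    spark.stop()
  }
}
```

Using `partitionBy("customerId")` instead gives one directory per customer, which can mean a very large number of small directories. Calling `df.repartition(col("customerBucket"))` before the write is a common way to keep the number of output files per bucket directory down.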
Tags: scala hadoop apache-spark amazon-s3 mapreduce