【发布时间】:2017-07-01 06:05:34
【问题描述】:
在尝试通过 spark-mongo 连接器将数据从 S3 移动到 Mongo 并使用 SparkSQL 进行转换时,我不得不将列从字符串转换为 UUID。该列在 S3 中存储为字符串,我正在寻找适当的转换函数来调用以将其存储为 UUID,同时保存到 Mongo。
尝试使用 udf 但无法从数据框中读取特定列并将字符串值转换为 uuid。关于如何编写 spark udf 的任何建议?
来自 S3 文件的示例输入: key1 字符串、key2 字符串、key2_type int
Mongo 的预期输出: key1 UUID,key2 string,key2_type int
目前我们使用 SparkSQL 转换从 S3 读取保存到 Mongo
sourceMap = sourceMap ++ jsonObjectPropertiesToMap(List("s3path", "fileformat", "awsaccesskeyid", "awssecretaccesskey"), source)
sparkSession.sparkContext.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive" , "true")
setAWSCredentials (sparkSession, sourceMap);
df = s3ToDataFrame(sourceMap("s3path"), sourceMap("fileformat"), sparkSession)
val dft = sparkSession.sql(mappingsToTransformedSQL(mappings))
destinationMap = destinationMap ++ jsonObjectPropertiesToMap(List("cluster", "database", "authenticationdatabase","collection", "login", "password"), destination)
dataFrameToMongodb(destinationMap("cluster"), destinationMap("database"), destinationMap("authenticationdatabase"),destinationMap("collection"),destinationMap("login"),destinationMap("password"), dft)
这是下面为 stringtoUUID 推荐的函数
def stringToUUID(uuid : String):String = {
java.util.UUID.fromString(
uuid
.replaceFirst(
"(\\p{XDigit}{8})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}+)", "$1-$2-$3-$4-$5"
)
).toString
}
val stringToUUIDUdf = udf((uuid: String) => stringToUUID(uuid))
dft.withColumn("key1", stringToUUIDUdf(df("key1")))
这是我们得到的错误
17/07/01 17:51:05 INFO SparkSqlParser: Parsing command: Select key1 AS key1,key1_type_id AS key1_type_id,key2 AS key2,key2_type_id AS key2_type_id,site AS site,updated AS updated FROM tmp
org.apache.spark.sql.AnalysisException: resolved attribute(s) key1#1 missing from key2#19,updated#22,site#21,key1#17,key1_type_id#18,key2_type_id#20 in operator !Project [UDF(key1#1) AS key1#30, key1_type_id#18, key2#19, key2_type_id#20, site#21, updated#22];;
!Project [UDF(key1#1) AS key1#30, key1_type_id#18, key2#19, key2_type_id#20, site#21, updated#22]
+- Project [key1#1 AS key1#17, key1_type_id#2 AS key1_type_id#18, key2#3 AS key2#19, key2_type_id#4 AS key2_type_id#20, site#5 AS site#21, updated#6 AS updated#22]
+- SubqueryAlias tmp, `tmp`
+- Relation[key1#1,key1_type_id#2,key2#3,key2_type_id#4,site#5,updated#6,pdateid#7] parquet
【问题讨论】:
-
你想如何将字符串转换为 UUID?你的意思是用破折号将字符串格式化为 UUID 格式吗?在这种情况下,请看这个:stackoverflow.com/questions/18986712/…
-
@PiotrKalański 我正在尝试在 Apache spark 框架中。我同意必须使用 UUID.fromString。但是如何针对 spark 数据框列呢?
-
key列似乎不可用。你能显示命令dft.show()的输出吗?可能您的函数mappingsToTransformedSQL正在生成错误的查询。 -
@PiotrKalański 没有添加 UDF 功能,一切都完美执行
标签: mongodb apache-spark apache-spark-sql