【Question title】: Spark DataFrame from String to UUID
【Posted】: 2017-07-01 06:05:34
【Question】:

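I am moving data from S3 to Mongo with the spark-mongo connector and using SparkSQL for the transformations. I need to convert one column from string to UUID. The column is stored as a string in S3, and I am looking for the right conversion function to call so that it is stored as a UUID when it is saved to Mongo.

I tried a UDF, but I could not get it to read the specific column from the DataFrame and convert its string values to UUIDs. Any suggestions on how to write this Spark UDF?

Sample input from the S3 file: key1 string, key2 string, key2_type int

Expected output in Mongo: key1 UUID, key2 string, key2_type int

Currently we use a SparkSQL transformation to read from S3 and save to Mongo: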

sourceMap = sourceMap ++ jsonObjectPropertiesToMap(List("s3path", "fileformat", "awsaccesskeyid", "awssecretaccesskey"), source)
sparkSession.sparkContext.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")
setAWSCredentials(sparkSession, sourceMap)
df = s3ToDataFrame(sourceMap("s3path"), sourceMap("fileformat"), sparkSession)

val dft = sparkSession.sql(mappingsToTransformedSQL(mappings))

destinationMap = destinationMap ++ jsonObjectPropertiesToMap(List("cluster", "database", "authenticationdatabase", "collection", "login", "password"), destination)
dataFrameToMongodb(destinationMap("cluster"), destinationMap("database"), destinationMap("authenticationdatabase"),
  destinationMap("collection"), destinationMap("login"), destinationMap("password"), dft)

This is the stringToUUID function recommended in the answer below, and how we call it:

import org.apache.spark.sql.functions.udf

def stringToUUID(uuid: String): String = {
  java.util.UUID.fromString(
    uuid
      .replaceFirst(
        "(\\p{XDigit}{8})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}+)", "$1-$2-$3-$4-$5"
      )
  ).toString
}

val stringToUUIDUdf = udf((uuid: String) => stringToUUID(uuid))

dft.withColumn("key1", stringToUUIDUdf(df("key1")))

This is the error we get:

17/07/01 17:51:05 INFO SparkSqlParser: Parsing command: Select key1 AS key1,key1_type_id AS key1_type_id,key2 AS key2,key2_type_id AS key2_type_id,site AS site,updated AS updated FROM tmp
org.apache.spark.sql.AnalysisException: resolved attribute(s) key1#1 missing from key2#19,updated#22,site#21,key1#17,key1_type_id#18,key2_type_id#20 in operator !Project [UDF(key1#1) AS key1#30, key1_type_id#18, key2#19, key2_type_id#20, site#21, updated#22];;
!Project [UDF(key1#1) AS key1#30, key1_type_id#18, key2#19, key2_type_id#20, site#21, updated#22]
+- Project [key1#1 AS key1#17, key1_type_id#2 AS key1_type_id#18, key2#3 AS key2#19, key2_type_id#4 AS key2_type_id#20, site#5 AS site#21, updated#6 AS updated#22]
   +- SubqueryAlias tmp, `tmp`
      +- Relation[key1#1,key1_type_id#2,key2#3,key2_type_id#4,site#5,updated#6,pdateid#7] parquet
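Reading the plan in the log, the UDF argument df("key1") is bound to key1#1, the attribute of the original parquet relation, whereas dft only exposes the re-aliased key1#17 produced by the SQL projection. Spark cannot find key1#1 among dft's attributes and reports it as missing. A likely fix is to resolve the column through dft itself, with dft("key1") or col("key1"). The sketch below illustrates this under stated assumptions: a local SparkSession, plus in-memory stand-ins for the S3 read and for mappingsToTransformedSQL (the object name KeyToUuid, the sample row, and the literal SQL are hypothetical).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

object KeyToUuid {
  // Same dash-insertion logic as the stringToUUID function in the question
  private val toUuid = udf { (s: String) =>
    java.util.UUID.fromString(
      s.replaceFirst(
        "(\\p{XDigit}{8})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}+)",
        "$1-$2-$3-$4-$5")
    ).toString
  }

  def run(spark: SparkSession): DataFrame = {
    import spark.implicits._
    // Hypothetical stand-in for the S3 data, registered as the "tmp" view the generated SQL reads
    val df = Seq(("7158e7a4c1284697bcab58dfb8c80e66", "k2")).toDF("key1", "key2")
    df.createOrReplaceTempView("tmp")

    // Hypothetical stand-in for mappingsToTransformedSQL(mappings): it re-aliases every column,
    // so dft exposes new attributes (key1#17 in the log) instead of df's key1#1
    val dft = spark.sql("SELECT key1 AS key1, key2 AS key2 FROM tmp")

    // Resolve key1 against dft's own output; col("key1") or dft("key1") both work here
    dft.withColumn("key1", toUuid(col("key1")))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("key-to-uuid").getOrCreate()
    try run(spark).show(false)
    finally spark.stop()
  }
}
```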

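【Discussion】:

  • How do you want to convert the string to a UUID? Do you mean formatting the string into UUID format with dashes? In that case, see: stackoverflow.com/questions/18986712/…
  • @PiotrKalański I am trying to do this in the Apache Spark framework. I agree that UUID.fromString has to be used, but how do I apply it to a Spark DataFrame column?
  • The key column does not seem to be available. Can you show the output of dft.show()? Your function mappingsToTransformedSQL may be generating a wrong query.
  • @PiotrKalański Without the UDF, everything runs fine.

Tags: mongodb apache-spark apache-spark-sql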


【Solution 1】:

Got it working with the logic below.

Dependency:

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>bson</artifactId>
    <version>3.4.2</version>
</dependency>

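Function:

import java.util.UUID
import org.bson.{BsonDocument, BsonDocumentWriter, UuidRepresentation}
import org.bson.codecs.{EncoderContext, UuidCodec}
import org.bson.types.Binary

// Encode a canonical UUID string as a BSON Binary using the standard representation (subtype 4)
def test(uuids: String): Binary = {
  val uuid = UUID.fromString(uuids)
  // Write the UUID into a temporary document so that UuidCodec produces the binary value
  val holder = new BsonDocument
  val writer = new BsonDocumentWriter(holder)
  writer.writeStartDocument()
  writer.writeName("uuid")
  new UuidCodec(UuidRepresentation.STANDARD).encode(writer, uuid, EncoderContext.builder().build())
  writer.writeEndDocument()
  // Copy the subtype and bytes into an org.bson.types.Binary for the Mongo driver
  val bsonBinary = holder.getBinary("uuid")
  new Binary(bsonBinary.getType, bsonBinary.getData)
}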
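For reference, UuidRepresentation.STANDARD writes the UUID as BSON binary subtype 4, whose 16 bytes are the most significant 64 bits followed by the least significant 64 bits, each big-endian. The plain-Scala sketch below (no BSON dependency; UuidBytes is a hypothetical helper, not part of any library) reproduces that byte layout, which can be handy for checking what the Binary returned by test contains. The raw bytes do not record the subtype themselves; the Binary wrapper is what carries it.

```scala
import java.nio.ByteBuffer
import java.util.UUID

object UuidBytes {
  // 16-byte big-endian layout (msb then lsb), the order used for BSON subtype 4
  def toStandardBytes(uuid: UUID): Array[Byte] = {
    val buf = ByteBuffer.allocate(16) // ByteBuffer is big-endian by default
    buf.putLong(uuid.getMostSignificantBits)
    buf.putLong(uuid.getLeastSignificantBits)
    buf.array()
  }

  // Inverse of toStandardBytes; arguments are evaluated left to right (msb first)
  def fromStandardBytes(bytes: Array[Byte]): UUID = {
    require(bytes.length == 16, s"expected 16 bytes, got ${bytes.length}")
    val buf = ByteBuffer.wrap(bytes)
    new UUID(buf.getLong, buf.getLong)
  }
}
```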

【Discussion】:

【Solution 2】:

Start by defining a Scala function:

def stringToUUID(uuid: String): String = {
  java.util.UUID.fromString(
    uuid
      .replaceFirst(
        "(\\p{XDigit}{8})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}+)", "$1-$2-$3-$4-$5"
      )
  ).toString
}
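The regex only fires on a run of at least 21 hex digits (8 + 4 + 4 + 4 + 1), so a 32-digit value without dashes gets them inserted, while a value that is already dashed passes through unchanged to UUID.fromString. One caveat when this function backs a UDF: Spark passes null for null cells, and calling replaceFirst on null throws a NullPointerException, while a malformed value makes UUID.fromString throw and fails the task. A hedged, null-tolerant variant (UuidFormat is a hypothetical name) returns an Option instead:

```scala
import java.util.UUID
import scala.util.Try

object UuidFormat {
  // Same pattern as stringToUUID: split a run of hex digits into 8-4-4-4-rest groups
  private val Undashed =
    "(\\p{XDigit}{8})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}+)"

  // None for null input or for anything UUID.fromString rejects, Some(canonical form) otherwise
  def toCanonical(raw: String): Option[String] =
    Option(raw).flatMap { s =>
      Try(UUID.fromString(s.replaceFirst(Undashed, "$1-$2-$3-$4-$5")).toString).toOption
    }
}
```

Wrapped as udf((s: String) => UuidFormat.toCanonical(s)), the Option result becomes a nullable string column, with None written as null.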
    

Create a UDF from the function above:

import org.apache.spark.sql.functions.udf

val stringToUUIDUdf = udf((uuid: String) => stringToUUID(uuid))
    

Use the withColumn transformation to add a new uuid column:

df.withColumn("uuid", stringToUUIDUdf(df("text")))

The select transformation works too:

df.select(stringToUUIDUdf(df("text")).alias("uuid"))
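Because the pipeline in the question is driven by SparkSQL, it may be simpler to skip the UDF and apply the same regex with Spark's built-in regexp_replace. This is a swapped-in technique rather than what this answer does, and it behaves differently: the pattern below is anchored to exactly 32 hex digits, non-matching values pass through unchanged instead of being rejected, nulls stay null, and lower() is added to mimic the lowercase output of UUID.toString. The object name UuidWithoutUdf is hypothetical.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lower, regexp_replace}

object UuidWithoutUdf {
  // Insert dashes into a 32-hex-digit string using only built-in functions (no validation)
  def dashed(c: Column): Column =
    lower(regexp_replace(c,
      "^(\\p{XDigit}{8})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}{12})$",
      "$1-$2-$3-$4-$5"))

  def run(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val df = Seq("7158e7a4c1284697bcab58dfb8c80e66", "cf251f4c667c46b3a9f67681f3be2338").toDF("text")
    df.withColumn("uuid", dashed(col("text")))
  }
}
```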
    

Example:

// session is an existing SparkSession; its implicits provide createDataset for a Seq[String]
import session.implicits._

val df = session.createDataset(Seq(
  "7158e7a4c1284697bcab58dfb8c80e66",
  "cf251f4c667c46b3a9f67681f3be2338",
  "42d3ee515d8c4268b47b579170c88e4c",
  "6b7e3222292d4dc5a8a369f7fede7dc4",
  "b371896d39d04fbb8a8646a176e60d17",
  "e2b57f1677154c5bbe181a575aba4684",
  "2a2e11c4cc604673bbd13b22f029dabb",
  "fcad3f649a114336a721fc3eaefd6ce1",
  "f3f6fcfd16394e1e9c98aae0bd062432",
  "8b0e1929e335489997bfca20bb021d62"
)).toDF("text")

df.withColumn("uuid", stringToUUIDUdf(df("text"))).show(false)
    

Result:

+--------------------------------+------------------------------------+
|text                            |uuid                                |
+--------------------------------+------------------------------------+
|7158e7a4c1284697bcab58dfb8c80e66|7158e7a4-c128-4697-bcab-58dfb8c80e66|
|cf251f4c667c46b3a9f67681f3be2338|cf251f4c-667c-46b3-a9f6-7681f3be2338|
|42d3ee515d8c4268b47b579170c88e4c|42d3ee51-5d8c-4268-b47b-579170c88e4c|
|6b7e3222292d4dc5a8a369f7fede7dc4|6b7e3222-292d-4dc5-a8a3-69f7fede7dc4|
|b371896d39d04fbb8a8646a176e60d17|b371896d-39d0-4fbb-8a86-46a176e60d17|
|e2b57f1677154c5bbe181a575aba4684|e2b57f16-7715-4c5b-be18-1a575aba4684|
|2a2e11c4cc604673bbd13b22f029dabb|2a2e11c4-cc60-4673-bbd1-3b22f029dabb|
|fcad3f649a114336a721fc3eaefd6ce1|fcad3f64-9a11-4336-a721-fc3eaefd6ce1|
|f3f6fcfd16394e1e9c98aae0bd062432|f3f6fcfd-1639-4e1e-9c98-aae0bd062432|
|8b0e1929e335489997bfca20bb021d62|8b0e1929-e335-4899-97bf-ca20bb021d62|
+--------------------------------+------------------------------------+

【Discussion】:

• The problem is that we read from S3, transform with SparkSQL, and write the result to Mongo. We get the data from S3 and pass it to val dft = sparkSession.sql(mappingsToTransformedSQL(mappings)). Calling dft.withColumn("key1", stringToUUIDUdf(df("key1"))) now fails with: Parsing command: Select key1 AS key1,key1_type_id AS key1_type_id,key2 AS key2,key2_type_id AS key2_type_id,site AS site,updated AS updated FROM tmp org.apache.spark.sql.AnalysisException: resolved attribute(s) key1#1 missing from key2#19,updated#22,site#21,key1#17,key1_type_id#18,key2_type_id#20 in operator
• Added the function and the error I get to the original post.