【问题标题】:spark sql udf cast return valuespark sql udf 强制转换返回值
【发布时间】:2018-06-11 14:33:05
【问题描述】:

我有一个像这样声明的变量:

val jobnameSeq = Seq( ("42409245", "name12"),("42461545", "name4"),("42409291", "name1"),("42413872", "name3"),("42417044", "name2"))

我希望能够创建一个可在 Spark SQL 中使用的函数,以在 sql 查询中将 42461545 替换为 name4
我试图声明这个函数:

val jobnameDF = jobnameSeq.toDF("jobid","jobname")
sqlContext.udf.register("getJobname", (id: String) => (
     jobnameDF.filter($"jobid" === id).select($"jobname")
    )
)

在sql中这样使用:

select getjobname(jobid), other, field from table  

但是jobnameDF.filter($"jobid" === id).select($"jobname") 返回一个 DF 而不是一个字符串,我不知道如何简单地将这个值转换为字符串,因为每次只有一个结果。

如果Seq 不是在这种情况下使用的对象,我愿意接受建议。

编辑:
尽管建议的答案有效,但我正是这样做的:

#Convert my seq to a hash map
val jobMap = jobnameSeq.toMap
#declare a sql function so I could use it in sparksql (I need to be accessible to people that don't know scala
sqlContext.udf.register("getJobname", (id: String) => (
    jobMap(id)
    )
)

【问题讨论】:

  • 我的目标只是有一个函数,它根据我不想替换任何值的 id 返回作业名。 @RameshMaharjan 我尝试在那里添加更多上下文。
  • @RameshMaharjan 你知道我对 Spark 框架没有全面的了解吗,我很高兴能学习一种更好/新的做事方式。
  • 我只是说你不需要 spark 来满足你的要求。你的数据大吗?并且您的要求表明在 hashmap 中执行此操作会更快更有效
  • 序列确实很小所以没有集成到hdfs。我需要多次替换值的表确实很大。
  • 然后用替换部分更新问题。你一定会得到答案

标签: scala apache-spark apache-spark-sql


【解决方案1】:

您可以通过多种方式做到这一点:

val jobnameSeq = Seq( ("42409245", "name12"),("42461545", "name4"),
                      ("42409291", "name1"),("42413872", "name3"),("42417044", "name2"))
val jobIdDF = Seq( "42409245",("42409291"),("42409231")).toDF("jobID")
jobIdDF.createOrReplaceTempView("JobView")

只需在作业名序列上使用普通 scala 的 toMap 函数。

sqlContext.udf.register("jobNamelookUp", (jobID: String) =>  
                                            jobnameSeq.toMap.getOrElse(jobID,"null"))

//或

如果输入是 RDD,则使用 spark 使用 collectAsMap

val jobnameMap = sc.parallelize(jobnameSeq).collectAsMap
sqlContext.udf.register("lookupJobName",(jobID:String) => 
                                            jobnameMap.getOrElse(jobID,"null"))

//或

如果此查找发生在集群上,那么您可以broadcast 它。

val jobnameMapBC = sc.broadcast(jobnameMap)
sqlContext.udf.register("lookupJobNameBC",(jobID:String) => 
                                                jobnameMapBC.value.getOrElse(jobID,"null")) 

spark.sql("select jobID,jobNamelookUp(jobID) as jobNameUsingMap,
                        lookupJobNameBC(jobID) as jobNameUsingBC,
                        lookupJobName(jobID) as jobNameUsingRDDMap 
         from JobView")
    .show()

+--------+---------------+--------------+------------------+
|   jobID|jobNameUsingMap|jobNameUsingBC|jobNameUsingRDDMap|
+--------+---------------+--------------+------------------+
|42409245|         name12|        name12|            name12|
|42409291|          name1|         name1|             name1|
|42409231|           null|          null|              null|
+--------+---------------+--------------+------------------+    

按照Raphael 的建议,使用broadcast-join

import org.apache.spark.sql.functions._
val jobnameSeqDF = jobnameSeq.toDF("jobID","name")
jobIdDF.join(broadcast(jobnameSeqDF), Seq("jobID"),"leftouter").show(false)

+--------+------+
|jobID   |name  |
+--------+------+
|42409245|name12|
|42409291|name1 |
|42409231|null  |
+--------+------+

【讨论】:

  • 另一种方法是将jobnameSeq 转换为Dataframe,然后简单地将其加入table(使用广播连接)
  • 添加了拉斐尔。谢谢
【解决方案2】:

据我所知,您应该从您的序列中创建一个Map 并直接获取jobId

val simpleMap = jobnameSeq.toMap

println(simpleMap("42461545"))

应该给你name4

现在如果你想用dataframe进行测试,你可以执行以下操作

val jobnameDF = jobnameSeq.toDF("jobid","jobname")

val jobName = jobnameDF.filter($"jobid" === "42461545").select("jobname").first().getAs[String]("jobname")

println(jobName)

应该打印name4

【讨论】:

  • 很高兴听到@Kiwy :)
猜你喜欢
  • 1970-01-01
  • 2017-11-01
  • 2017-09-10
  • 2021-07-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多