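【Posted】: 2018-05-10 06:20:57
【Question】:
I am new to Spark/Scala. I am trying to read some data from a Hive table into a Spark DataFrame and then add a column based on some conditions. Here is my code: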
val DF = hiveContext.sql("select * from (select * from test_table where partition_date='2017-11-22') a JOIN (select max(id) as bid from test_table where partition_date='2017-11-22' group by at_id) b ON a.id=b.bid")
def dateDiff(partition_date: org.apache.spark.sql.Column, item_due_date: org.apache.spark.sql.Column): Long ={
ChronoUnit.DAYS.between(LocalDate.parse(partition_date.toString()), LocalDate.parse(item_due_date.toString))
}
val finalDF = DF.withColumn("status",
when(col("past_due").equalTo(1) && !(col("item_due_date").equalTo(null) || col("item_due_date").equalTo("NULL") || col("item_due_date").equalTo("null")) && (dateDiff(col("partition_date"),col("item_due_date")) < 0) && !(col("item_decision").equalTo(null) || col("item_decision").equalTo("NULL") || col("item_decision").equalTo("null")), "approved")
.when(col("past_due").equalTo(1) && !(col("item_due_date").equalTo(null) || col("item_due_date").equalTo("NULL") || col("item_due_date").equalTo("null")) && (dateDiff(col("partition_date"),col("item_due_date")) < 0) && (col("item_decision").equalTo(null) || col("item_decision").equalTo("NULL") || col("item_decision").equalTo("null")), "pending")
.when(col("past_due").equalTo(1) && !(col("item_due_date").equalTo(null) || col("item_due_date").equalTo("NULL") || col("item_due_date").equalTo("null")) && (dateDiff(col("partition_date"),col("item_due_date")) >= 0), "expired")
.otherwise("null"))
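dateDiff is a function that computes the difference between partition_date and item_due_date, which are columns in DF. I am trying to add a new column to DF using when and otherwise, with dateDiff supplying the difference between the dates.
Now, when I run the above code, I get the following error: org.threeten.bp.format.DateTimeParseException: Text 'partition_date' could not be parsed at index 0
I believe the value of the partition_date column is not being converted to a string that can be parsed as a date. Is that what is happening? If so, how do I convert the column value to a string?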
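The error text itself hints at the cause: the parser received the literal text 'partition_date', that is, the column's name. dateDiff runs once on the driver while the expression is being built, and calling toString on a Column describes the column rather than returning the values in its rows. The sketch below reproduces the failure in plain Scala. It assumes java.time in place of the org.threeten.bp backport used in the question, and the object and member names are made up for illustration.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit
import scala.util.Try

object ColumnNameParseDemo {
  // Stand-in for col("partition_date").toString: the column's name,
  // not any value stored in its rows (assumption described above).
  val columnAsString: String = "partition_date"

  // Parsing the name fails, in the same way as the error reported in the question.
  val parsedName: Try[LocalDate] = Try(LocalDate.parse(columnAsString))

  // Parsing real row values works; the result is end minus start, in days.
  def daysBetween(start: String, end: String): Long =
    ChronoUnit.DAYS.between(LocalDate.parse(start.trim), LocalDate.parse(end.trim))
}
```

So the column value does not need to be converted to a string. The date arithmetic has to run per row, either through Spark's built-in column functions or inside a registered UDF.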
Below is the schema of the columns I am using from DF:
|-- item_due_date: string (nullable = true)
|-- past_due: integer (nullable = true)
|-- item_decision: string (nullable = true)
|-- partition_date: string (nullable = true)
A sample of the data in the columns I am using from DF:
+--------+-------------+-------------+--------------+
|past_due|item_due_date|item_decision|partition_date|
+--------+-------------+-------------+--------------+
|       1|   0001-01-14|         null|    2017-11-22|
|       1|   0001-01-14|     Mitigate|    2017-11-22|
|       1|   0001-01-14|     Mitigate|    2017-11-22|
|       1|   0001-01-14|     Mitigate|    2017-11-22|
|       0|   2018-03-18|         null|    2017-11-22|
|       1|   2016-11-30|         null|    2017-11-22|
+--------+-------------+-------------+--------------+
I also tried using a custom UDF:
def status(past_due: Int, item_decision: String, maxPartitionDate: String, item_due_date: String): String = {
if (past_due == 1 && item_due_date != "NULL") {
if (ChronoUnit.DAYS.between(LocalDate.parse(maxPartitionDate.trim), LocalDate.parse(item_due_date.trim)) < 0) {
if (item_decision != "NULL") "pending"
else "approved"
} else "expired"
} else "NULL"
}
val statusUDF = sqlContext.udf.register("statusUDF", status _)
val DF2 = DF.withColumn("status", statusUDF(DF("past_due"),DF("item_decision"),DF("partition_date"),DF("item_due_date")))
DF2.show()
It throws the following error at the DF2.show() statement every time:
Container exited with a non-zero exit code 50
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1644)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1603)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1592)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1844)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505)
at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1374)
at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:319)
at driver$.main(driver.scala:109)
at driver.main(driver.scala)
Any help would be greatly appreciated. Thanks!
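The stack trace shows only the driver side, so the real cause of the UDF failure is not visible here. One possibility, offered as a guess, is null handling. Comparing a value with the string "NULL" does not detect a real null, and calling .trim on a null string throws a NullPointerException. A null-tolerant version of the status logic might look like the sketch below. It assumes java.time rather than org.threeten.bp, follows the branch order of the when/otherwise version (a decision that is present gives "approved", the opposite of the UDF in the question), and uses hypothetical names such as StatusLogic and present.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit
import scala.util.Try

object StatusLogic {
  // A real null and the literal text "NULL"/"null" both count as missing.
  private def present(s: String): Option[String] =
    Option(s).map(_.trim).filter(v => v != "NULL" && v != "null")

  // Unparseable dates are treated as missing instead of throwing.
  private def parseDate(s: String): Option[LocalDate] =
    present(s).flatMap(v => Try(LocalDate.parse(v)).toOption)

  // pastDue is boxed (java.lang.Integer) so that a null cell can be represented.
  def status(pastDue: java.lang.Integer, itemDecision: String,
             partitionDate: String, itemDueDate: String): String = {
    // Days from partition_date to item_due_date, if both dates are usable.
    val days: Option[Long] = for {
      start <- parseDate(partitionDate)
      end   <- parseDate(itemDueDate)
    } yield ChronoUnit.DAYS.between(start, end)

    (Option(pastDue).map(_.intValue), days) match {
      case (Some(1), Some(d)) if d >= 0 => "expired"
      case (Some(1), Some(_)) =>
        if (present(itemDecision).isDefined) "approved" else "pending"
      case _ => "null"
    }
  }
}
```

A function of this shape could presumably be registered the same way as in the question, with sqlContext.udf.register("statusUDF", StatusLogic.status _). Checking the executor logs would still be the way to confirm what actually failed.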
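【Comments】:
-
Your function operates on Columns. If a built-in function covers what you need, you can use the Spark functions. Otherwise, if you want to work with primitive data types, you have to use a udf function.
-
I tried using a UDF instead of when and otherwise, but ran into an error when showing/saving the DataFrame, so I switched to this approach. Is there a way to fix the error in this approach?
-
You should update your question with a sample DataFrame and its schema. That will help you get an answer quickly.
-
@Hemanth The function you wrote is not a udf. You should read carefully how to write a proper udf.
-
I know it is not a UDF. Getting the date difference is a plain Scala function. @cue Any ideas on how to make it work?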
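As a rough illustration of the first comment's suggestion, the built-in datediff function from org.apache.spark.sql.functions can replace the hand-written dateDiff, so the whole condition stays a Column expression that Spark evaluates per row. The sketch below is untested against the asker's data. It assumes the date strings are in yyyy-MM-dd form so that Spark can cast them to dates, it uses isNotNull instead of equalTo(null) (comparing with null through equalTo never yields true), and the names StatusColumn, present, and withStatus are hypothetical.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, datediff, when}

object StatusColumn {
  // True when the cell is neither a real null nor the text "NULL"/"null".
  private def present(c: Column): Column =
    c.isNotNull && !c.isin("NULL", "null")

  def withStatus(df: DataFrame): DataFrame = {
    // datediff(end, start) is end minus start in days, computed per row.
    val daysToDue = datediff(col("item_due_date"), col("partition_date"))
    val pastDueWithDate = col("past_due") === 1 && present(col("item_due_date"))

    df.withColumn("status",
      when(pastDueWithDate && daysToDue < 0 && present(col("item_decision")), "approved")
        .when(pastDueWithDate && daysToDue < 0 && !present(col("item_decision")), "pending")
        .when(pastDueWithDate && daysToDue >= 0, "expired")
        .otherwise("null")) // same string fallback as in the question
  }
}
```

With the DataFrame from the question this would be called as StatusColumn.withStatus(DF).show().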
Tags: scala apache-spark dataframe apache-spark-sql user-defined-functions