【发布时间】:2017-10-10 09:14:34
【问题描述】:
我需要创建自己的 UnaryTransformer 实例,该实例接受 Array[String] 类型的 Dataframe Column 并且还应该输出相同的类型。在尝试这样做时,我在 Spark 2.1.0 版上遇到了 ClassCastException。 我已经整理了一个示例测试来显示我的情况。
import org.apache.spark.SparkConf
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{ArrayType, DataType, StringType}
class MyTransformer(override val uid:String) extends UnaryTransformer[Array[String],Array[String],MyTransformer] {
override protected def createTransformFunc: (Array[String]) => Array[String] = {
param1 => {
param1.foreach(println(_))
param1
}
}
override protected def outputDataType: DataType = ArrayType(StringType)
override protected def validateInputType(inputType: DataType): Unit = {
require(inputType == ArrayType(StringType), s"Data type mismatch between Array[String] and provided type $inputType.")
}
def this() = this( Identifiable.randomUID("tester") )
}
object Tester {
def main(args: Array[String]): Unit = {
val config = new SparkConf().setAppName("Tester")
implicit val sparkSession = SparkSession.builder().config(config).getOrCreate()
import sparkSession.implicits._
val dataframe = Seq(Array("Firstly" , "F1"),Array("Driving" , "S1" ),Array("Ran" , "T3" ),Array("Fourth" ,"F4"), Array("Running" , "F5")
,Array("Gone" , "S6")).toDF("input")
val transformer = new MyTransformer().setInputCol("input").setOutputCol("output")
val transformed = transformer.transform(dataframe)
transformed.select("output").show()
println("Complete....")
sparkSession.close()
}
}
附加堆栈跟踪以供参考
线程“主”org.apache.spark.SparkException 中的异常:未能 执行用户定义函数($anonfun$createTransformFunc$1: (数组)=> 数组)在 org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1072) 在 org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:144) 在 org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:48) 在 org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:30) 在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 在 scala.collection.immutable.List.foreach(List.scala:392) 在 scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 在 scala.collection.immutable.List.map(List.scala:296) 在 org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$21.applyOrElse(Optimizer.scala:1078) 在 org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$21.applyOrElse(Optimizer.scala:1073) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288) 在 org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331) 在 org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331) 在 org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:277) 在 org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1073) 在 org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1072) 在 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85) 在 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82) 在 scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57) 在 scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66) 在 scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35) 在 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82) 在 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74) 在 scala.collection.immutable.List.foreach(List.scala:392) 在 org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74) 在 org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73) 在 org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73) 在 org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79) 在 org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75) 在 org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84) 在 org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84) 在 org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2791) 在 org.apache.spark.sql.Dataset.head(Dataset.scala:2112) 在 org.apache.spark.sql.Dataset.take(Dataset.scala:2327) 在 org.apache.spark.sql.Dataset.showString(Dataset.scala:248) 在 org.apache.spark.sql.Dataset.show(Dataset.scala:636) 在 org.apache.spark.sql.Dataset.show(Dataset.scala:595) 在 org.apache.spark.sql.Dataset.show(Dataset.scala:604) 在 Tester$.main(Tester.scala:45) 在 Tester.main(Tester.scala) 原因:java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef 无法转换为 [Ljava.lang.String;在 MyTransformer$$anonfun$createTransformFunc$1.apply(Tester.scala:9) 在 org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:89) 在 org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:88) 在 org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1069) ... 53 更多
【问题讨论】:
标签: apache-spark apache-spark-2.0