【发布时间】:2020-08-12 22:56:57
【问题描述】:
我的代码如下所示:
object DataTypeValidation extends Logging {
def main(args: Array[String]) {
val spark = SparkSession.builder()
.appName("SparkProjectforDataTypeValidation")
.master("local")
.getOrCreate();
spark.sparkContext.setLogLevel("ERROR")
try {
breakable {
val format = new SimpleDateFormat("d-M-y hh:mm:ss.SSSSS")
println("*********Data Type Validation Started*************** " + format.format(Calendar.getInstance().getTime()))
val data = Seq(Row(873131558, "ABC22"), Row(29000000, 99.00), Row(27000000, 2.34))
val schema = StructType(Array(
StructField("oldcl", IntegerType, nullable = true),
StructField("newcl", DoubleType, nullable = true))
)
val ONE = 1
var erroredRecordRow = new scala.collection.mutable.ListBuffer[Row]()
val newSchema = schema.fields.map({
case StructField(name, _: IntegerType, nullorNotnull, _) => StructField(name, StringType, nullorNotnull)
case StructField(name, _: DoubleType, nullorNotnull, _) => StructField(name, StringType, nullorNotnull)
case fields => fields
}).dropRight(ONE)
val newStructType = StructType { newSchema }
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
df.show()
print(df.schema)
}
} catch {
case exception: Exception =>
println("exception caught in Data Type Mismatch In Schema Validation: " + exception.toString())
exception.printStackTrace();
}
spark.stop()
}
}
exception caught in Data Type Mismatch In Schema Validation: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of double
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, oldcl), IntegerType) AS oldcl#0
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, newcl), DoubleType) AS newcl#1
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
【问题讨论】:
-
嗨,@smart_coder,感谢您的回复。但在这种情况下,我无法将 Double 类型更改为 String,这是由我们的客户使用适当的文档定义的。同样在模式中,它应该是双倍的。此外,上面的代码将这些列映射为 String 类型,以便任何可以传递的值都应该成功。
标签: java scala apache-spark hadoop