【问题标题】:java.lang.String is not a valid external type for schema of int error in creating spark dataframe(创建 Spark DataFrame 时报错:java.lang.String 不是 int 类型 schema 的有效外部类型)
【发布时间】:2020-07-12 08:04:48
【问题描述】:

我正在尝试用 Spark 创建一个数据框(DataFrame),并编写了如下代码。

首先,我导入如下

import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.spark.sql.Row
import scala.collection.mutable.ListBuffer
import org.apache.spark.util._
import org.apache.spark.sql.types.IntegerType

然后,我尝试为数据框制作 Row 和 Schema,如下所示。

// Sample rows: every field, including the third (salary) value, is written as a string literal.
val Employee = Seq(Row("Kim","Seoul","1000000"),Row("Lee","Busan","2000000"),Row("Park","Jeju","3000000"),Row("Jeong","Daejon","3400000"))

// Schema: Name and City are StringType, while Salary is declared as IntegerType.
// NOTE(review): the salary values in the rows above are strings ("1000000", ...), not Ints,
// so they do not match the IntegerType declared here.
val EmployeeSchema = List(StructField("Name", StringType, true), StructField("City", StringType, true), StructField("Salary", IntegerType, true))

// Build the DataFrame from an RDD of the rows plus the explicit schema.
val EmpDF = spark.createDataFrame(spark.sparkContext.parallelize(Employee),StructType(EmployeeSchema))

最后,我尝试用下面的语句查看数据框的内容

EmpDF.show

我得到了如下错误

    ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 3)
    java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: 
    java.lang.String is not a valid external type for schema of int
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, Name), StringType), true, false) AS Name#0
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, City), StringType), true, false) AS City#1
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, Salary), IntegerType) AS Salary#2
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
        at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
        at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_1$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:289)
        ... 25 more
20/07/12 16:32:51 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, Name), StringType), true, false) AS Name#0
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, City), StringType), true, false) AS City#1
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, Salary), IntegerType) AS Salary#2
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
        at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
        at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)  

我该如何解决这个问题?

我已经尝试过如下导入

    import org.apache.spark.serializer.KryoSerializer
    import org.apache.spark.serializer.Serializer

但现在它显示错误

ERROR Executor: Exception in task 2.0 in stage 5.0 (TID 13)

【问题讨论】:

    标签: scala apache-spark runtime-error


    【解决方案1】:

    错误 Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int 的原因是:定义的架构(schema)与实际数据的类型不匹配。实际数据 "Jeong","Daejon","3400000" 的类型是 (String, String, String),而您在架构中指定的类型是 (String, String, Integer)。

    使用整数类型(IntegerType)的更新代码:

    import org.apache.spark.sql.types._
    import org.apache.spark.storage.StorageLevel
    import scala.io.Source
    import scala.collection.mutable.HashMap
    import java.io.File
    import org.apache.spark.sql.Row
    import scala.collection.mutable.ListBuffer
    import org.apache.spark.util._
    import org.apache.spark.sql.types._
    
    // Sample rows: the salary field is an Int literal so it matches IntegerType in the schema.
    val Employee = Seq(
      Row("Kim", "Seoul", 1000000),
      Row("Lee", "Busan", 2000000),
      Row("Park", "Jeju", 3000000),
      Row("Jeong", "Daejon", 3400000)
    )

    // Explicit schema: two nullable string columns followed by a nullable integer column.
    val EmployeeSchema = List(
      StructField("Name", StringType, nullable = true),
      StructField("City", StringType, nullable = true),
      StructField("Salary", IntegerType, nullable = true)
    )

    // Distribute the rows as an RDD and pair them with the schema to build the DataFrame.
    val EmpDF = spark.createDataFrame(
      spark.sparkContext.parallelize(Employee),
      StructType(EmployeeSchema)
    )
    EmpDF.show()
    /*+-----+------+-------+
    | Name|  City| Salary|
    +-----+------+-------+
    |  Kim| Seoul|1000000|
    |  Lee| Busan|2000000|
    | Park|  Jeju|3000000|
    |Jeong|Daejon|3400000|
    +-----+------+-------+*/
    

    使用字符串类型(StringType)的更新代码:

    import org.apache.spark.sql.types._
    import org.apache.spark.storage.StorageLevel
    import scala.io.Source
    import scala.collection.mutable.HashMap
    import java.io.File
    import org.apache.spark.sql.Row
    import scala.collection.mutable.ListBuffer
    import org.apache.spark.util._
    import org.apache.spark.sql.types._
    
    // Sample rows: all three fields, including the salary, are string literals.
    val Employee = Seq(
      Row("Kim", "Seoul", "1000000"),
      Row("Lee", "Busan", "2000000"),
      Row("Park", "Jeju", "3000000"),
      Row("Jeong", "Daejon", "3400000")
    )

    // Explicit schema: every column is a nullable StringType, matching the row values above.
    val EmployeeSchema = List(
      StructField("Name", StringType, nullable = true),
      StructField("City", StringType, nullable = true),
      StructField("Salary", StringType, nullable = true)
    )

    // Distribute the rows as an RDD and pair them with the schema to build the DataFrame.
    val EmpDF = spark.createDataFrame(
      spark.sparkContext.parallelize(Employee),
      StructType(EmployeeSchema)
    )
    EmpDF.show()
    /*+-----+------+-------+
    | Name|  City| Salary|
    +-----+------+-------+
    |  Kim| Seoul|1000000|
    |  Lee| Busan|2000000|
    | Park|  Jeju|3000000|
    |Jeong|Daejon|3400000|
    +-----+------+-------+*/
    

    【讨论】:

    • 非常感谢!问题解决了!我想我会尝试更多地研究模式和字符串。谢谢)
    【解决方案2】:
    // SparkSession is not in scope by default, so import it explicitly.
    import org.apache.spark.sql.SparkSession

    // Local session using all available cores; getOrCreate reuses an already-active session if there is one.
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    // Brings the toDF conversion for local Seq-of-tuples into scope.
    import spark.implicits._

    // Tuples with Int salaries let Spark derive the column types itself, so no manual schema
    // (and no Row/StructType mismatch) is involved.
    Seq(("Kim","Seoul",1000000),("Lee","Busan",2000000),("Park","Jeju",3000000),("Jeong","Daejon",3400000))
            .toDF("name","city", "salary")
            .show()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-11-29
      • 2020-12-08
      • 1970-01-01
      • 2018-03-01
      • 1970-01-01
      • 2021-07-23
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多