[Question title]: Unable to find encoder for type stored in a Dataset
[Posted]: 2017-11-16 14:07:18
[Question description]:

I want to get a Dataset with content like the following:

111,Array([123,1],[222,3])
222,Array([333,3],[444,3])

This is my code, for Spark 2.2.0 and Scala 2.11:

val spark = SparkSession
  .builder()
  .appName("SparkSessionZipsExample")
  .config("spark.sql.warehouse.dir", inputPath)
  .enableHiveSupport()
  .getOrCreate()

val df = spark.read.parquet(inputPath)
df.createOrReplaceTempView("sample_data")
val rows = spark.sql("SELECT * FROM sample_data")

val result = rows.map{ row: Row => {
     val pk = row.get(row.fieldIndex("pk")).toString.toLong

     val r = spark.sql("SELECT pk FROM sample_data WHERE pk != " + pk)
     val productList = r.rdd.map(r => r(0).toString.toLong).collect()

     (row.get(row.fieldIndex("pk")).toString.toLong, productList)
}}

But I get this error:

error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
[ERROR]       val result = rows.map{ row: Row => {

I tried importing sqlContext.implicits._, but it does not compile.

In Maven I have this dependency:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>

Update:

In the end I imported the implicits with import spark.implicits._, which compiles, but now I get this error at runtime:

   java.lang.NullPointerException
        at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:128)
        at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:126)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:623)
        at org.test.Compute$$anonfun$1.apply(ComputeNumSim.scala:68)
        at org.test.Compute$$anonfun$1.apply(ComputeNumSim.scala:61)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

[Comments]:

  • Have you tried import spark.implicits._?
  • @AlexNaspo: Yes, thank you. That works, but now I get a different error at runtime. Please see my update.

Tags: scala maven apache-spark


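[Solution 1]:

You cannot store an RDD in a Dataset, or any other distributed collection inside another distributed collection. You should not even access a DataFrame inside map: the closure runs on the executors, where the SparkSession is not usable, which is why spark.sql fails there with a NullPointerException. That said, there seems to be an interesting loophole for Datasets backed by a local collection.

In this case you should join the Datasets by key instead: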

rows.alias("rows").join(
  spark.table("sample_data").alias("sample"),
  $"rows.pk" =!= $"sample.pk"
)

or, more explicitly:

rows.alias("rows")
  .crossJoin(spark.table("sample_data").alias("sample")) 
  .where($"rows.pk" =!= $"sample.pk")
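
The join above yields one row per (pk, other pk) pair. To get back to the shape attempted in the question, one pk together with an array of all the other pks, the pairs can be grouped and aggregated. The following is only a minimal sketch under stated assumptions: the object name OtherKeysPerPk, the method othersPerPk, the local master, and the in-memory sample values are all hypothetical and stand in for the Parquet data, and pk is assumed to be a Long column.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.collect_list

// Hypothetical helper object; the names are illustrative, not from the original post.
object OtherKeysPerPk {

  // For every pk, collect all other pk values found in the same DataFrame.
  // Assumes `rows` has a Long column named "pk".
  def othersPerPk(rows: DataFrame): Dataset[(Long, Seq[Long])] = {
    val spark = rows.sparkSession
    import spark.implicits._ // brings in $"..." and the tuple encoder

    rows.alias("rows")
      .crossJoin(rows.alias("sample"))
      .where($"rows.pk" =!= $"sample.pk")
      .groupBy($"rows.pk")
      .agg(collect_list($"sample.pk").alias("others"))
      .as[(Long, Seq[Long])] // tuple columns are mapped by position
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("OtherKeysPerPk")
      .master("local[*]") // assumption: local run for demonstration only
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data standing in for spark.read.parquet(inputPath)
    val rows = Seq(111L, 222L, 333L).toDF("pk")

    othersPerPk(rows).show(false)
    spark.stop()
  }
}
```

Note that collect_list gives no ordering guarantee for the collected values, and that a cross join grows quadratically with the number of rows, so this is likely only practical for small tables.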
