【问题标题】:Unable to find encoder for type stored in a Dataset. in spark structured streaming找不到存储在数据集中的类型的编码器。在火花结构化流中
【发布时间】:2017-12-27 00:33:22
【问题描述】:

我正在尝试 spark 网站上给出的 spark 结构化流的示例,但它抛出错误

1.找不到存储在数据集中的类型的编码器。通过导入 spark.implicits 支持原始类型(Int、String 等)和产品类型(案例类)。未来版本中将添加对序列化其他类型的支持。

2。方法的参数不足:(隐式证据$2:org.apache.spark.sql.Encoder[data])org.apache.spark.sql.Dataset[data]。 未指定值参数evidence$2。 val ds: 数据集[数据] = df.as[数据]

这是我的代码

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.Encoders
object final_stream {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
          .builder()
          .appName("kafka-consumer")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        spark.sparkContext.setLogLevel("WARN")

    case class data(name: String, id: String)

    val df = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "172.21.0.187:9093")
          .option("subscribe", "test")
          .load()
    println(df.isStreaming)

    val ds: Dataset[data] = df.as[data]
    val value = ds.select("name").where("id > 10")




    value.writeStream
          .outputMode("append")
          .format("console")
          .start()
          .awaitTermination()

  }
}

任何关于如何使这项工作的帮助。? 我想要这样的最终输出 我想要这样的输出

+-----+--------+
| name|    id
+-----+--------+
|Jacek|     1
+-----+--------+

【问题讨论】:

  • 你能把case类移出final_stream对象并运行吗?
  • 如果我将案例类移到对象之外,它会显示错误“线程“主”中的异常 org.apache.spark.sql.AnalysisException:无法解析'name'给定的输入列:[主题, 时间戳, 值, 键, 偏移量, 时间戳类型, 分区];"
  • 您的数据包含 [主题、时间戳、值、键、偏移量、时间戳类型、分区],但您的案例类仅包含名称和 id
  • 我应该如何解决这个错误?
  • 我只想通过结构化流显示从kafka到spark控制台的json字符串

标签: scala apache-spark apache-kafka spark-structured-streaming


【解决方案1】:

错误的原因是您正在处理来自 Kafka 的 Array[Byte] 并且没有匹配 data 案例类的字段。

scala> println(schema.treeString)
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

df.as[data] 行更改为以下内容:

df.
  select($"value" cast "string").
  map(value => ...parse the value to get name and id here...).
  as[data]

我强烈建议使用selectfunctions 对象来处理传入的数据。

【讨论】:

  • 能否举例说明如何解析名称和id
  • 当然,但这取决于您如何通过网络发送带有名称和 ID 的字符串。如何发布到 Kafka 主题?
【解决方案2】:

该错误是由于数据框中的列数与您的案例类不匹配造成的。

您在数据框中有 [topic, timestamp, value, key, offset, timestampType, partition]

而您的案例类只有两列

case class data(name: String, id: String)

你可以将dataframe的内容显示为

val display = df.writeStream.format("console").start()

睡几秒钟,然后

display.stop()

并且还使用option("startingOffsets", "earliest") 提到的here

然后根据您的数据创建一个案例类。

希望这会有所帮助!

【讨论】:

  • 线程“main”中的异常 org.apache.spark.sql.AnalysisException: 必须使用 writeStream.start() 执行带有流源的查询;;运行时 df.show()
  • 我想要这样的输出 +-----+--------+ |姓名| id +-----+--------+ |Jacek| 1 +-----+--------+
  • 希望这会有所帮助 stackoverflow.com/questions/45092445/… 抱歉,我们不能将 .show 用于流媒体
  • 上述解决方案也不起作用,有人可以帮助我提供完整的解决方案
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-05-31
  • 1970-01-01
  • 1970-01-01
  • 2018-08-13
  • 1970-01-01
  • 2019-11-07
  • 2015-07-06
相关资源
最近更新 更多