【发布时间】:2017-12-27 00:33:22
【问题描述】:
我正在尝试 spark 网站上给出的 spark 结构化流的示例,但它抛出错误
1.找不到存储在数据集中的类型的编码器。通过导入 spark.implicits 支持原始类型(Int、String 等)和产品类型(案例类)。未来版本中将添加对序列化其他类型的支持。
2。方法的参数不足:(隐式证据$2:org.apache.spark.sql.Encoder[data])org.apache.spark.sql.Dataset[data]。 未指定值参数evidence$2。 val ds: 数据集[数据] = df.as[数据]
这是我的代码
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.Encoders
object final_stream {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("kafka-consumer")
.master("local[*]")
.getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("WARN")
case class data(name: String, id: String)
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "172.21.0.187:9093")
.option("subscribe", "test")
.load()
println(df.isStreaming)
val ds: Dataset[data] = df.as[data]
val value = ds.select("name").where("id > 10")
value.writeStream
.outputMode("append")
.format("console")
.start()
.awaitTermination()
}
}
任何关于如何使这项工作的帮助。? 我想要这样的最终输出 我想要这样的输出
+-----+--------+
| name| id
+-----+--------+
|Jacek| 1
+-----+--------+
【问题讨论】:
-
你能把case类移出final_stream对象并运行吗?
-
如果我将案例类移到对象之外,它会显示错误“线程“主”中的异常 org.apache.spark.sql.AnalysisException:无法解析'
name'给定的输入列:[主题, 时间戳, 值, 键, 偏移量, 时间戳类型, 分区];" -
您的数据包含 [主题、时间戳、值、键、偏移量、时间戳类型、分区],但您的案例类仅包含名称和 id
-
我应该如何解决这个错误?
-
我只想通过结构化流显示从kafka到spark控制台的json字符串
标签: scala apache-spark apache-kafka spark-structured-streaming