【发布时间】:2019-01-07 15:20:08
【问题描述】:
我在 Windows 上运行以下代码,它会引发错误并且无法正常工作。 zk,kafka,elasticsearch 所有服务器都在运行。数据已经发布到kafka topic
object kses {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.
master("local")
.appName("sparkToES")
.config("es.nodes", "localhost")
.config("es.index.auto.create","true")
.getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("ERROR")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "logi1")
.option("startingOffsets", "earliest")
.load()
val data = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
val results = data
.map(_._2)
.flatMap(value => value.split("\\s+"))
.groupByKey(_.toLowerCase)
.count()
val query = results.writeStream
.format("org.elasticsearch.spark.sql")
.outputMode("append")
.option("es.nodes", "localhost")
.option("es.port", "9200")
.option("es.nodes.discovery", "true")
.option("es.http.timeout", "20s")
.option("es.http.retries", "0")
.option("es.resource","logi123")
.option("checkpointLocation", "~/checkpoint_es")
.start()
query.awaitTermination()
}
}
ERROR - Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;
When i change it to 'complete' mode then also code doesnot run.
zk,kafka,elasticsearch 所有服务器都在运行。
【问题讨论】:
标签: scala apache-spark elasticsearch apache-kafka