【Question title】: Spark to Elasticsearch program throws exception: Append output mode not supported
【Posted】: 2019-01-07 15:20:08
【Question】:

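I am running the following code on Windows; it throws an error and does not work. The ZooKeeper, Kafka, and Elasticsearch servers are all running, and data has already been published to the Kafka topic.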

  import org.apache.spark.sql.SparkSession

  object kses {
    def main(args: Array[String]): Unit = {

      val spark = SparkSession.builder
        .master("local")
        .appName("sparkToES")
        .config("es.nodes", "localhost")
        .config("es.index.auto.create", "true")
        .getOrCreate()

      import spark.implicits._

      spark.sparkContext.setLogLevel("ERROR")

      // Read the Kafka topic as a stream, starting from the earliest offset
      val df = spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "logi1")
        .option("startingOffsets", "earliest")
        .load()

      val data = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .as[(String, String)]

      // Word count over the message values (a streaming aggregation)
      val results = data
        .map(_._2)
        .flatMap(value => value.split("\\s+"))
        .groupByKey(_.toLowerCase)
        .count()

      // Write the counts to Elasticsearch; this is where the exception is raised
      val query = results.writeStream
        .format("org.elasticsearch.spark.sql")
        .outputMode("append")
        .option("es.nodes", "localhost")
        .option("es.port", "9200")
        .option("es.nodes.discovery", "true")
        .option("es.http.timeout", "20s")
        .option("es.http.retries", "0")
        .option("es.resource", "logi123")
        .option("checkpointLocation", "~/checkpoint_es")
        .start()

      query.awaitTermination()
    }
  }


  ERROR - Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;

When I change it to 'complete' mode, the code still does not run.
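
The exception is raised by Spark itself: `groupByKey(...).count()` is a streaming aggregation, and in append mode Spark can only emit an aggregate once it knows the group is final, which requires a watermark on an event-time column. As far as I can tell, the Elasticsearch streaming sink accepts only append mode, which would explain why switching to 'complete' does not help either. Below is a minimal sketch of one way around this, assuming the Kafka record `timestamp` column is acceptable as event time and that one-minute windows with a one-minute lateness threshold suit the data. The durations, the object name `KafkaToEsWithWatermark`, the helper `wordCounts`, and the checkpoint directory are illustrative assumptions, not part of the original program.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode, lower, split, window}

object KafkaToEsWithWatermark {

  // Expects columns `value` (string) and `timestamp` (timestamp).
  // Counts lower-cased words per one-minute event-time window.
  def wordCounts(lines: DataFrame): DataFrame =
    lines
      .select(
        explode(split(lower(col("value")), "\\s+")).as("word"),
        col("timestamp"))
      // Assumption: records arriving more than 1 minute late may be dropped
      .withWatermark("timestamp", "1 minute")
      .groupBy(window(col("timestamp"), "1 minute"), col("word"))
      .count()
      // Flatten the window struct into two plain timestamp fields
      .select(
        col("window.start").as("window_start"),
        col("window.end").as("window_end"),
        col("word"),
        col("count"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("sparkToES")
      .getOrCreate()

    // The Kafka source exposes a `timestamp` column alongside key and value
    val lines = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "logi1")
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(value AS STRING) AS value", "timestamp")

    val query = wordCounts(lines).writeStream
      .format("org.elasticsearch.spark.sql")
      .outputMode("append") // allowed now that the aggregation has a watermark
      .option("es.nodes", "localhost")
      .option("es.port", "9200")
      .option("es.resource", "logi123")
      // Hypothetical absolute path; "~" is not expanded by Spark
      .option("checkpointLocation", "C:/tmp/checkpoint_es")
      .start()

    query.awaitTermination()
  }
}
```

In append mode a window's counts are written only after the watermark has passed the end of that window, so output appears with a delay and only while new records keep arriving on the topic.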


【Discussion】:

    Tags: scala apache-spark elasticsearch apache-kafka


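    【Solution 1】:

    The Elasticsearch resource path passed to save() should be the path of the index resource on the ES node, not a path on your local machine.

    Check that the ES node is reachable, and pass a valid index path (for example "index/persons") to save().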

    df.write
      .format("org.elasticsearch.spark.sql")
      .option("es.nodes.wan.only", "true")
      .option("es.port", "9200")
      .option("es.net.ssl", "true")
      .option("es.nodes", "192.168.0.1")
      .option("es.nodes.client.only", "false")
      .mode("append")
      .save(<Index Resource PATH>)
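
    The batch writer above can also be used from a streaming query by handing each micro-batch to it through `foreachBatch`, which keeps the running word count without a watermark. The following is only a sketch under stated assumptions: Spark 2.4 or later (where `foreachBatch` was introduced), and `es.mapping.id` set to the word so that a re-emitted count replaces the earlier document for that word. The object name `KafkaToEsForeachBatch`, the helpers `wordCounts` and `writeToEs`, and the checkpoint directory are hypothetical; the topic `logi1` and index `logi123` are taken from the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode, lower, split}

object KafkaToEsForeachBatch {

  // Expects a string column `value`; returns running counts per lower-cased word
  def wordCounts(lines: DataFrame): DataFrame =
    lines
      .select(explode(split(lower(col("value")), "\\s+")).as("word"))
      .groupBy(col("word"))
      .count()

  // Batch write of one micro-batch. Declared as an explicit Function2 value so
  // the Scala overload of foreachBatch is selected unambiguously.
  val writeToEs: (DataFrame, Long) => Unit = (batch, _) =>
    batch.write
      .format("org.elasticsearch.spark.sql")
      .option("es.nodes", "localhost")
      .option("es.port", "9200")
      // Assumption: use the word as document id so newer counts overwrite older ones
      .option("es.mapping.id", "word")
      .mode("append")
      .save("logi123")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("sparkToES")
      .getOrCreate()

    val lines = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "logi1")
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(value AS STRING) AS value")

    val query = wordCounts(lines).writeStream
      .outputMode("update") // each micro-batch carries only the words whose count changed
      .foreachBatch(writeToEs)
      // Hypothetical absolute path
      .option("checkpointLocation", "C:/tmp/checkpoint_es_batch")
      .start()

    query.awaitTermination()
  }
}
```

    With this layout the output mode restriction of the streaming sink no longer applies, because each micro-batch is written as an ordinary batch DataFrame.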
    

    【Comments】:
