【Question title】: Spark Structured Streaming: console sink is not working as expected
【Published】: 2019-08-20 02:05:58
【Question description】:

I have the following code that uses Structured Streaming to read and process Kafka data:

import org.apache.spark.sql.{ForeachWriter, SparkSession}

object ETLTest {

  case class record(value: String, topic: String)

  def main(args: Array[String]): Unit = {
    run();
  }

  def run(): Unit = {

    val spark = SparkSession
      .builder
      .appName("Test JOB")
      .master("local[*]")
      .getOrCreate()

    val kafkaStreamingDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "...")
      .option("subscribe", "...")
      .option("failOnDataLoss", "false")
      .option("startingOffsets","earliest")
      .load()
      .selectExpr("CAST(value as STRING)", "CAST(timestamp as STRING)","CAST(topic as STRING)")

    val sdvWriter = new ForeachWriter[record] {
      def open(partitionId: Long, version: Long): Boolean = {
        true
      }
      def process(record: record) = {
        println("record:: " + record)
      }
      def close(errorOrNull: Throwable): Unit = {}
    }

    val sdvDF = kafkaStreamingDF
      .as[record]
      .filter($"value".isNotNull)

    // DOES NOT WORK
    /*val query = sdvDF
        .writeStream
        .format("console")
        .start()
        .awaitTermination()*/

    // WORKS
    /*val query = sdvDF
      .writeStream
      .foreach(sdvWriter)
      .start()
      .awaitTermination()
      */

  }

}

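I run this code from the IntelliJ IDEA IDE. When I use foreach(sdvWriter), I can see the records consumed from Kafka, but when I use .writeStream.format("console"), I don't see any records. My guess is that the console write stream maintains some kind of checkpoint and assumes it has already processed all the records. Is that the case? Am I missing something obvious here?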
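On the checkpoint suspicion: as far as I know, when no `checkpointLocation` option is given, the console sink runs against a temporary checkpoint directory that is not reused across runs, so with `startingOffsets` set to `earliest` each fresh run should start from the beginning of the topic. Rather than assume either way, the sketch below gives the console query an explicit, brand-new checkpoint directory and registers a `StreamingQueryListener` that prints how many input rows each micro-batch read. If the listener reports non-zero `numInputRows` while the console stays empty, that would point at where the output goes rather than at offsets. The names `ConsoleWithCheckpoint`, `startConsole` and `ProgressPrinter` are hypothetical.

```scala
import java.nio.file.Files

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryListener}
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

object ConsoleWithCheckpoint {

  // Hypothetical helper: prints per-batch row counts so "empty console" can be
  // told apart from "no batches ran at all"
  class ProgressPrinter extends StreamingQueryListener {
    override def onQueryStarted(event: QueryStartedEvent): Unit =
      println(s"query started: ${event.id}")

    override def onQueryProgress(event: QueryProgressEvent): Unit =
      println(s"batch ${event.progress.batchId}: ${event.progress.numInputRows} input rows")

    override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
      println(s"query terminated: ${event.id}, error: ${event.exception.getOrElse("none")}")
  }

  // Starts a console query with an explicit checkpoint directory that is new on every call,
  // so no offsets from an earlier run can be picked up
  def startConsole[T](spark: SparkSession, ds: Dataset[T]): StreamingQuery = {
    spark.streams.addListener(new ProgressPrinter)
    val checkpointDir = Files.createTempDirectory("console-checkpoint").toString
    ds.writeStream
      .format("console")
      .option("truncate", "false")
      .option("checkpointLocation", checkpointDir)
      .start()
  }
}
```

In the question's code this would replace the commented-out console query, e.g. `ConsoleWithCheckpoint.startConsole(spark, sdvDF).awaitTermination()`.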

【Question comments】:

    Tags: scala apache-spark spark-structured-streaming


    【Solution 1】:

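    I copied your code here and both options work. In fact, both options fail to compile without
    import spark.implicits._, so I'm not sure what you are missing. Perhaps some dependency is not configured correctly. Can you add your pom.xml?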
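The compile failure without `import spark.implicits._` comes from two implicits the question's code relies on: `.as[record]` needs an implicit `Encoder[record]`, and `$"value"` needs the string-interpolation conversion to `Column`. The sketch below (hypothetical object and method names, batch data instead of Kafka) performs the same filter with both supplied explicitly, which makes it clear what the import provides.

```scala
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.functions.col

object WithoutImplicits {

  case class record(value: String, topic: String)

  // Same filter as the question, with no `import spark.implicits._` in scope
  def nonNullValues(spark: SparkSession, rows: Seq[record]): Dataset[record] = {
    val df = spark.createDataFrame(rows)    // needs only a TypeTag, not an implicit Encoder
    df.as[record](Encoders.product[record]) // explicit encoder instead of the implicit one
      .filter(col("value").isNotNull)       // col("value") instead of $"value"
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("without-implicits").getOrCreate()
    nonNullValues(spark, Seq(record("a", "test"), record(null, "test"))).show(false)
    spark.stop()
  }
}
```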

    import org.apache.spark.sql.SparkSession

    object Check {

      case class record(value: String, topic: String)

      def main(args: Array[String]): Unit = {

        val spark = SparkSession
          .builder()
          .master("local[2]")
          .getOrCreate()

        // Required for .as[record] (implicit Encoder) and the $"value" column syntax
        import spark.implicits._

        val kafkaStreamingDF = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("subscribe", "test")
          .option("startingOffsets", "earliest")
          .option("failOnDataLoss", "false")
          .load()
          .selectExpr("CAST(value as STRING)", "CAST(timestamp as STRING)", "CAST(topic as STRING)")

        val sdvDF = kafkaStreamingDF
          .as[record]
          .filter($"value".isNotNull)

        // Keep the StreamingQuery handle; awaitTermination() itself returns Unit
        val query = sdvDF.writeStream
          .format("console")
          .option("truncate", "false")
          .start()

        query.awaitTermination()
      }
    }
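Since the answer could not reproduce the problem, one way to narrow it down is to take Kafka and its dependencies out of the picture and feed the same filter and console sink from an in-memory source. The sketch below uses `MemoryStream`, which is an internal, test-oriented Spark class (not a public, stable API), so treat it purely as a local diagnostic. If its rows show up in the IDE console, the console sink itself works and the issue is more likely on the Kafka side (connector dependency, offsets, broker connectivity). `ConsoleSinkCheck` and `runOnce` are hypothetical names.

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamingQuery

object ConsoleSinkCheck {

  case class record(value: String, topic: String)

  // Runs the question's filter + console sink on in-memory rows and returns the stopped query
  def runOnce(spark: SparkSession): StreamingQuery = {
    import spark.implicits._
    implicit val sqlCtx: SQLContext = spark.sqlContext // MemoryStream needs an implicit SQLContext

    val input = MemoryStream[record]
    input.addData(record("a", "test"), record(null, "test"), record("b", "test"))

    val query = input.toDS()
      .filter($"value".isNotNull)
      .writeStream
      .format("console")
      .option("truncate", "false")
      .start()

    query.processAllAvailable() // blocks until the rows added above have been processed
    query.stop()
    query
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("console-sink-check").getOrCreate()
    runOnce(spark)
    spark.stop()
  }
}
```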
    

    【Comments】:
