【问题标题】:Can't Read from and write to kafka topic using spark scala无法使用 spark scala 读取和写入 kafka 主题
【发布时间】:2021-12-21 02:44:42
【问题描述】:

我正在编写一个示例火花流程序来读取来自输入 kafka 主题的消息并将其写入控制台和另一个输出 kafka 主题。没有收到任何错误或异常,但我也没有在控制台和输出 kafka 主题中看到消息。谁能告诉我我在哪里/我错过了什么。

这是我的代码。


    object Test extends App {
      val logger: Logger = LoggerFactory.getLogger(Test.getClass)
      val kafkalogger = org.apache.log4j.Logger.getLogger("kafka")
      kafkalogger.info("Running Pipeline")
      kafkalogger.setLevel(Level.INFO);
      val spark = SparkSession.builder().getOrCreate()
      
      val dfStream = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "*.*.*.*:9092")
          .option("subscribe", "input-topic")
          .option("startingOffsets", "earliest")
          .load()
    
        dfStream.printSchema()
    
        val messageDF = dfStream.selectExpr("CAST(value AS STRING)")
        
        messageDF.printSchema()
        
        kafkalogger.info("Before writing messages to console")
        
        messageDF.writeStream.outputMode("append").format("console").start()
        
        kafkalogger.info("After writing to console")
         val writeToKafka = dfStream
          //.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
         .selectExpr("CAST(value AS STRING)")
          .writeStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "*.*.*.*:9092")
          .option("topic", "output-topic")
          .option("checkpointLocation", "/tmp/jsp_checkpointdir")
          .start()
        kafkalogger.info("After writing to topic")
        writeToKafka.awaitTermination()
    }

我可以看到数据框的架构,但看不到实际的消息。

21/11/08 07:02:00 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2b99c937{/static/sql,null,AVAILABLE,@Spark}
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

root
 |-- value: string (nullable = true)

21/11/08 07:02:03 INFO kafka: Before writing messages to console
21/11/08 07:02:03 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(memoryOverhead -> name: memoryOverhead, amount: 512, script: , vendor: , cores -> name: cores, amount: 2, script: , vendor: , memory -> name: memory, amount: 4096, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/11/08 07:02:03 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
21/11/08 07:02:03 INFO ServerInfo: Adding filter to /StreamingQuery: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
21/11/08 07:02:03 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@40fd1a78{/StreamingQuery,null,AVAILABLE,@Spark}
21/11/08 07:02:03 INFO ServerInfo: Adding filter to /StreamingQuery/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
21/11/08 07:02:03 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@32430075{/StreamingQuery/json,null,AVAILABLE,@Spark}
21/11/08 07:02:03 INFO ServerInfo: Adding filter to /StreamingQuery/statistics: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
21/11/08 07:02:03 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@60f1f95b{/StreamingQuery/statistics,null,AVAILABLE,@Spark}
21/11/08 07:02:03 INFO ServerInfo: Adding filter to /StreamingQuery/statistics/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
21/11/08 07:02:03 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@6f926d01{/StreamingQuery/statistics/json,null,AVAILABLE,@Spark}
21/11/08 07:02:03 INFO ServerInfo: Adding filter to /static/sql: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
21/11/08 07:02:03 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@10e4ce98{/static/sql,null,AVAILABLE,@Spark}
21/11/08 07:02:03 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
21/11/08 07:02:03 WARN StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
21/11/08 07:02:03 INFO MicroBatchExecution: Checkpoint root /mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f resolved to hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f.
21/11/08 07:02:03 INFO CheckpointFileManager: Writing atomically to hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f/metadata using temp file hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f/.metadata.2a9256a4-a174-4ebb-b671-a1f011b78158.tmp
21/11/08 07:02:03 INFO CheckpointFileManager: Renamed temp file hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f/.metadata.2a9256a4-a174-4ebb-b671-a1f011b78158.tmp to hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f/metadata
21/11/08 07:02:03 INFO MicroBatchExecution: Starting [id = 3393cbc3-9899-4053-8cb6-8c7654d8db28, runId = 23729550-4fc3-4500-8fcf-fa76da0ed3cf]. Use hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f to store the query checkpoint.
21/11/08 07:02:03 INFO kafka: After writing to console
21/11/08 07:02:03 WARN StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
21/11/08 07:02:03 INFO MicroBatchExecution: Checkpoint root /tmp/jsp_checkpointdir resolved to hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/tmp/jsp_checkpointdir.
21/11/08 07:02:03 INFO MicroBatchExecution: Reading table [org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@43309170] from DataSourceV2 named 'kafka' [org.apache.spark.sql.kafka010.KafkaSourceProvider@d19db03]
21/11/08 07:02:03 INFO MicroBatchExecution: Starting new streaming query.
21/11/08 07:02:03 INFO MicroBatchExecution: Stream started from {}
21/11/08 07:02:03 INFO MicroBatchExecution: Starting [id = 3ae88b1d-c4bb-4a7c-be5c-4a75fda4903d, runId = 0d089f20-c110-465b-add9-080e213dd72d]. Use hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/tmp/jsp_checkpointdir to store the query checkpoint.
21/11/08 07:02:03 INFO kafka: After writing to topic
21/11/08 07:02:03 INFO MicroBatchExecution: Reading table [org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@43309170] from DataSourceV2 named 'kafka' [org.apache.spark.sql.kafka010.KafkaSourceProvider@d19db03]
21/11/08 07:02:03 INFO MicroBatchExecution: Starting new streaming query.
21/11/08 07:02:03 INFO MicroBatchExecution: Stream started from {}
21/11/08 07:02:03 INFO ConsumerConfig: ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest

这些是输入 kafka 主题中的消息

[root@ip-*.*.*.* kafka]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input-topic --from-beginning
test message
message2
message3
message4
message4
message5
message6

输出主题中也没有消息。

[root@ip-*.*.*.* kafka]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output-topic --from-beginning

【问题讨论】:

  • 如果您的目标只是读取和写入同一个集群,您应该只使用 Kafka Streams
  • 嗨 OneCricketeer,我们已经在使用 spark,所以使用 spark 流。
  • 但我猜你也在其他地方部署 Java/Scala 应用程序,那为什么不使用它呢?
  • 连接问题已解决,我在 server.properties 文件的侦听器中使用 localhost 而不是 IP。但是,仍然存在实际问题。我在控制台和输出主题中都看不到消息。
  • 你应该使用0.0.0.0作为listeners而不是任何特定的IP

标签: scala apache-spark apache-kafka spark-structured-streaming


【解决方案1】:

我猜你有一些网络问题需要解决,但以下对我在本地有效(使用 Spark 2.4.8 测试)。

import org.apache.kafka.clients.consumer.OffsetResetStrategy
import org.apache.spark.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode
import org.slf4j.LoggerFactory

object KafkaTest extends App {

  val logger = LoggerFactory.getLogger(getClass)

  /**
   * For testing output to a console.
   *
   * @param df A Streaming DataFrame
   * @return A DataStreamWriter
   */
  private def streamToConsole(df: sql.DataFrame) = {
    df.writeStream.outputMode(OutputMode.Append()).format("console")
  }

  private def getKafkaDf(spark: SparkSession, bootstrap: String, topicPattern: String, offsetResetStrategy: OffsetResetStrategy = OffsetResetStrategy.EARLIEST) = {
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrap)
      .option("subscribe", topicPattern)
      .option("startingOffsets", offsetResetStrategy.toString.toLowerCase())
      .load()
  }

  private def streamToKafka(df: sql.DataFrame, bootstrap: String, checkpointLocation: String) = {
    val cols = df.columns
    if (!cols.contains("topic")) {
      val e = new IllegalArgumentException(s"""Dataframe columns must contain 'topic'. Existing cols=[${cols.mkString(", ")}]""")
      logger.error("Unable to stream dataframe to Kafka", e)
      throw e
    }
    // output topic comes from dataframe, not from options
    df.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrap)
      .option("checkpointLocation", checkpointLocation) // required option
  }

  val spark = SparkSession.builder()
    .appName("Kafka Test")
    .master("local[*]")
    .getOrCreate()
  import spark.implicits._
  
  val kafkaBootstrap = "localhost:9092"

  val df = getKafkaDf(spark, kafkaBootstrap, "input-topic")
  streamToKafka(
    // Stream topic values into the same partition from input-topic to output-topic
    df.select($"partition", $"value", regexp_replace($"topic", "^input", "output").as("topic")),
    kafkaBootstrap,
    checkpointLocation = "/tmp/spark-sql-kafka"
  ).start().awaitTermination()
}

【讨论】:

  • 感谢 OneCricketeer 的回复。但是,我的案例 spark 和 kafka 运行在两个不同的 EC2 实例上。我可以从 EMR 边缘节点 ping/telnet kafka publicIP/hostname。
  • 应该如此。就像我说的那样,您遇到了网络问题。从技术上讲,我的 Kafka 代理位于不同的网络 (Docker) 中,但它是端口转发的。如果 Spark 无法读取/写入,那么来自 EC2 Spark/YARN 节点的 Kafka CLI 工具也不应该
【解决方案2】:

在将配置更改恢复到 config/server.properties 文件并将其保留为默认配置后,此问题得到解决。如果消息在 Spark 流式传输作业启动之前进入主题,我将无法从主题的开头消费。这是不同的问题。

【讨论】:

    猜你喜欢
    • 2021-10-16
    • 2021-06-11
    • 2021-09-15
    • 2020-09-18
    • 2020-09-26
    • 2020-07-02
    • 1970-01-01
    • 2018-01-09
    • 2019-07-12
    相关资源
    最近更新 更多