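Posted: 2021-12-21 02:44:42
Question:
I am writing a sample Spark Structured Streaming program that reads messages from an input Kafka topic and writes them to the console and to another output Kafka topic. I get no errors or exceptions, but I also see no messages in the console or in the output Kafka topic. Can anyone tell me what I am missing or where I went wrong?
Here is my code.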
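import org.apache.log4j.Level
import org.apache.spark.sql.SparkSession
import org.slf4j.{Logger, LoggerFactory}

object Test extends App {
  val logger: Logger = LoggerFactory.getLogger(Test.getClass)
  val kafkalogger = org.apache.log4j.Logger.getLogger("kafka")
  // Set the level before logging so the first message is not filtered out.
  kafkalogger.setLevel(Level.INFO)
  kafkalogger.info("Running Pipeline")

  val spark = SparkSession.builder().getOrCreate()

  // Streaming source: read input-topic from the earliest available offset.
  val dfStream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "*.*.*.*:9092")
    .option("subscribe", "input-topic")
    .option("startingOffsets", "earliest")
    .load()
  dfStream.printSchema()

  // Kafka delivers value as binary, so cast it to a string.
  val messageDF = dfStream.selectExpr("CAST(value AS STRING)")
  messageDF.printSchema()

  // Query 1: console sink. No checkpointLocation is set, so Spark creates a temporary one.
  kafkalogger.info("Before writing messages to console")
  messageDF.writeStream.outputMode("append").format("console").start()
  kafkalogger.info("After writing to console")

  // Query 2: Kafka sink. The sink reads the "value" column (and "key" if present).
  val writeToKafka = dfStream
    //.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .selectExpr("CAST(value AS STRING)")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "*.*.*.*:9092")
    .option("topic", "output-topic")
    .option("checkpointLocation", "/tmp/jsp_checkpointdir")
    .start()
  kafkalogger.info("After writing to topic")

  // Block the main thread; the console query keeps running in the background.
  writeToKafka.awaitTermination()
}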
I can see the schema of the DataFrame, but not the actual messages.
21/11/08 07:02:00 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2b99c937{/static/sql,null,AVAILABLE,@Spark}
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
root
|-- value: string (nullable = true)
21/11/08 07:02:03 INFO kafka: Before writing messages to console
21/11/08 07:02:03 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(memoryOverhead -> name: memoryOverhead, amount: 512, script: , vendor: , cores -> name: cores, amount: 2, script: , vendor: , memory -> name: memory, amount: 4096, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/11/08 07:02:03 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
21/11/08 07:02:03 INFO ServerInfo: Adding filter to /StreamingQuery: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
21/11/08 07:02:03 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@40fd1a78{/StreamingQuery,null,AVAILABLE,@Spark}
21/11/08 07:02:03 INFO ServerInfo: Adding filter to /StreamingQuery/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
21/11/08 07:02:03 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@32430075{/StreamingQuery/json,null,AVAILABLE,@Spark}
21/11/08 07:02:03 INFO ServerInfo: Adding filter to /StreamingQuery/statistics: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
21/11/08 07:02:03 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@60f1f95b{/StreamingQuery/statistics,null,AVAILABLE,@Spark}
21/11/08 07:02:03 INFO ServerInfo: Adding filter to /StreamingQuery/statistics/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
21/11/08 07:02:03 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@6f926d01{/StreamingQuery/statistics/json,null,AVAILABLE,@Spark}
21/11/08 07:02:03 INFO ServerInfo: Adding filter to /static/sql: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
21/11/08 07:02:03 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@10e4ce98{/static/sql,null,AVAILABLE,@Spark}
21/11/08 07:02:03 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
21/11/08 07:02:03 WARN StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
21/11/08 07:02:03 INFO MicroBatchExecution: Checkpoint root /mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f resolved to hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f.
21/11/08 07:02:03 INFO CheckpointFileManager: Writing atomically to hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f/metadata using temp file hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f/.metadata.2a9256a4-a174-4ebb-b671-a1f011b78158.tmp
21/11/08 07:02:03 INFO CheckpointFileManager: Renamed temp file hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f/.metadata.2a9256a4-a174-4ebb-b671-a1f011b78158.tmp to hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f/metadata
21/11/08 07:02:03 INFO MicroBatchExecution: Starting [id = 3393cbc3-9899-4053-8cb6-8c7654d8db28, runId = 23729550-4fc3-4500-8fcf-fa76da0ed3cf]. Use hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f to store the query checkpoint.
21/11/08 07:02:03 INFO kafka: After writing to console
21/11/08 07:02:03 WARN StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
21/11/08 07:02:03 INFO MicroBatchExecution: Checkpoint root /tmp/jsp_checkpointdir resolved to hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/tmp/jsp_checkpointdir.
21/11/08 07:02:03 INFO MicroBatchExecution: Reading table [org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@43309170] from DataSourceV2 named 'kafka' [org.apache.spark.sql.kafka010.KafkaSourceProvider@d19db03]
21/11/08 07:02:03 INFO MicroBatchExecution: Starting new streaming query.
21/11/08 07:02:03 INFO MicroBatchExecution: Stream started from {}
21/11/08 07:02:03 INFO MicroBatchExecution: Starting [id = 3ae88b1d-c4bb-4a7c-be5c-4a75fda4903d, runId = 0d089f20-c110-465b-add9-080e213dd72d]. Use hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/tmp/jsp_checkpointdir to store the query checkpoint.
21/11/08 07:02:03 INFO kafka: After writing to topic
21/11/08 07:02:03 INFO MicroBatchExecution: Reading table [org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@43309170] from DataSourceV2 named 'kafka' [org.apache.spark.sql.kafka010.KafkaSourceProvider@d19db03]
21/11/08 07:02:03 INFO MicroBatchExecution: Starting new streaming query.
21/11/08 07:02:03 INFO MicroBatchExecution: Stream started from {}
21/11/08 07:02:03 INFO ConsumerConfig: ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
These are the messages in the input Kafka topic:
[root@ip-*.*.*.* kafka]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input-topic --from-beginning
test message
message2
message3
message4
message4
message5
message6
There are no messages in the output topic either.
[root@ip-*.*.*.* kafka]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output-topic --from-beginning
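One way to narrow this down is to check whether Spark itself can see any rows in the topic, separately from the two streaming queries. The following is a minimal sketch, not a confirmed fix: `KafkaDiagnostics`, `verdict`, `countTopic`, and `report` are hypothetical names introduced here, and the bootstrap address is whatever the cluster can actually reach. `countTopic` does a one-off batch read of the topic, and `report` prints the status, last progress, and any failure of a running `StreamingQuery`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper object; the names are illustrative only.
object KafkaDiagnostics {

  // Turns a row count into a readable verdict.
  def verdict(rows: Long): String =
    if (rows > 0) s"$rows rows visible to Spark" else "no rows visible to Spark"

  // One-off batch read of the whole topic, from earliest to latest offsets.
  // If this hangs or returns 0 while kafka-console-consumer shows messages,
  // the Spark nodes are probably not reaching the broker at the address it advertises.
  def countTopic(spark: SparkSession, servers: String, topic: String): Long =
    spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", servers)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()
      .count()

  // Prints what a running streaming query has done so far.
  def report(query: StreamingQuery): Unit = {
    println(s"status: ${query.status}")
    // lastProgress is null until the first progress update, hence the Option wrapper.
    Option(query.lastProgress).foreach { p =>
      println(s"batch ${p.batchId}: ${verdict(p.numInputRows)}")
    }
    query.exception.foreach(e => println(s"query failed: ${e.getMessage}"))
  }
}
```

In the program above, this could be used as `println(KafkaDiagnostics.verdict(KafkaDiagnostics.countTopic(spark, "*.*.*.*:9092", "input-topic")))` before starting the streams, and `KafkaDiagnostics.report(writeToKafka)` from a separate thread or after `writeToKafka.awaitTermination(30000)` instead of the unbounded wait.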
Comments:
-
If your goal is just to read from and write to the same cluster, you should simply use Kafka Streams.
-
Hi OneCricketeer, we are already using Spark, so we are using Spark Streaming.
-
But I assume you also deploy Java/Scala applications elsewhere, so why not use it?
-
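The connection issue is resolved: I was using localhost instead of the IP in listeners in the server.properties file. However, the actual problem remains. I still cannot see messages in the console or in the output topic.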
-
You should use 0.0.0.0 for listeners rather than any specific IP.
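As a rough illustration of that suggestion, the broker's server.properties might look like the fragment below. This is a sketch under assumptions: the PLAINTEXT protocol and the host name `broker-host.example.internal` are placeholders, not values from the question. Note that Kafka does not allow 0.0.0.0 to be advertised, so when listeners binds to it, advertised.listeners has to name an address that the Spark driver and executors can actually resolve and reach.

```properties
# server.properties (sketch; the host name below is a placeholder)

# Bind on every network interface of the broker machine.
listeners=PLAINTEXT://0.0.0.0:9092

# Address the broker hands back to clients after the bootstrap connection.
# It must be reachable from the Spark driver and executors, not only from localhost.
advertised.listeners=PLAINTEXT://broker-host.example.internal:9092
```

A console consumer run on the broker itself with `--bootstrap-server localhost:9092` can succeed even when the advertised address is unreachable from other machines, which would be consistent with the symptoms described in the question.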
Tags: scala apache-spark apache-kafka spark-structured-streaming