【Question Title】: Spark-kafka: org.apache.kafka.common.errors.TimeoutException while writing stream from Spark
【Posted】: 2019-02-17 01:51:26
【Question】:

I am having a problem writing a stream from Spark to a Kafka topic.

import org.apache.spark.sql.types._

val mySchema = StructType(Array(
  StructField("ID", IntegerType),
  StructField("ACCOUNT_NUMBER", StringType)
))

val streamingDataFrame = spark.readStream.schema(mySchema).option("delimiter",",")
                              .csv("file:///opt/files")


streamingDataFrame.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
                  .writeStream.format("kafka")
                  .option("topic", "testing")
                  .option("kafka.bootstrap.servers", "10.55.55.55:9092")
                  .option("checkpointLocation", "file:///opt/")
                  .start().awaitTermination()

Error:

2018-09-12 11:09:04,344 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.kafka.common.errors.TimeoutException: Expiring 38 record(s) for testing-0: 30016 ms has passed since batch creation plus linger time
2018-09-12 11:09:04,358 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.kafka.common.errors.TimeoutException: Expiring 38 record(s) for testing-0: 30016 ms has passed since batch creation plus linger time
2018-09-12 11:09:04,359 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
2018-09-12 11:09:04,370 ERROR streaming.StreamExecution: Query [id = 866e4416-138a-42b6-82fd-04b6ee1aa638, runId = 4dd10740-29dd-4275-97e2-a43104d71cf5] terminated with error
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.kafka.common.errors.TimeoutException: Expiring 38 record(s) for testing-0: 30016 ms has passed since batch creation plus linger time

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

My sbt details:

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.2.0"

libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.0.0"

However, when I use bin/kafka-console-producer.sh and bin/kafka-console-consumer.sh against the same server, I can send and receive messages.

【Comments】:

  • Matthias J. Sax, did you solve this problem? Might it be necessary to upgrade the kafka-client version to 1.0.0?

Tags: apache-spark apache-kafka streaming


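【Solution 1】:

You need to increase the value of request.timeout.ms on the client side.

Kafka groups records into batches to increase throughput. Once a record has been added to a batch, that batch must be sent within a time limit. request.timeout.ms is a configurable parameter (default 30 seconds) that controls this time limit.

When a batch stays queued for longer than this limit, a TimeoutException is thrown and the records are removed from the queue (so the messages are not delivered).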
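
As a rough sketch of how this could look for the query in the question: Spark's Kafka sink forwards any writer option prefixed with `kafka.` to the underlying producer, so the timeout can be raised with `kafka.request.timeout.ms`. The 60000 ms value, the object name, and the application name below are assumptions for illustration, not tuned recommendations; the paths, topic, and broker address are copied from the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

// Hypothetical object name; the job mirrors the query from the question.
object KafkaSinkWithTimeout {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-sink-timeout") // assumed application name
      .getOrCreate()

    val mySchema = StructType(Array(
      StructField("ID", IntegerType),
      StructField("ACCOUNT_NUMBER", StringType)
    ))

    // Read CSV files as a stream, as in the question.
    val streamingDataFrame = spark.readStream
      .schema(mySchema)
      .option("delimiter", ",")
      .csv("file:///opt/files")

    streamingDataFrame
      .selectExpr("CAST(ID AS STRING) AS key", "to_json(struct(*)) AS value")
      .writeStream
      .format("kafka")
      .option("topic", "testing")
      .option("kafka.bootstrap.servers", "10.55.55.55:9092")
      // Options prefixed with "kafka." are passed to the Kafka producer.
      // 60000 ms is an assumed value: double the 30-second default.
      .option("kafka.request.timeout.ms", "60000")
      .option("checkpointLocation", "file:///opt/")
      .start()
      .awaitTermination()
  }
}
```

A longer timeout only helps if the broker is reachable but slow. Note also that in Kafka client versions 2.1 and later, batch expiry is governed by delivery.timeout.ms rather than request.timeout.ms, so the option to adjust may depend on the client version in use.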

【Comments】:

  • Does this parameter need to be added on the client side or in the Kafka server configuration?
  • @aravinth On the client side.