【发布时间】:2017-01-20 20:11:44
【问题描述】:
我有一个 spark 流应用程序,它使用 Spark Direct Streaming(不是接收器)方法从 Kafka 读取消息并处理每个分区的消息。
在我的 Kafka 分区中,有时我们会收到需要 20 秒才能处理 2000 条消息的消息,而某些消息需要 7-9 秒才能处理相同的编号。消息。
鉴于波动,我们开启背压设置如下。
spark.batch.duration=10 seconds
spark.streaming.kafka.maxRatePerPartition=200
spark.streaming.backpressure.enabled=true
spark.streaming.backpressure.initialRate=60
spark.streaming.kafka.maxRatePerPartition=200
并且还使用以下参数指定了 RateEstimator。 我不了解 PID 的数学,但尝试了不同的组合,其中一种如下。
spark.streaming.backpressure.rateEstimator=pid
spark.streaming.backpressure.pid.minRate=1600
spark.streaming.backpressure.pid.integral=1
spark.streaming.backpressure.pid.proportional=25
spark.streaming.backpressure.pid.derived=1
最初,spark 读取 RDD 中 1 个分区的 2000 条消息,但一段时间后它开始读取 800 条记录。我认为是 minRate/2。然后它保持静止.. 在日志中,它总是打印 1600 作为新汇率。
2017-01-20 14:55:14 TRACE PIDRateEstimator:67 - New rate = 1600.0
鉴于我的情况,我有几个问题:
- 是
spark.streaming.backpressure.pid.minRate每个分区或总数。要批量读取的消息数? - 为什么要读取 800 条消息而不是 1600 条消息??
- 是否有任何建议的参数可以在处理时间较长时降低输入速率并在处理速度非常快时增加到接近 maxRatePerPartition 的值? 在我的示例中,输入速率从 2000 开始,但是当平均花费 20 秒的时间时,它会将其减少到 800,但是当在 3-4 秒内处理 800 条消息时,它并没有将其增加到 1600 或更多。这会浪费时间并降低吞吐量。
【问题讨论】:
-
我也面临同样的问题.. CPU 没有被使用,因为不知何故有异步速率限制器在处理过程中启动并减少批量大小。查看代码后,可能是这个文件github.com/apache/spark/blob/master/external/kafka-0-10/src/… 搜索“异步维护并通过接收器跟踪器向接收器发送新的速率限制”.. 如果有人在高 CPU 的生产中运行带有背压的 spark-streaming 可以回答这个问题
标签: apache-kafka spark-streaming cloudera-cdh