【发布时间】:2016-08-27 12:56:10
【问题描述】:
我正在使用 Spark Streaming 1.5.2,我正在使用 Direct Stream 方法从 Kafka 0.8.2.2 摄取数据。
我已启用检查点,以便我的驱动程序可以重新启动并从中断处继续,而不会丢失未处理的数据。
检查点写入 S3,因为我在 Amazon AWS 上,而不是在 Hadoop 集群上运行。
批处理间隔为 1 秒,因为我想要低延迟。
问题是,将单个检查点写入 S3 需要 1 到 20 秒。它们在内存中备份,最终应用程序失败。
2016-04-28 18:26:55,483 INFO [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882407000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882407000', took 6071 bytes and 1724 ms
2016-04-28 18:26:58,812 INFO [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882407000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882407000', took 6024 bytes and 3329 ms
2016-04-28 18:27:00,327 INFO [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882408000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882408000', took 6068 bytes and 1515 ms
2016-04-28 18:27:06,667 INFO [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882408000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882408000', took 6024 bytes and 6340 ms
2016-04-28 18:27:11,689 INFO [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882409000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882409000', took 6067 bytes and 5022 ms
2016-04-28 18:27:15,982 INFO [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882409000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882409000', took 6024 bytes and 4293 ms
有没有办法在不增加批处理间隔的情况下增加检查点之间的间隔?
【问题讨论】:
-
您可以避免使用检查点,您需要将偏移量存储在 Kafka 中,以便您的执行者在完成后“仅”将偏移量提交给 kafka,并在发生故障时重新处理。
-
“将偏移量存储在 Kafka 中”是什么意思?您建议使用基于接收器的方法还是直接方法并编写特定代码来将偏移量存储在 Zookeeper 中?
-
我还建议将偏移量存储在 Kafka(或另一个数据库中)并使用直接方法 有趣的幻灯片:slideshare.net/jjkoshy/offset-management-in-kafka
-
@AlexisSeigneurin - 不要使用 S3,它对于检查点来说太慢了。您应该尝试为检查点使用更快的数据存储,例如 DynamoDB。如果您已经在 AWS 中,您也可以考虑使用 Kinesis,它有一些很好的实用方法来处理 AWS 中的检查点存储优化。
-
我们在 Cassandra 中使用基于时间的反向索引来保存偏移量,以便我们还可以恢复基于时间的窗口。