【问题标题】:Writing Spark checkpoints to S3 is too slow将 Spark 检查点写入 S3 太慢
【发布时间】:2016-08-27 12:56:10
【问题描述】:

我正在使用 Spark Streaming 1.5.2,我正在使用 Direct Stream 方法从 Kafka 0.8.2.2 摄取数据。

我已启用检查点,以便我的驱动程序可以重新启动并从中断处继续,而不会丢失未处理的数据。

检查点写入 S3,因为我在 Amazon AWS 上,而不是在 Hadoop 集群上运行。

批处理间隔为 1 秒,因为我想要低延迟。

问题是,将单个检查点写入 S3 需要 1 到 20 秒。它们在内存中备份,最终应用程序失败。

2016-04-28 18:26:55,483 INFO  [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882407000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882407000', took 6071 bytes and 1724 ms
2016-04-28 18:26:58,812 INFO  [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882407000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882407000', took 6024 bytes and 3329 ms
2016-04-28 18:27:00,327 INFO  [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882408000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882408000', took 6068 bytes and 1515 ms
2016-04-28 18:27:06,667 INFO  [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882408000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882408000', took 6024 bytes and 6340 ms
2016-04-28 18:27:11,689 INFO  [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882409000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882409000', took 6067 bytes and 5022 ms
2016-04-28 18:27:15,982 INFO  [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882409000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882409000', took 6024 bytes and 4293 ms

有没有办法在不增加批处理间隔的情况下增加检查点之间的间隔?

【问题讨论】:

  • 您可以避免使用检查点,您需要将偏移量存储在 Kafka 中,以便您的执行者在完成后“仅”将偏移量提交给 kafka,并在发生故障时重新处理。
  • “将偏移量存储在 Kafka 中”是什么意思?您建议使用基于接收器的方法还是直接方法并编写特定代码来将偏移量存储在 Zookeeper 中?
  • 我还建议将偏移量存储在 Kafka(或另一个数据库中)并使用直接方法 有趣的幻灯片:slideshare.net/jjkoshy/offset-management-in-kafka
  • @AlexisSeigneurin - 不要使用 S3,它对于检查点来说太慢了。您应该尝试为检查点使用更快的数据存储,例如 DynamoDB。如果您已经在 AWS 中,您也可以考虑使用 Kinesis,它有一些很好的实用方法来处理 AWS 中的检查点存储优化。
  • 我们在 Cassandra 中使用基于时间的反向索引来保存偏移量,以便我们还可以恢复基于时间的窗口。

标签: amazon-s3 apache-spark


【解决方案1】:

是的,您可以使用 checkpointInterval 参数来实现。您可以在执行检查点时设置持续时间,如下面的文档所示。

请注意,RDD 的检查点会产生保存到可靠存储的成本。这可能会导致 RDD 获得检查点的那些批次的处理时间增加。因此,需要仔细设置检查点的间隔。在小批量(比如 1 秒)下,每批检查点可能会显着降低操作吞吐量。相反,检查点太少会导致沿袭和任务大小增加,这可能会产生不利影响。对于需要 RDD 检查点的有状态转换,默认间隔是至少 10 秒的批处理间隔的倍数。它可以通过使用 dstream.checkpoint(checkpointInterval) 来设置。通常,一个 DStream 的 5 - 10 个滑动间隔的检查点间隔是一个很好的尝试设置。

【讨论】:

  • 我看到这个间隔可以在设置 DStream 本身的检查点时设置。我认为这是除了元数据检查点之外的,但不会改变主要的检查点操作,对吧?
  • 是的,你是对的。您可以通过 2 种方式实现检查点。 1) 元数据检查点,每个批次都有检查点。 2) 为每个checkpointInterval 设置检查点的数据检查点。我虽然你在做数据检查点,但根据我的经验元数据(仅包含 sparkConf、代码和任务信息)检查点不应该花费 ~20 秒!您可以尝试升级到更好的网络!
  • 我在 AWS 上并写入 S3,所以在网络端我无能为力。
猜你喜欢
  • 2019-06-17
  • 1970-01-01
  • 2019-06-07
  • 2018-10-16
  • 2017-08-06
  • 2021-06-02
  • 1970-01-01
  • 1970-01-01
  • 2016-08-23
相关资源
最近更新 更多