【问题标题】:Spark streaming: Cache DStream results across batchesSpark 流式传输:跨批次缓存 DStream 结果
【发布时间】:2016-09-29 17:21:23
【问题描述】:

使用 Spark 流式传输 (1.6) 我有一个文件流,用于读取批量大小为 2 秒的查找数据,但是文件仅每小时复制到目录中。
一旦有一个新文件,它的内容就会被流读取,这就是我想要缓存到内存中并保留在那里的内容 直到读取新文件。
我想将这个数据集加入到另一个流中,因此我想缓存。

这是Batch lookup data for Spark streaming的后续问题。
答案确实适用于updateStateByKey,但是我不知道如何处理 KV 对的情况 从查找文件中删除,因为updateStateByKey 中的值序列不断增长。 此外,任何提示如何使用 mapWithState 来做到这一点都会很棒。

这是我到目前为止尝试过的,但数据似乎没有被持久化:

val dictionaryStream = ssc.textFileStream("/my/dir")
dictionaryStream.foreachRDD{x => 
  if (!x.partitions.isEmpty) {
    x.unpersist(true)
    x.persist()
  }
}

【问题讨论】:

    标签: scala apache-spark spark-streaming


    【解决方案1】:

    DStreams 可以直接使用 persist 方法持久化,该方法持久化流中的每个 RDD:

    dictionaryStream.persist
    

    根据the official documentation这个自动申请

    reduceByWindowreduceByKeyAndWindow 等基于窗口的操作以及updateStateByKey 等基于状态的操作

    所以在你的情况下应该不需要显式缓存。也不需要手动取消持久化。再次引用the docs

    默认情况下,DStream 转换生成的所有输入数据和持久化 RDD 都会自动清除

    并根据管道中使用的转换自动调整保留期。

    关于mapWithState,您必须提供StateSpec。一个最小的示例需要一个函数,该函数采用当前value 和先前状态的keyOption。假设您有DStream[(String, Long)],并且您想记录迄今为止的最大值:

    val state = StateSpec.function(
      (key: String, current: Option[Double], state: State[Double]) => {
        val max  = Math.max(
          current.getOrElse(Double.MinValue),
          state.getOption.getOrElse(Double.MinValue)
        )
        state.update(max)
        (key, max)
      }
    )
    
    val inputStream: DStream[(String, Double)] = ??? 
    inputStream.mapWithState(state).print()
    

    还可以提供初始状态、超时间隔和捕获当前批处理时间。最后两个可以用来对一段时间没有更新的keys实现删除策略。

    【讨论】:

    • 再次感谢!我只想在fileStream 收到一些数据后实现这一点,缓存它并使用它,直到下一次它会收到一些东西。坚持我无法做到这一点。如果我按照您在上一个问题中的建议将current.headOption.orElse(prev) 放入updateStateByKey,那么它将缓存该元素,但我无法判断current 是否为空,因为在给定的批次中没有读取新数据fileStream 或为 null,因为该值同时被删除。
    • persist / unpersist 不会影响您看到的数据。这只是一个性能提示。仅仅因为您unpersist 观察到的数据不会改变。 mapWithState中的timeout可以用来忘记状态但是有点别扭。
    • 谢谢,我明白了。我对这个问题的另一个想法是以某种方式定期广播文件流数据并从 mainStream 进行查找。但是,我不知道新数据在一致性方面(重新)广播到工作节点的时间间隔内会发生什么。对于这个问题,你有什么建议?
    • 我不认为这是一个不错的选择。我能想到的唯一方法是以相对可靠的方式进行操作,即在每批上使用 transform / foreachRDD 闭包和重播。而且它仍然不能在恢复的情况下提供确定性。
    • 您也可以尝试使用文件系统传递内容并通过对象访问数据,这些对象会以与this 略微相似的方式进行更新。
    猜你喜欢
    • 1970-01-01
    • 2015-11-10
    • 2018-05-27
    • 2020-07-17
    • 2016-10-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多