【问题标题】:Filter partial duplicates with mapWithState Spark Streaming使用 mapWithState Spark Streaming 过滤部分重复项
【发布时间】:2018-10-12 18:36:06
【问题描述】:

我们有一个DStream,比如

val ssc = new StreamingContext(sc, Seconds(1))

val kS = KafkaUtils.createDirectStream[String, TMapRecord](
  ssc,
  PreferConsistent,
  Subscribe[String, TMapRecord](topicsSetT, kafkaParamsInT)).
  mapPartitions(part => {
    part.map(_.value())
  }).
  mapPartitions(part1 => {
    part1.map(c => {
      TMsg(1,
        c.field1,
        c.field2, //And others
        c.startTimeSeconds
      )
    })
  })

所以每个 RDD 都有一堆 TMsg 对象,其中包含一些(技术)关键字段,我可以使用这些字段来去除 DStream。基本上,如果我们在一个或两个离散 RDD 中有两个 TMsg 对象,它们具有相同的 field1field2,并且它们相差不到 1 秒(我们查看 startTimeSeconds),这是一个重复

我查看了 mapWithState。 是的,我可以创建 K -> V DStream 之类的

val mappedStream = kS.map(m => (m.field1, m.field2) -> m.startTimeSeconds)

所以我可以使用该功能,但不明白如何使用它来过滤重复项。

窗口函数没用,我不能使用(结构化流).deduplicate 函数,因为解决方案是用 DStreams 编写的。

有什么解决办法吗?谢谢

附: Spark 版本是 2.2

【问题讨论】:

    标签: scala apache-spark streaming bigdata spark-streaming


    【解决方案1】:

    您可以使用mapWithState。有一个很好的manual how to use Stateful Streaming。 在您的情况下,您可以:

    1.设置检查点:

    val ssc = new StreamingContext(sc, Seconds(1))
    ssc.checkpoint("path/to/persistent/storage")
    

    2.定义更新函数:

    def update(key: (String, String),
               value: Option[Int],
               state: State[Int]): Option[((String, String), Int)] = {
      (value, state.getOption()) match {
        case (Some(_), Some(_)) => None
        case (Some(v), _) =>
          # you can update your state in any value you want
          # it is just a marker that value not new
          state.update(value.get)
          Option((key, v))
        case (_, _) if state.isTimingOut() => None
      }
    }
    

    3.制定状态规范:

    val stateSpec =
      StateSpec
        .function(update _)
        # it is important to define how long 
        # you want to check duplication
        # in this example check interval is 1 second.
        .timeout(Seconds(1))
    

    4.使用它:

    ks
      # make key->value pairs
      .map(m => (m.field1, m.field2) -> m.startTimeSeconds)
      .mapWithState(stateSpec)
    

    如果你想取最后一个值,更新函数可能是:

      def update(key: (String, String),
                           value: Option[Int],
                           state: State[Int]): Option[((String, String), Int)] = {
        (value, state.getOption()) match {
          case (Some(_), Some(_)) => None
          case (Some(v), _) =>
            state.update(value.get)
            None
          case (_, _) if state.isTimingOut() => Option((key, value.get))
        }
      }
    

    【讨论】:

    • 谢谢,您的解决方案运行良好,虽然我之前做了几乎相同的事情。
    猜你喜欢
    • 1970-01-01
    • 2016-07-02
    • 2016-12-24
    • 1970-01-01
    • 2017-10-27
    • 1970-01-01
    • 2016-12-13
    • 2015-09-12
    • 1970-01-01
    相关资源
    最近更新 更多