【问题标题】:Process 2 streams in sync using Flink使用 Flink 同步处理 2 个流
【发布时间】:2021-01-21 00:11:17
【问题描述】:

我有 2 个流 A 和 B。

我开始吃 A 和 B。

流 A 仅在每分钟的第 59 秒获得记录。

流 B 在每分钟的任何一秒都有记录。

我想处理使两个流同步。

示例:在 10:01:59 之后从 Stream A 我将在 10:02:59 收到一条记录,直到 10:02:59 我也不想从 Stream B 读取任何内容。

这可以在 Flink 中实现吗?

【问题讨论】:

    标签: java scala apache-flink flink-streaming amazon-kinesis


    【解决方案1】:

    在 Flink 中,您不能不从流中读取记录,但可以从流中删除(或保存状态)记录。因此,您可以连接两个流,并使用 CoFlatMap 进行处理。当您从 Stream A 获取记录时,将其保存在 state 中。当您从 Stream B 获取记录时,根据 Stream A 的状态决定如何处理它。

    【讨论】:

    • 但是我如何确保状态中的值是最新的。我在 10:01:59 收到 Stream A 的记录,我也想看看 Stream B 在 10:01:59 之前的最新记录是什么
    【解决方案2】:

    Flink 使用基于推送的模型(这应该会随着源和接收器重构为基于拉的模型而立即改变)来处理下游元素。这意味着您不能“等到事件到达才能提取更多数据”,并且您必须同时在某些操作员状态下对其进行缓冲。 Flink 提供various state backends 供您使用。

    为了对kkruglers answer 进行一些可视化,给定两个流,这就是我们如何在逻辑上连接它们,然后使用ListState 在另一个元素到达时检索其中一个:

    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector
    
    import scala.collection.JavaConverters._ 
    
    object Test {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.createLocalEnvironment()
        val streamA = env.fromCollection(List(1, 2, 3))
        val streamB = env.fromCollection(List("a", "b", "c"))
    
        streamA
          .connect(streamB)
          .process {
            new CoProcessFunction[Int, String, (Int, String)] {
              var myStateA: ListState[Int] = _
    
              override def open(parameters: Configuration): Unit = {
                myStateA = getRuntimeContext.getListState[Int](
                  new ListStateDescriptor[Int]("my_state", classOf[Int])
                )
              }
    
              override def processElement1(
                  value: Int,
                  ctx: CoProcessFunction[Int, String, (Int, String)]#Context,
                  out: Collector[(Int, String)]
              ): Unit = {
                myStateA.add(value)
              }
    
              override def processElement2(
                  value: String,
                  ctx: CoProcessFunction[Int, String, (Int, String)]#Context,
                  out: Collector[(Int, String)]
              ): Unit = {
                val list = myStateA.get().iterator().asScala.toList
                val intFromState = list.headOption
                intFromState match {
                  case Some(myInt) =>
                    out.collect((myInt, value))
                  case None => ()
                }
    
                myStateA.update(list.tail.asJava)
              }
            }
          }
      }
    }
    

    请注意,此实现已简化。这里不能保证元素的到达顺序,您需要将其添加到您的状态和实现中。您还可以使用Timers,从而为每个进入流的事件注册一个计时器,以指示新数据何时到达。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-02-01
      • 1970-01-01
      • 1970-01-01
      • 2021-07-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-03-26
      相关资源
      最近更新 更多