Flink 使用基于推送(push)的模型向下游传递元素(随着 source 和 sink 被重构为基于拉取(pull)的模型,这一点应该很快会改变)。这意味着您无法“等到某个事件到达后再去拉取更多数据”,而必须在此期间把数据缓冲在某种算子状态(operator state)中。Flink 提供了多种状态后端(state backends)供您使用。
为了更直观地说明 kkrugler 的回答:给定两个流,下面的代码演示如何在逻辑上连接(connect)它们,并使用 ListState 缓冲其中一个流的元素,待另一个流的元素到达时再将其取出:
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
/** Minimal Flink job that pairs elements of an `Int` stream with elements of a `String` stream.
  *
  * Ints from the first input are buffered in a keyed `ListState`. Each String arriving on the
  * second input is emitted together with the oldest buffered Int, which is then removed from the
  * buffer. A String that arrives while the buffer is empty is dropped.
  *
  * The arrival order across the two inputs is not guaranteed, so the exact pairs that are
  * printed may differ between runs.
  */
object Test {

  /** Builds the pipeline, prints the joined pairs and runs the job in a local environment. */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val streamA = env.fromCollection(List(1, 2, 3))
    val streamB = env.fromCollection(List("a", "b", "c"))

    streamA
      .connect(streamB)
      // `getRuntimeContext.getListState` is keyed state and is only available after a keyBy.
      // A constant key sends every element of both inputs to the same state partition, which is
      // fine for this example but removes all parallelism; use a real join key in production.
      .keyBy((_: Int) => 0, (_: String) => 0)
      .process {
        new CoProcessFunction[Int, String, (Int, String)] {

          // Buffer of Ints that have not been paired with a String yet. Assigned in `open`
          // because the runtime context is not available at construction time.
          var myStateA: ListState[Int] = _

          override def open(parameters: Configuration): Unit = {
            myStateA = getRuntimeContext.getListState[Int](
              new ListStateDescriptor[Int]("my_state", classOf[Int])
            )
          }

          /** Buffers every Int from the first input until a String arrives to claim it. */
          override def processElement1(
              value: Int,
              ctx: CoProcessFunction[Int, String, (Int, String)]#Context,
              out: Collector[(Int, String)]
          ): Unit =
            myStateA.add(value)

          /** Pairs the incoming String with the oldest buffered Int, if there is one. */
          override def processElement2(
              value: String,
              ctx: CoProcessFunction[Int, String, (Int, String)]#Context,
              out: Collector[(Int, String)]
          ): Unit = {
            val buffered = myStateA.get().iterator().asScala.toList
            buffered match {
              case oldest :: remaining =>
                out.collect((oldest, value))
                // Write back only what is left so the same Int is never emitted twice.
                myStateA.update(remaining.asJava)
              case Nil =>
                // Nothing buffered yet: the String is dropped. (Calling `tail` on the empty
                // list here would throw UnsupportedOperationException.)
                ()
            }
          }
        }
      }
      // Attach a sink so the pairs are visible on stdout.
      .print()

    // Flink builds the dataflow lazily; nothing runs until execute() is called.
    env.execute("connect-streams-with-list-state")
  }
}
请注意,这个实现是简化过的:这里无法保证两个流中元素的到达顺序,您需要在自己的状态和实现中对此加以处理。您还可以使用定时器(Timers),为进入流的每个事件注册一个定时器,用来指示新数据何时到达。