【问题标题】:Apache flink: maintaining message input order in streams with keyBy/connectApache flink:使用 keyBy/connect 维护流中的消息输入顺序
【发布时间】:2019-05-19 22:16:47
【问题描述】:

简介

我正在使用 apache flink 构建一个相当复杂的数据流网络。这个想法是,用flink实现一个规则引擎

作为应用程序的基本描述,它应该是这样工作的:

数据由 kafka consumer source 接收,并与多个数据流一起处理,直到最终发送到 kafka producer sink。传入数据包含具有逻辑键(“object-id”)的对象,传入消息可能引用相同的 object-id。对于每个给定的 object-id,其传入消息的顺序必须在整个应用程序中保留。整个消息的顺序可以是任意的。

这意味着object1的消息abc必须按顺序处理,但是消息x object2 的 em> 可能在 a1/b1/c1 之间处理,之前或之后,没关系。

就我目前的理解而言,这意味着我必须keyBy(_.objectID),以便同一对象的消息按照它们到达的顺序进行处理。

目前的做法

为了实现实际的规则引擎,创建了一个流网络。思路如下:

  • 每条规则都有 1-n 个条件
  • 为每个规则的每个条件创建一个带有.filter(_.matches(rule.condition)) 的原始流的子流
  • 使用substream1.connect(substream2).flatMap(new CombineFunction[MyObject](...))合并所有对应相同规则的子流
  • connect 只能加入 2 个流,所以 3 个条件的规则会导致后续的 2 个加入
  • 使用相同条件的规则将重复使用在第二步中创建的相同子流。

这将导致 n 个加入流,其中 n 对应于规则的数量。加入的流将附加一个map 函数,用于标记消息,以便我们知道匹配的规则。

每个加入/结果流可能会独立于其他结果将其结果(“规则 xyz 匹配”)发布到 kafka 生产者,因此此时我可以将接收器附加到流。

连接详情

因为两个流(“条件”-子流)的.connect 必须只传递一条消息,如果在两个流上都接收到它(^= 两个条件都匹配),我需要一个带键状态的RichCoFlatMapFunction,它可以处理“只有在对方已经收到它的情况下才能通过”。

然而,问题是,流是由 object-id 键控的。那么,如果同一对象的 2 条消息通过网络运行并到达.connect().map(new RichCoFlatMapFunction...),会发生什么情况?这将导致错误的输出。 在进入网络时,我需要为每条传入消息分配一个唯一 ID (UUID),因此我可以在 .connect().map().. 连接中使用此密钥(而不是对象 ID)。 但同时,我需要通过 object-id 对流进行键控,以便按顺序处理相同对象的消息。怎么办?

为了解决这个问题,我将输入流保留为keyBy(_.objectID),但流连接中的RichCoFlatMapFunction 不再使用键控状态。相反,我使用了一个简单的操作符状态,它保留了传递对象的映射,但实现了相同的逻辑,只是通过手动键/值查找。

似乎可以工作,但是我不知道这是否会引入更多问题。

可视化

flink GUI 会渲染这个图片,14 条规则的列表,总共 23 个条件(有些规则只有一个条件):

代码

使用以下代码实现网络的创建:

val streamCache = mutable.Map[Int,DataStream[WorkingMemory]]()
val outputNodesCache = ListBuffer[DataStream[WorkingMemory]]()

if (rules.isEmpty)
  return

// create partial streams for all conditions (first level)
// cache the sub-stream with the hashcode of its condition as key (for re-use)

for (rule <- rules if rule.checks.nonEmpty ;
     cond <- rule.checks if !streamCache.contains(cond.hashCode()))
  streamCache += cond.hashCode -> sourceStream.filter(cond.matches _)

// create joined streams for combined conditions (sub-levels)

for (rule <- rules if rule.checks.nonEmpty)
{
  val ruleName = rule.ruleID

  // for each rule, starting with the rule with the least conditions ...

  if (rule.checks.size == 1)
  {
    // ... create exit node if single-condition rule
    // each exit node applies the rule-name to the objects set of matched rules.

    outputNodesCache += streamCache(rule.checks.head.hashCode).map(obj => { obj.matchedRule = ListBuffer((ruleName, rule.objectType.mkString(":"), rule.statement)) ; obj })
  }
  else
  {
    // ... iterate all conditions, and join nodes into full rule-path (reusing existing intermediate paths)

    var sourceStream:DataStream[WorkingMemory] = streamCache(rule.checks.head.hashCode)
    var idString = rule.checks.head.idString

    for (i <- rule.checks.indices)
    {
      if (i == rule.checks.size-1)
      {
        // reached last condition of rule, create exit-node
        // each exit node applies the rule-name to the objects set of matched rules.

        val rn = ruleName
        val objectType = rule.objectType.mkString(":")
        val statement = rule.statement

        outputNodesCache += sourceStream.map(obj => { obj.matchedRule = ListBuffer((rn, objectType, statement)) ; obj })
      }
      else
      {
        // intermediate condition, create normal intermediate node

        val there = rule.checks(i+1)
        val connectStream = streamCache(there.hashCode)

        idString += (":" + there.idString)

        // try to re-use existing tree-segments

        if (streamCache.contains(idString.hashCode))
          sourceStream = streamCache(idString.hashCode)
        else
          sourceStream = sourceStream.connect(connectStream).flatMap(new StatefulCombineFunction(idString))
      }
    }
  }
}

// connect each output-node to the sink

for (stream <- outputNodesCache)
{
  stream.map(wm => RuleEvent.toXml(wm, wm.matchedRule.headOption)).addSink(sink)
}

之前sn-p中使用的StatefulCombineFunction

class StatefulCombineFunction(id:String) extends RichCoFlatMapFunction[WorkingMemory, WorkingMemory, WorkingMemory] with CheckpointedFunction
{
  @transient
  private var leftState:ListState[(String, WorkingMemory)] = _
  private var rightState:ListState[(String, WorkingMemory)] = _
  private var bufferedLeft = ListBuffer[(String, WorkingMemory)]()
  private var bufferedRight = ListBuffer[(String, WorkingMemory)]()

  override def flatMap1(xmlObject: WorkingMemory, out: Collector[WorkingMemory]): Unit = combine(bufferedLeft, bufferedRight, xmlObject, out, "left")
  override def flatMap2(xmlObject: WorkingMemory, out: Collector[WorkingMemory]): Unit = combine(bufferedRight, bufferedLeft, xmlObject, out, "right")

  def combine(leftState: ListBuffer[(String, WorkingMemory)], rightState: ListBuffer[(String, WorkingMemory)], xmlObject:WorkingMemory, out: Collector[WorkingMemory], side:String): Unit =
  {
    val otherIdx:Int = leftState.indexWhere(_._1 == xmlObject.uuid)

    if (otherIdx > -1)
    {
      out.collect(leftState(otherIdx)._2)
      leftState.remove(otherIdx)
    }
    else
    {
      rightState += ((xmlObject.uuid, xmlObject))
    }
  }

  override def initializeState(context:FunctionInitializationContext): Unit = ???
  override def snapshotState(context:FunctionSnapshotContext):Unit = ???
}

我知道从运营商状态中清除部分匹配是缺失的(生存时间),但这对于当前的开发状态并不重要,稍后会添加。

背景资料

此应用程序将使用 flink (https://en.wikipedia.org/wiki/Rete_algorithm) 实现规则匹配的 rete-algorithm。

另一种方法是为每条传入消息循环所有规则,并附加结果。我有一个使用 flink 的方法的工作实现,所以请不要将此作为解决方案。

问题

问题在于,应用程序在对象 ID 级别上弄乱了传入消息的顺序。也就是说,它没有达到我在介绍中所要求的。对于每个 object-id,传入的消息必须保持顺序。但事实并非如此。

我不知道代码中的哪一点顺序被弄乱了,也不知道这些操作是如何在线程之间分布的,所以我不知道如何解决这个问题。

【问题讨论】:

  • 据我所知,没有办法在连接运算符中保持元素的顺序。但是您能否介绍更多有关您的应用业务的信息,以便我们可以从其他角度解决这个问题?
  • 我修改了最初的帖子,使其更加详细/清晰。希望这会有所帮助。

标签: scala join stream apache-flink rule-engine


【解决方案1】:

几厘米……

  1. 我假设你已经检查了 Flink 的 CEP 支持,特别是 Handling Lateness in Event Time。关键概念是您可以依靠事件时间(而不是处理时间)来帮助对事件进行排序,但您始终必须决定您愿意容忍的最大延迟量(延迟可能是由两者引起的)源,以及工作流中发生的任何处理)。
  2. 从您提供的 Flink 作业图中,您似乎正在通过哈希对传入数据进行分区,但每个规则都需要获取每个传入数据,对吗?所以在这种情况下你需要广播。

【讨论】:

  • 感谢您的评论。 to 1):我将环境的时间特性设置为IngestionTime,因为事件接收的时间会相关。也就是说,从应用程序的角度来看,没有迟到需要处理。但是对于每个传入的消息,同一对象的消息不能比第一条消息早于应用程序存在。 To 2): 你能详细说明一下吗?如果我做.keyBy().filter,那么所有通过流的消息都会到达过滤器?或者换句话说,过滤器存在于流的所有分区上?
  • 一旦您执行.keyBy(),您就有了一个密钥流。这意味着当您执行.filter() 时,您可以保证每条具有与keyBy() 相同的键的记录都将转到.filter() 运算符的同一实例,但并非每条记录都会转到每个运算符(除非您的并行度为 1)。有关更多详细信息,请参阅 Flink 关于广播支持的文档。
  • 感谢您的评论。老实说,我找不到有关此的详细/完整信息,广播状态文档和键控流文档(基本上不存在)都没有帮助。看着ci.apache.org/projects/flink/flink-docs-release-1.6/concepts/…,我不明白流的分区如何影响运算符的使用。您是否有相关文档的链接?据我了解,分区是“下方”运算符的一层,因此分区流不能“踢出”运算符?
  • 我不确定您所说的“踢出”运算符是什么意思。但是,一旦您执行了 keyBy(),您就会得到共享相同键的逻辑记录组。您可以保证具有相同键的所有记录都将转到运算符的同一实例。如果您的操作员的并行度为 1,那么显然所有组(以及所有记录)都将转到同一个(单个)实例。但如果并行度 > 1,那么组将被分区(发送)到该运算符的不同实例。 HTH。
  • 好的,但为什么会出现问题?除非他们随机转到同一运算符的不同实例(这会以某种方式破坏keyBy 的目的),否则一切都会顺利进行?
猜你喜欢
  • 2020-12-24
  • 1970-01-01
  • 1970-01-01
  • 2011-04-09
  • 1970-01-01
  • 2019-05-26
  • 2017-12-05
  • 2013-08-01
  • 1970-01-01
相关资源
最近更新 更多