【问题标题】:Apache Flink - implementing a stream processor with potentially very large stateApache Flink - 实现具有潜在非常大状态的流处理器
【发布时间】:2017-03-29 16:10:39
【问题描述】:

我希望从事件流中投射出一个可能非常大的状态。这就是我可能以一种命令式的方式实现它的方式:

class ImperativeFooProcessor {

  val state: mutable.Map[UUID, BarState] = mutable.HashMap.empty[UUID, BarState]

  def handle(event: InputEvent) = {
    event match {
      case FooAdded(fooId, barId) => {
        // retrieve relevant state and do some work on it
        val barState = state(barId)

        // let the world know about what may have happened
        publish(BarOccured(fooId, barId))
        // or maybe rather
        publish(BazOccured(fooId, barId))
      }
      case FooRemoved(fooId, barId) => {
        // retrieve relevant state and do some work on it
        val barState = state(barId)

        // let the world know about what may have happened
        publish(BarOccured(fooId, barId))
        // or maybe rather
        publish(BazOccured(fooId, barId))
      }
    }
  }

  private def publish(event: OutputEvent): Unit = {
    // push event to downstream sink
  }
}

在最坏的情况下,BarState 的大小会随着 FooAdded 提及的次数而增长

相对于每个 barId 的事件总数而言,唯一 barId 的数量非常少。

我将如何开始在 Flink 中表示这种处理结构?

如何处理每个 BarState 都可能变得非常大的事实?

【问题讨论】:

    标签: scala apache-flink flink-streaming


    【解决方案1】:

    Flink 在所谓的状态后端维护状态。有状态后端(MemoryStateBackendFsStateBackend)在工作进程的 JVM 堆上运行。这些后端不适合处理大型状态。

    Flink 还有一个 RocksDBStateBackend,它基于 RocksDB。 RocksDB 在每个工作节点上用作本地数据库(无需将其设置为外部服务)并将状态数据写入磁盘。因此,它可以处理超出内存的非常大的状态。

    Flink 提供了一个KeyedStream,它是一个按特定属性划分的流。在您的情况下,您可能希望对同一 id 的所有访问都转到同一状态实例,因此您将使用 barId 作为键。然后,基于barId 在所有并行工作线程之间划分状态。这基本上是一个分布式键值存储或映射。所以你不需要将状态表示为地图,因为它是由 Flink 自动分发的。

    【讨论】:

      猜你喜欢
      • 2016-10-10
      • 1970-01-01
      • 2021-03-20
      • 1970-01-01
      • 2021-08-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多