【问题标题】:Does Merging two kafka-streams preserve co-partitioning合并两个卡夫卡流是否保留共同分区
【发布时间】:2020-01-20 09:24:26
【问题描述】:

我有 2 个共同划分的 kafka 主题。一个包含自动生成的数据,另一个包含手动覆盖。

我想合并它们并过滤掉任何已经手动覆盖的自动生成的数据,然后将所有内容转发到组合的 Log Compacted 主题。

为此,我从每个主题创建一个流,并使用 dsl API merge the streams

然后我应用以下转换,它存储任何手动数据,并删除任何已经手动覆盖的自动数据:(Scala 但如果你知道 java 应该很容易理解)

class FilterManuallyClassifiedTransformer(manualOverridesStoreName : String)
  extends Transformer[Long, Data, KeyValue[Long, Data]] {

  // Array[Byte] used as dummy value since we don't use the value
  var store: KeyValueStore[Long, Array[Byte]] = _

  override def init(context: ProcessorContext): Unit = {
    store = context.getStateStore(manualOverridesStoreName ).asInstanceOf[KeyValueStore[Long, Array[Byte]]]
  }

  override def close(): Unit = {}

  override def transform(key: Long, value: Data): KeyValue[Long, Data] = {
    if (value.getIsManual) {
      store.put(key, Array.emptyByteArray)
      new KeyValue(key, value)
    }
    else if (store.get(key) == null) {
      new KeyValue(key, value)
    }
    else {
      null
    }
  }
}

如果我理解正确,除非手动数据和具有相同键的自动数据在同一个分区中,否则无法保证这将起作用。否则,手动覆盖可能存储在与自动数据检查的状态存储不同的状态存储中。

对吗?

如果是这样,merge 会保留我需要的共同分区保证吗?

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    如果两个输入主题具有相同数量的分区并使用相同的分区策略,merge() 将保留共同分区。

    比较:

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-07-12
      • 2018-07-06
      • 1970-01-01
      • 2018-01-15
      • 2021-03-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多