【问题标题】:Kafka Streams / How to get the partition an iterartor is iterating over?Kafka Streams / 如何获取迭代器正在迭代的分区?
【发布时间】:2021-02-03 16:54:49
【问题描述】:

在我的 Kafka Streams 应用程序中,我有一个任务是设置一个计划的(按挂钟时间)标点符号。标点符号遍历商店的条目并对其进行处理。像这样:

var store = context().getStateStore("MyStore");
var iter = store.all();

while (iter.hasNext()) {
   var entry = iter.next();
   // ... do something with the entry
}

// Print a summary (now): N entries processed
// Print a summary (wish): N entries processed in partition P

由于我在这里使用单个存储(可能已分区),因此我假设标点符号的每次执行都绑定到该存储的单个分区。

是否有可能找出标点符号在哪个分区上运行? ProcessorContext.partition() 的 java 文档指出此方法在标点符号内返回 -1

我已经阅读了Kafka Streams: Punctuate vs Process 以及那里的答案。我可以理解,一般来说,任务不依赖于特定的分区。但是迭代器应该与 IMO 绑定。

如何找到分区?

或者我假设存储迭代器的特定实例与分区相关联是错误的?

我需要它做什么:我想在一些日志消息中包含分区号。现在,我有几条几乎相同的日志消息,说明标点符号做了这个和那个。为了使这些消息“唯一”,我想在其中包含分区号。

【问题讨论】:

    标签: apache-kafka apache-kafka-streams punctuator


    【解决方案1】:

    只是在这里发布https://issues.apache.org/jira/browse/KAFKA-12328中提供的答案:

    我刚刚使用了context.taskId()。它在值的末尾包含下划线之后的分区号。这对我来说已经足够了。

    【讨论】:

      猜你喜欢
      • 2019-03-17
      • 1970-01-01
      • 2020-01-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多