【发布时间】:2020-01-28 19:09:48
【问题描述】:
我有一个卡夫卡话题。
消息被放到主题上1个月后,我必须采取行动。
为此,我循环执行以下操作:
- 投票卡夫卡
- 处理一个多月前发布的所有消息
- 提交最新此类消息的偏移量+1
- 重复
这实际上不起作用,因为 poll 从它停止的地方返回消息,并忽略提交,除非发生重新平衡。
所以我必须缓冲未读消息。 但是我仍然必须调用 poll,否则 kafka 会假设你已经死了,并且会重新平衡。每次民意调查都会返回更多数据。 所以这意味着我最终将在这个缓冲区中存储大量数据,这不太理想。
理想的做法是告诉 kafka “我还活着”,而不是实际要求更多信息。这样我可以循环直到缓冲区为空,然后才请求更多消息。
这是我的代码的样子:
def run(): Unit = {
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val consumer = new KafkaConsumer[Long, Array[Byte]](properties, new LongDeserializer().asInstanceOf[Deserializer[Long]], new ByteArrayDeserializer)
consumer.subscribe(List(topic).asJava)
while (true) {
val pollResult = consumer.poll(Duration.ofSeconds(1))
val commitMap = for (partition <- pollResult.partitions().asScala) yield {
val records = pollResult.records(partition).asScala
val record = records.find(record => {
if (record.timestamp() + secondsInMonth * 1000 < Instant.now.toEpochMilli) {
DoAction(record)
false
}
else {
true
}
})
val offset = new OffsetAndMetadata(record.map(_.offset()).getOrElse(records.last.offset() + 1))
(partition, offset)
}
consumer.commitSync(commitMap.toMap.asJava, Duration.ofSeconds(1))
}
}
【问题讨论】:
-
默认情况下,您的消息会在 7 天后消失...您可以在不提交的情况下进行轮询,这只会使其重新轮询现有数据。您还可以在消费者上调用
pause以完全阻止它进行轮询。投票时间超过 1 秒也可能有意义 -
我们在这里使用更长的保留时间:-)。如果您在未提交的情况下进行轮询,则在上次轮询数据之后继续。
-
好吧,你不必立即在循环内轮询,你可以
continue没有轮询的循环,或者你可以使用ExecutorService来安排一个实际的轮询事件。或者改变循环条件在某个时间窗口内被调用。 -
但我必须在某个时间窗口内进行轮询,否则经纪人会重新平衡。这意味着我最终会在内存中存储大量消息。
-
就像我说的,你可以暂停一个消费者,然后你就不必在 apk 消费者线程暂停后轮询和重新平衡
标签: apache-kafka