【发布时间】:2020-02-08 21:16:07
【问题描述】:
我正在尝试读取请求的 kafka 消息数。 对于非事务性消息,我们将从 endoffset 中寻找 - N for M 个分区开始轮询并收集当前偏移量小于每个分区的结束偏移量的消息。对于幂等/事务性消息,我们必须考虑事务标记/重复消息,并且意味着偏移量将不连续,在这种情况下 endoffset - N 不会返回 N 条消息,我们需要返回并寻找更多消息,直到我们有 N 条消息达到每个分区或开始偏移量
由于有多个分区,我需要跟踪读取的所有偏移量,以便在完成所有操作后停止。有两个步骤,第一步是计算起始偏移量(结束偏移量 - 请求的消息数)和结束偏移量。 (偏移量不连续存在间隙)我会寻找分区以从开始偏移量开始消费。第二步是轮询消息并计算每个分区中的消息数,如果我们不满足请求的消息数,则再次重复第一步和第二步,直到我们满足每个分区的消息数。
条件
初始轮询可能不会返回任何记录,因此请继续轮询。 当您达到每个分区的结束偏移量时停止轮询或轮询不返回任何结果。 检查每个分区中读取的消息是否与请求的消息相同。如果是标记为完成,如果没有标记为继续并重复步骤。考虑消息中的空白。 应该适用于事务性和非事务性生产者。
问题:
我将如何跟踪已为每个分区读取的所有消息并跳出循环?如果有帮助,每个分区中的消息将按顺序排列。
spring kafka 支持这样的用例吗?更多详情可以找到here
更新:我要求读取每个分区中的最后 N 条消息。分区和没有消息是用户输入。我想将所有偏移量管理保留在内存中。本质上,我们正在尝试按 LIFO 顺序读取消息。这使得它变得棘手,因为 Kafka 允许您向前而不是向后阅读。
【问题讨论】:
-
您的要求不是很清楚。如果您在自己的组中创建单个消费者,则分区不适用......或者它与您寻求的信息相关?否则我只会轮询并填写一个列表,直到发生超时并且列表包含请求的元素数量。
-
抱歉,添加了更新。如果您仍有疑问,请告诉我。
-
您只阅读未读邮件吗?
-
你说“用户输入”所以这不涉及轮询?
-
@kentor 不支持开箱即用。我最终创建了一个迭代器来包装轮询结果,跟踪偏移量/分区并聚合结果并查找下一条先前的消息,直到满足请求的计数。
标签: java spring apache-kafka apache-kafka-streams spring-kafka