【问题标题】:Spring Kafka - Consume last N messages for partitions(s) for any topicSpring Kafka - 为任何主题的分区消耗最后 N 条消息
【发布时间】:2020-02-08 21:16:07
【问题描述】:

我正在尝试读取请求的 kafka 消息数。 对于非事务性消息,我们将从 endoffset 中寻找 - N for M 个分区开始轮询并收集当前偏移量小于每个分区的结束偏移量的消息。对于幂等/事务性消息,我们必须考虑事务标记/重复消息,并且意味着偏移量将不连续,在这种情况下 endoffset - N 不会返回 N 条消息,我们需要返回并寻找更多消息,直到我们有 N 条消息达到每个分区或开始偏移量

由于有多个分区,我需要跟踪读取的所有偏移量,以便在完成所有操作后停止。有两个步骤,第一步是计算起始偏移量(结束偏移量 - 请求的消息数)和结束偏移量。 (偏移量不连续存在间隙)我会寻找分区以从开始偏移量开始消费。第二步是轮询消息并计算每个分区中的消息数,如果我们不满足请求的消息数,则再次重复第一步和第二步,直到我们满足每个分区的消息数。

条件

初始轮询可能不会返回任何记录,因此请继续轮询。 当您达到每个分区的结束偏移量时停止轮询或轮询不返回任何结果。 检查每个分区中读取的消息是否与请求的消息相同。如果是标记为完成,如果没有标记为继续并重复步骤。考虑消息中的空白。 应该适用于事务性和非事务性生产者。

问题:

我将如何跟踪已为每个分区读取的所有消息并跳出循环?如果有帮助,每个分区中的消息将按顺序排列。

spring kafka 支持这样的用例吗?更多详情可以找到here

更新:我要求读取每个分区中的最后 N 条消息。分区和没有消息是用户输入。我想将所有偏移量管理保留在内存中。本质上,我们正在尝试按 LIFO 顺序读取消息。这使得它变得棘手,因为 Kafka 允许您向前而不是向后阅读。

【问题讨论】:

  • 您的要求不是很清楚。如果您在自己的组中创建单个消费者,则分区不适用......或者它与您寻求的信息相关?否则我只会轮询并填写一个列表,直到发生超时并且列表包含请求的元素数量。
  • 抱歉,添加了更新。如果您仍有疑问,请告诉我。
  • 您只阅读未读邮件吗?
  • 你说“用户输入”所以这不涉及轮询?
  • @kentor 不支持开箱即用。我最终创建了一个迭代器来包装轮询结果,跟踪偏移量/分区并聚合结果并查找下一条先前的消息,直到满足请求的计数。

标签: java spring apache-kafka apache-kafka-streams spring-kafka


【解决方案1】:

为什么会有这样的需求,我不明白。当队列中没有任何内容时,Kafka 自己进行管理。如果消息从一个状态跳到另一个状态,则可以有单独的队列/主题。但是,这里是如何做到这一点的。

当我们使用类似 -

的方式使用分区中的消息时
ConsumerIterator<byte[], byte[]> it = something; //initialize consumer
while (it.hasNext()) {
  MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
  String kafkaMessage = new String(messageAndMetadata.message());
  int partition = messageAndMetadata.partition();
  long offset = messageAndMetadata.offset();
  boolean processed = false;
  do{
    long maxOffset = something; //fetch from db
    //if offset<maxOffset, then process messages and manual commit
    //else busy wait or something more useful
  }while(processed);
}

我们获得有关偏移量、分区号和消息本身的信息。您可以选择使用此信息执行任何操作。

对于您的用例,您可能还决定将使用的偏移量保存到数据库中,以便下次可以调整偏移量。另外,我建议关闭连接以进行清理,并最终将处理后的偏移量保存到 DB。

【讨论】:

    【解决方案2】:

    所以如果我理解正确的话,这应该可以使用标准的 Kafka Consumer

    Consumer<?, Message> consumer = ...
    
    public Map<Integer, List<Message>> readLatestFromPartitions(String topic, Collection<Integer> partitions, int count) {
    
        // create the TopicPartitions we want to read
        List<TopicPartition> tps = partitions.stream().map(p -> new TopicPartition(topic, p)).collect(toList());
        consumer.assign(tps);
    
        // create and initialize the result map
        Map<Integer, List<Message>> result = new HashMap<>();
        for (Integer i : partitions) { result.add(new ArrayList<>()); }
    
        // read until the expected count has been read for all partitions
        while (result.valueSet().stream().findAny(l -> l.size() < count)) {
            // read until the end of the topic
            ConsumerRecords<?, Message> records = consumer.poll(Duration.ofSeconds(5));
            while (records.count() > 0) {
                Iterator<ConsumerRecord<?, Message>> recordIterator = records.iterator();
                while (recordIterator.hasNext()) {
                    ConsumerRecord<?, Message> record = recordIterator.next();
                    List<Message> addTo = result.get(record.partition);
                    // only allow 10 entries per partition
                    if (addTo.size() >= count) {
                        addTo.remove(0);
                    }
                    addTo.add(record.value);
                }
                records = consumer.poll(Duration.ofSeconds(5));
            }
            // now we have read the whole topic for the given partitions.
            // if all lists contain the expected count, the loop will finish;
            // otherwise it will wait for more data to arrive.
        }
    
        // the map now contains the messages in the order they were sent,
        // we want them reversed (LIFO)
        Map<Integer, List<Message>> returnValue = new HashMap<>();
        result.forEach((k, v) -> returnValue.put(k, Collections.reverse(v)));
        return returnValue;
    }
    

    【讨论】:

    • 谢谢@daniu。我正在尝试从那一刻起读取有关该主题的现有消息。我们的用例是用户在 M 部分为 X 主题请求 N 条消息,然后我应该能够从每个 M 主题分区的末尾偏移量返回 N 条消息.
    • 对于非事务性消息,我们将从 endoffset - N for M 分区开始轮询并收集当前偏移量小于每个分区的结束偏移量的消息。对于幂等/事务性消息,我们必须考虑事务标记/重复消息,并且意味着偏移量将不连续,在这种情况下 endoffset - N 不会返回 N 条消息,我们需要返回并寻找更多消息,直到我们有 N 条消息达到每个分区或开始偏移量。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-04-18
    • 2017-04-29
    • 1970-01-01
    • 2018-04-20
    • 2022-01-02
    • 1970-01-01
    • 2020-11-21
    相关资源
    最近更新 更多