【问题标题】:In Kafka how to get the exact offset according producing time在Kafka中如何根据生产时间获得确切的偏移量
【发布时间】:2014-03-21 12:34:27
【问题描述】:

我需要在一天中逐小时获取 Kafka 中生成的消息。每隔一小时,我将启动一个作业来使用一小时前产生的消息。例如,如果当前时间是 20:12,我将在 19:00:00 到 19:59:59 之间消费消息。这意味着我需要在时间 19:00:00 之前获得开始偏移量,并在时间 19:59:59 之前获得结束偏移量。我使用了 SimpleConsumer.getOffsetsBefore,如「0.8.0 SimpleConsumer Example」所示。问题是返回的偏移量与作为参数给出的时间戳不匹配。例如当时间戳为 19:00:00 时,我得到了在 16:38:00 生成的消息。

【问题讨论】:

    标签: apache-kafka timestamp offset


    【解决方案1】:

    以下kafka消费者api方法getOffsetsByTimes()可用于此,从0.10.0或更高版本可用。见JavaDoc

    /**
     * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
     * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
     *
     * This is a blocking call. The consumer does not have to be assigned the partitions.
     * If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null
     * will be returned for that partition.
     *
     * Notice that this method may block indefinitely if the partition does not exist.
     *
     * @param timestampsToSearch the mapping from partition to the timestamp to look up.
     * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
     *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
     *         such message.
     * @throws IllegalArgumentException if the target timestamp is negative.
     */
    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
        for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
            // we explicitly exclude the earliest and latest offset here so the timestamp in the returned
            // OffsetAndTimestamp is always positive.
            if (entry.getValue() < 0)
                throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
                        entry.getValue() + ". The target time cannot be negative.");
        }
        return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs);
    }
    

    【讨论】:

      【解决方案2】:

      在 Kafka 中,目前无法获得与特定时间戳相对应的偏移量 - 这是设计使然。如Jay Kreps's Log Article 顶部附近所述,偏移量为与挂钟时间分离的日志提供了一种时间戳。将偏移量作为您的时间概念,那么您可以知道任何两个系统是否处于一致状态,只需购买知道它们已读取的偏移量即可。对于不同服务器上的不同时钟时间、闰年、夏令时、时区等,从来没有任何混淆。这有点好...

      现在...说了这么多,如果您知道您的服务器在某个时间 X 出现故障,那么实际上,您真的很想知道相应的偏移量。你可以靠近。 kafka 机器上的日志文件是根据它们开始写入的时间命名的,并且有一个 kafka 工具(我现在找不到)可以让您知道哪些偏移量与这​​些文件相关联。但是,如果您想知道确切的时间戳,则必须在发送到 Kafka 的消息中对时间戳进行编码。

      【讨论】:

      • 是的,我使用“编码您发送给 Kafka 的消息中的时间戳”解决了这个问题,它已经工作了几个月。
      • @PoZhou 我也有同样的情况,你能给我举个简单的例子吗?谢谢
      • 请注意,Kafka 很快就会支持此功能:cwiki.apache.org/confluence/display/KAFKA/…
      【解决方案3】:

      正如其他回复所指出的,旧版本的 Kafka 只有一种将时间映射到偏移量的近似方法。但是,从 Kafka 0.10.0(2016 年 5 月发布)开始,Kafka 为每个主题维护了一个时间索引。这将使您能够有效地从时间到精确的偏移量。您可以使用KafkaConsumer#offsetsForTimes method 访问此信息。

      KIP-33 design discussion 页面上有更多关于如何实现基于时间的索引的详细信息。

      【讨论】:

        【解决方案4】:

        给你看代码:

        public static Map<TopicPartition, OffsetAndTimestamp> getOffsetAndTimestampAtTime(String kafkaServer, String topic, long time) {
            Map<String, Object> kafkaParams = new HashMap<>();
            kafkaParams.put(BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
            kafkaParams.put(GROUP_ID_CONFIG, "consumerGroupId");
            kafkaParams.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            kafkaParams.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            kafkaParams.put(AUTO_OFFSET_RESET_CONFIG, "latest");
            kafkaParams.put(ENABLE_AUTO_COMMIT_CONFIG, false);
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaParams);
        
            List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        
            List<TopicPartition> topicPartitions = partitionInfos
                    .stream()
                    .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
                    .collect(Collectors.toList());
        
            Map<TopicPartition, Long> topicPartitionToTimestampMap = topicPartitions.stream()
                    .collect(Collectors.toMap(tp -> tp, tp -> time));
        
            Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(topicPartitionToTimestampMap);
            consumer.close();
            return result;
        }
        

        【讨论】:

          【解决方案5】:

          Kafka 1.10 确实支持时间戳,尽管使用它来做你想做的事情仍然是一个小挑战。但是,如果您知道要从哪个时间戳开始读取,并且直到您想要读取,那么您可以轮询消息直到那个时间,然后停止消费。

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 2011-08-04
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多