【问题标题】:How to delete data which already been consumed by consumer? Kafka如何删除消费者已经消费的数据?卡夫卡
【发布时间】:2019-03-28 10:35:38
【问题描述】:

我在 kafka 中进行数据复制。但是,kafka 日志文件的大小增加得非常快。一天的大小达到 5 GB。作为这个问题的解决方案,我想立即删除处理过的数据。我在 AdminClient 中使用删除记录方法来删除偏移量。但是当我查看日志文件时,该偏移量对应的数据并没有被删除。

RecordsToDelete recordsToDelete = RedcordsToDelete.beforeOffset(offset);
TopicPartition topicPartition = new TopicPartition(topicName,partition);
Map<TopicPartition,RecordsToDelete> deleteConf = new HashMap<>();
deleteConf.put(topicPartition,recordsToDelete);
adminClient.deleteRecords(deleteConf);

我不想要像 (log.retention.hours , log.retention.bytes , log.segment.bytes , log.cleanup.policy=delete)

这样的建议

因为我只想删除消费者消费的数据。在这个解决方案中,我还删除了没有被消费的数据。

你有什么建议?

【问题讨论】:

  • @Gio 大多数答案都是在存在 AdminClient 来删除记录之前...
  • 如果其他消费者想要这些数据怎么办?删除偏移量不应该由客户端决定,这是服务器端配置
  • 每个消费者都有一个单独的主题。这就是为什么客户端没有问题的原因。我也可以删除偏移量。我的问题是,当我删除偏移量时,日志文件的大小并没有减少。因为我的日志文件没有缩小,磁盘很快就会被填满。

标签: java apache-kafka offset kafka-topic


【解决方案1】:

你没有做错任何事。您提供的代码有效,我已经对其进行了测试。以防我忽略了您的代码中的某些内容,我的是:

public void deleteMessages(String topicName, int partitionIndex, int beforeIndex) {
    TopicPartition topicPartition = new TopicPartition(topicName, partitionIndex);
    Map<TopicPartition, RecordsToDelete> deleteMap = new HashMap<>();
    deleteMap.put(topicPartition, RecordsToDelete.beforeOffset(beforeIndex));
    kafkaAdminClient.deleteRecords(deleteMap);
}

我使用过组:'org.apache.kafka',名称:'kafka-clients',版本:'2.0.0'

因此请检查您是否针对正确的分区(第一个分区为 0)

检查您的代理版本:https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html 说:

0.11.0.0 版本的代理支持此操作

从同一个应用程序生成消息,以确保您已正确连接。

您还可以考虑另一种选择。使用 cleanup.policy=compact 如果您的消息密钥重复,您可以从中受益。不仅因为该键的旧消息将被自动删除,而且您可以使用具有空负载的消息删除该键的所有消息这一事实。只是不要忘记将 delete.retention.msmin.compaction.lag.ms 设置为足够小的值。在这种情况下,您可以使用一条消息,而不是为同一密钥生成空有效负载(但请谨慎使用这种方法,因为这样您可以删除未使用的消息(使用该密钥))

【讨论】:

    【解决方案2】:

    试试这个

    DeleteRecordsResult result = adminClient.deleteRecords(recordsToDelete);
    Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks = result.lowWatermarks();
    try {
        for (Map.Entry<TopicPartition, KafkaFuture<DeletedRecords>> entry : lowWatermarks.entrySet()) {
            System.out.println(entry.getKey().topic() + " " + entry.getKey().partition() + " " + entry.getValue().get().lowWatermark());
        }
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    adminClient.close();
    

    在这段代码中,需要调用entry.getValue().get().lowWatermark(),因为adminClient.deleteRecords(recordsToDelete)返回的是Futures的map,需要调用get()等待Future运行

    【讨论】:

      猜你喜欢
      • 2022-01-01
      • 1970-01-01
      • 2019-07-03
      • 2018-05-05
      • 2021-08-22
      • 1970-01-01
      • 1970-01-01
      • 2018-03-08
      • 2020-10-28
      相关资源
      最近更新 更多