【问题标题】:Kafka KeyValueStore - delete method is not workingKafka KeyValueStore - 删除方法不起作用
【发布时间】:2021-02-09 15:53:49
【问题描述】:

我正在尝试在我的一个 Kafka 处理器中安排一项任务,以从本地 KeyValueStore (RocksDB) 中删除记录。即使到目前为止没有出现异常,也没有任何记录被删除。这是我的代码:

 processorContext.schedule(Duration.ofHours(6), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
  store.all().forEachRemaining(keyValue -> {
    // CommonUtil.removeOutdatedMessage(store,keyValue.key,keyValue.value.getSentAt());
    LocalDateTime sentAt = null;
    try {
      sentAt = LocalDateTime.parse(keyValue.value.getSentAt(), DateTimeFormatter.ISO_OFFSET_DATE_TIME);
    } catch (DateTimeException ex) {
      LOGGER.warn("Parsing of date {} failed for message with: {}", keyValue.value.getSentAt(), ex);
    }

    if (sentAt == null) {
      store.delete(keyValue.key);
    } else {
      boolean isExpired = sentAt.isBefore(LocalDateTime.now().minusDays(Constants.MESSAGE_EXPIRATION_LIMIT));
      if (isExpired) {
        store.delete(keyValue.key);
      }
    }
  });
}); 

}

【问题讨论】:

    标签: apache-kafka apache-kafka-streams key-value-store rocksdb rocksdb-java


    【解决方案1】:

    您尚未提供有关您正在解析的日期值的任何详细信息。另外,您是否看到日期解析失败的日志消息?更多细节肯定会有所帮助。

    根据您共享的代码 sn-p,我假设 sentAt variable is being set as nonNull 由于异常或任何原因和 isExpired as false

    因此,您看不到任何删除发生。放置调试点以找出代码中的对象流。

    这是您的 java 代码中的问题,不在 Kafka 状态存储中。

    【讨论】:

    • 这个想法是删除所有有日期的记录,无法解析和所有过期的记录。我放置断点并调用该方法。问题是我没有 RocksDBConfigSetter。 RocksDB 的数据压缩是在后台进行的。
    猜你喜欢
    • 2016-01-10
    • 2014-02-18
    • 2018-12-22
    • 2016-08-30
    • 2013-06-03
    • 2012-10-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多