【问题标题】:How to use Kafka time window for historical aggregation?如何使用Kafka时间窗口进行历史聚合?
【发布时间】:2020-07-15 17:03:10
【问题描述】:

我需要创建具有每天经过身份验证的用户数量的状态存储,以便我可以获得过去一天、过去 7 天和过去 30 天内经过身份验证的用户的数量。 为了实现这一点,每个身份验证事件都被发送到 auth-event 主题。 我每天都在流式传输这个主题并创建窗口。 代码:

KStream<String, GenericRecord> authStream = builder.stream("auth-event", Consumed.with(stringSerde, valueSerde)
            .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST)
            .withTimestampExtractor(new TransactionTimestampExtractor()));

        authStream 
                .groupBy(( String key, GenericRecord value) -> value.get("tenantId").toString(), Grouped.with(Serdes.String(), valueSerde))
                .windowedBy(TimeWindows.of(Duration.ofDays(1)))
                .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("auth-result-store")
                        .withKeySerde(stringSerde)
                        .withValueSerde(longSerde))
                .suppress(Suppressed.untilWindowCloses(unbounded()))
                .toStream()
                .to("auth-result-topic", Produced.with(timeWindowedSerdeFrom(String.class), Serdes.Long()));

之后,我将插入有关该主题的记录。 我也有休息控制器,我正在使用 ReadOnlyWindowStore 读取商店。 day 参数从 UI 发送,可以是 1、7 或 30 天。这意味着我想阅读最后 7 个窗口。 代码:

final ReadOnlyWindowStore<String, Long> dayStore = kafkaStreams.store(KStreamsLdapsExample.authResultTable, QueryableStoreTypes.windowStore());

        Instant timeFrom = (Instant.now().minus(Duration.ofDays(days)));

        LocalDate currentDate = LocalDate.now();
        LocalDateTime currentDayTime = currentDate.atTime(23, 59, 59);
        Instant timeTo = Instant.ofEpochSecond(currentDayTime.toEpochSecond(ZoneOffset.UTC));

        try(WindowStoreIterator<Long> it1 = dayStore.fetch(tenant, timeFrom, timeTo)) {
            Long count = 0L;
            JsonObject jsonObject = new JsonObject();
            while (it1.hasNext())
            {
                final KeyValue<Long, Long> next = it1.next();
                Date resultDate = new Date(next.key);
                jsonObject.addProperty(resultDate.toString(), next.value);
                count += next.value;
            }

            jsonObject.addProperty("tenant", tenant);
            jsonObject.addProperty("Total number of events", count);

            return ResponseEntity.ok(jsonObject.toString());
        }

问题是,我只能在 1-2 天内得到结果。之后,旧的窗户就会丢失。 另一个问题是输出主题中写的信息:“auth-result-topic” 我正在使用控制台消费者阅读结果,并且有很多空记录,没有键,没有值,还有一些带有一些随机数的记录。 enter image description here

知道我的商店发生了什么吗?如何阅读过去的 N 个窗口? 谢谢

【问题讨论】:

    标签: apache-kafka aggregate apache-kafka-streams


    【解决方案1】:

    您需要通过Materialize.as(...).withRetention(...) 增加存储保留时间(默认为1 天),您可以将其传递给count() 运算符。

    您可能还想通过TimeWindows.of(Duration.ofDays(1)).grace(...) 增加窗口宽限期。

    使用控制台消费者读取数据:您需要指定正确的反序列化器。用于写入输出主题的 window-serde 和 long-serde 使用二进制格式,而控制台使用者默认采用字符串数据类型。您可以指定相应的命令行参数来设置不同的键和值反序列化器,这些反序列化器必须与您写入主题时使用的序列化器相匹配。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-02-14
      • 2019-10-15
      • 1970-01-01
      • 2016-12-20
      • 1970-01-01
      • 1970-01-01
      • 2022-01-12
      相关资源
      最近更新 更多