【发布时间】:2020-07-15 17:03:10
【问题描述】:
我需要创建具有每天经过身份验证的用户数量的状态存储,以便我可以获得过去一天、过去 7 天和过去 30 天内经过身份验证的用户的数量。 为了实现这一点,每个身份验证事件都被发送到 auth-event 主题。 我每天都在流式传输这个主题并创建窗口。 代码:
KStream<String, GenericRecord> authStream = builder.stream("auth-event", Consumed.with(stringSerde, valueSerde)
.withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST)
.withTimestampExtractor(new TransactionTimestampExtractor()));
authStream
.groupBy(( String key, GenericRecord value) -> value.get("tenantId").toString(), Grouped.with(Serdes.String(), valueSerde))
.windowedBy(TimeWindows.of(Duration.ofDays(1)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("auth-result-store")
.withKeySerde(stringSerde)
.withValueSerde(longSerde))
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.to("auth-result-topic", Produced.with(timeWindowedSerdeFrom(String.class), Serdes.Long()));
之后,我将插入有关该主题的记录。 我也有休息控制器,我正在使用 ReadOnlyWindowStore 读取商店。 day 参数从 UI 发送,可以是 1、7 或 30 天。这意味着我想阅读最后 7 个窗口。 代码:
final ReadOnlyWindowStore<String, Long> dayStore = kafkaStreams.store(KStreamsLdapsExample.authResultTable, QueryableStoreTypes.windowStore());
Instant timeFrom = (Instant.now().minus(Duration.ofDays(days)));
LocalDate currentDate = LocalDate.now();
LocalDateTime currentDayTime = currentDate.atTime(23, 59, 59);
Instant timeTo = Instant.ofEpochSecond(currentDayTime.toEpochSecond(ZoneOffset.UTC));
try(WindowStoreIterator<Long> it1 = dayStore.fetch(tenant, timeFrom, timeTo)) {
Long count = 0L;
JsonObject jsonObject = new JsonObject();
while (it1.hasNext())
{
final KeyValue<Long, Long> next = it1.next();
Date resultDate = new Date(next.key);
jsonObject.addProperty(resultDate.toString(), next.value);
count += next.value;
}
jsonObject.addProperty("tenant", tenant);
jsonObject.addProperty("Total number of events", count);
return ResponseEntity.ok(jsonObject.toString());
}
问题是,我只能在 1-2 天内得到结果。之后,旧的窗户就会丢失。 另一个问题是输出主题中写的信息:“auth-result-topic” 我正在使用控制台消费者阅读结果,并且有很多空记录,没有键,没有值,还有一些带有一些随机数的记录。 enter image description here
知道我的商店发生了什么吗?如何阅读过去的 N 个窗口? 谢谢
【问题讨论】:
标签: apache-kafka aggregate apache-kafka-streams