【发布时间】:2020-11-18 16:16:07
【问题描述】:
在阅读 suppress() 文档时,我发现除非将记录发布到主题,否则时间窗口不会提前,因为它是基于事件时间的。现在,我的代码正在输出每个键的最终值,因为主题的流量是恒定的,但是当系统关闭时会有停机时间,导致状态存储中的现有记录被“冻结”。我想知道只有 reduce() 而不是 reduce().suppress() 之间有什么区别。 reduce() 是否像suppress() 一样,因为它们都是事件时间驱动的?我的理解是两者都在做同样的事情,在某个时间窗口内聚合键。
我的拓扑如下:
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
final Serde<EligibilityKey> keySpecificAvroSerde = new SpecificAvroSerde<EligibilityKey>();
keySpecificAvroSerde.configure(serdeConfig, true);
final Serde<Eligibility> valueSpecificAvroSerde = new SpecificAvroSerde<Eligibility>();
valueSpecificAvroSerde.configure(serdeConfig, false);
// KStream<EligibilityKey, Eligibility>
KStream<EligibilityKey, Eligibility> kStreamInput = builder.stream(input,
Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde));
// KStream<EligibilityKey, String>
KStream<EligibilityKey, String> kStreamMapValues = kStreamInput
.mapValues((key, value) -> Processor.process(key, value));
// WindowBytesStoreSupplier
WindowBytesStoreSupplier windowBytesStoreSupplier = Stores.inMemoryWindowStore("in-mem",
Duration.ofSeconds(retentionPeriod), Duration.ofSeconds(windowSize), false);
// Materialized
Materialized<EligibilityKey, String, WindowStore<Bytes, byte[]>> materialized = Materialized
.as(windowBytesStoreSupplier);
materialized = Materialized.with(keySpecificAvroSerde, Serdes.String());
// TimeWindows
TimeWindows timeWindows = TimeWindows.of(Duration.ofSeconds(size)).advanceBy(Duration.ofSeconds(advance))
.grace(Duration.ofSeconds(afterWindowEnd));
// KTable<Windowed<EligibilityKey>, String>
KTable<Windowed<EligibilityKey>, String> kTable = kStreamMapValues
.groupByKey(Grouped.with(keySpecificAvroSerde, Serdes.String())).windowedBy(timeWindows)
.reduce((a, b) -> b, materialized.withLoggingDisabled().withRetention(Duration.ofSeconds(retention)))
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().withLoggingDisabled()));
// KStream<Windowed<EligibilityKey>, String>
KStream<Windowed<EligibilityKey>, String> kStreamOutput = kTable.toStream();
【问题讨论】: