【问题标题】:Kafka Streams Reduce vs SuppressKafka Streams 减少与抑制
【发布时间】:2020-11-18 16:16:07
【问题描述】:

在阅读 suppress() 文档时,我发现除非将记录发布到主题,否则时间窗口不会提前,因为它是基于事件时间的。现在,我的代码正在输出每个键的最终值,因为主题的流量是恒定的,但是当系统关闭时会有停机时间,导致状态存储中的现有记录被“冻结”。我想知道只有 reduce() 而不是 reduce().suppress() 之间有什么区别。 reduce() 是否像suppress() 一样,因为它们都是事件时间驱动的?我的理解是两者都在做同样的事情,在某个时间窗口内聚合键。

我的拓扑如下:

    final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
    final Serde<EligibilityKey> keySpecificAvroSerde = new SpecificAvroSerde<EligibilityKey>();
    keySpecificAvroSerde.configure(serdeConfig, true);
    final Serde<Eligibility> valueSpecificAvroSerde = new SpecificAvroSerde<Eligibility>();
    valueSpecificAvroSerde.configure(serdeConfig, false);

    // KStream<EligibilityKey, Eligibility>
    KStream<EligibilityKey, Eligibility> kStreamInput = builder.stream(input,
            Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde));

    // KStream<EligibilityKey, String>
    KStream<EligibilityKey, String> kStreamMapValues = kStreamInput
            .mapValues((key, value) -> Processor.process(key, value));

    // WindowBytesStoreSupplier
    WindowBytesStoreSupplier windowBytesStoreSupplier = Stores.inMemoryWindowStore("in-mem",
            Duration.ofSeconds(retentionPeriod), Duration.ofSeconds(windowSize), false);

    // Materialized
    Materialized<EligibilityKey, String, WindowStore<Bytes, byte[]>> materialized = Materialized
            .as(windowBytesStoreSupplier);
    materialized = Materialized.with(keySpecificAvroSerde, Serdes.String());

    // TimeWindows
    TimeWindows timeWindows = TimeWindows.of(Duration.ofSeconds(size)).advanceBy(Duration.ofSeconds(advance))
            .grace(Duration.ofSeconds(afterWindowEnd));

    // KTable<Windowed<EligibilityKey>, String>
    KTable<Windowed<EligibilityKey>, String> kTable = kStreamMapValues
            .groupByKey(Grouped.with(keySpecificAvroSerde, Serdes.String())).windowedBy(timeWindows)
            .reduce((a, b) -> b, materialized.withLoggingDisabled().withRetention(Duration.ofSeconds(retention)))
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().withLoggingDisabled()));

    // KStream<Windowed<EligibilityKey>, String>
    KStream<Windowed<EligibilityKey>, String> kStreamOutput = kTable.toStream();

【问题讨论】:

    标签: apache-kafka-streams


    【解决方案1】:

    通过不加抑制地使用reduce(),聚合的结果会不断更新,即保存reduce()结果的KTable的更新也会在处理窗口的所有记录之前发送到下游。

    假设一个 reduce 仅将持续时间为 3 且宽限为 0 的窗口中的值和以下输入记录(键、值、时间戳)相加到 reduce()

    • W1 的输入记录 (A, 1, 1) -> 输出记录 ((W1,A), 1) 发送到下游
    • W1 的输入记录 (A, 2, 2) -> 输出记录 ((W1,A), 3) 发送到下游
    • W1 的输入记录 (A, 3, 3) -> 输出记录 ((W1,A), 6) 发送到下游
    • W2 的输入记录 (A, 4, 4) -> 输出记录 ((W2,A), 4) 发送到下游

    使用reduce().suppress(),结果会被缓冲直到窗口关闭。结果是:

    • W1 的输入记录 (A, 1, 1) -> 无输出
    • W1 的输入记录 (A, 2, 2) -> 无输出
    • W1 的输入记录 (A, 3, 3) -> 无输出
    • W2 的输入记录 (A, 4, 4) -> 输出记录 ((W1,A), 6) 发送到下游

    请注意,对于没有 suppress() 的情况,我假设缓存是用 cache.max.bytes.buffering = 0 关闭的。使用cache.max.bytes.buffering &gt; 0(默认为10MB),缓存会缓存一个KTable的输出记录,一旦缓存满了,就会输出key最近最少更新的记录。

    【讨论】:

      猜你喜欢
      • 2019-10-23
      • 2014-04-29
      • 1970-01-01
      • 1970-01-01
      • 2020-01-08
      • 2021-01-25
      • 2020-03-24
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多