【问题标题】:Kafka Stream windowedBy aggregate Materialized withRetention Out Of MemoeryKafka Streams windowedBy 聚合物化并保留内存不足
【发布时间】:2021-03-13 21:13:04
【问题描述】:

我有一个 KStream 应该是 windowedBy 并聚合导致内存不足:

java.lang.OutOfMemoryError: Java heap space

KStream DSL如下:

TimeWindows timeWindows = TimeWindows.of(Duration.ofDays(1)).advanceBy(Duration.ofMillis(1));
Initializer<History> historyInitializer = History::new;
        Aggregator<String, Event, History> historyAggregator = (key, value, aggregate) -> {
            aggregate.key = value.uuid;
            aggregate.addHistoryEventWindow(value);
            return aggregate;
        };

KTable<String, History> historyWindowed = eventStreamRaw
.filter((key, value) -> value != null)
    .groupByKey(Grouped.with(Serdes.String(), this.eventSerde))
    // segment our messages into 1-day windows
    .windowedBy(timeWindows)
    .aggregate(historyInitializer, historyAggregator, Named.as("name"), Materialized.with(Serdes.String(), this.historySerde))
    .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
    .groupBy(
            (key, value) -> new KeyValue<String, History>(
                    value.key + "|+|" + key.window().start() + "|+|" + key.window().end(), value),
            Grouped.with(Serdes.String(), this.historySerde))
    .aggregate(History::new, (key, value, aggValue) -> value, (key, value, aggValue) -> value,
            Materialized.with(Serdes.String(), this.historySerde));

阅读了一些文章(例如Kafka Streams Window By & RocksDB Tuning),我注意到我可能必须将商店“Materialized”配置为保留“1 天 + 1 Milli”。

但尝试添加对我不起作用:

final Materialized<String, History, WindowStore<Bytes, byte[]>> store = Materialized.<String, History, WindowStore<Bytes, byte[]>>as("eventstore")
        .withKeySerde(Serdes.String())
        .withValueSerde(this.historySerde)
        .withRetention(Duration.ofDays(1).plus(Duration.ofMillis(1)));

KTable<String, History> historyWindowed = eventStreamRaw
    ...
    .aggregate(historyInitializer, historyAggregator, Named.as("name"), store)

Java 编译抛出以下错误:

The method 
aggregate(Initializer<VR>, Aggregator<? super String,? super Event,VR>, Named, Materialized<String,VR,WindowStore<Bytes,byte[]>>) 
in the type TimeWindowedKStream<String,Event> is not applicable for the arguments 
(Initializer<History>, Aggregator<String,Event,History>, Named, Materialized<String,History,WindowStore<Bytes,byte[]>>)

说实话,我不明白。参数正确; VR 类型是“历史”。

那么,你知道我错过了什么吗?

这个windowedBy KTable 的想法是有一个状态,它可以将一个“事物”的所有事件保存一天。假设产生了一个新警报,我想将某一天的“事物”的所有事件附加到警报中。然后我会从 KStream Alert 到 KTable History 进行 leftJoin。这是将历史数据添加到 Kafka 事件的最佳方式吗?有没有办法“查找”KStream 事件的最后 x 天?我已经检查了 KStream Alert-KStream 事件 leftJoin 但这会为每个新的 KStream 事件产生一个输出。所以,从我的观点来看,这是不切实际的。

非常感谢您的帮助。我希望这只是一个简单的修复。非常感谢!

【问题讨论】:

    标签: java apache-kafka apache-kafka-streams


    【解决方案1】:

    查看以下帖子Kafka Streams App - count and sum aggregate 我导入了错误的“字节”类。所以,一定要导入以下类“org.apache.kafka.common.utils.Bytes”。

    但是,也许您有一个更好的主意,将来自一个流的 Kafka 消息与来自与(外)键相关的另一个流的历史数据进行丰富。

    谢谢大家。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-10-05
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多