【Question title】: How to wait for a finite stream bulk result
【Posted】: 2020-03-27 23:09:43
【Question】:

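I have a stream-processing application built with Spring Cloud Stream and Kafka Streams. The system takes logs from an application, compares them against observations produced by another stream processor, and produces a score. The log stream is then split by score (above or below a threshold).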

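Topology:

The problem:

My question is how to correctly implement the "log best observation selector processor". At the moment a log is processed, the number of observations is finite, but there may be many of them.

I have come up with two solutions...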

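  1. Group the log-scored-observations topic by log ID, window it, then reduce to get the highest score. (Problem: scoring all the observations may take longer than the window.)

  2. Emit a scoring-completed message after each score, join it with log-relevant-observations, and use a global table over log-scored-observations with interactive queries to check whether every observation ID is in the global table's store. Once all IDs are in the store, map to the highest-scoring observation. (Problem: a global table does not seem to work when it is used only for interactive queries.)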
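
The trade-off in option 1 can be illustrated without Kafka. The following is a minimal plain-Java sketch, not Kafka Streams code: `WindowedBestScore`, `Scored`, and the `logId@windowStart` key format are hypothetical names introduced for illustration, and a simple tumbling window over non-negative millisecond timestamps stands in for a windowed group-and-reduce.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of option 1: highest score per log per tumbling window (no Kafka involved). */
final class WindowedBestScore {

    /** Hypothetical scored-observation event; the field names are illustrative. */
    static final class Scored {
        final String logId;
        final String observationId;
        final double score;
        final long timestampMs;

        Scored(String logId, String observationId, double score, long timestampMs) {
            this.logId = logId;
            this.observationId = observationId;
            this.score = score;
            this.timestampMs = timestampMs;
        }
    }

    /**
     * Groups events by (logId, window start) and keeps the highest-scoring event
     * in each group. Assumes timestamps are non-negative.
     */
    static Map<String, Scored> bestPerWindow(List<Scored> events, long windowSizeMs) {
        Map<String, Scored> best = new LinkedHashMap<>();
        for (Scored e : events) {
            long windowStart = (e.timestampMs / windowSizeMs) * windowSizeMs;
            String key = e.logId + "@" + windowStart;
            // Keep the higher score; on a tie the earlier event wins.
            best.merge(key, e, (a, b) -> Double.compare(a.score, b.score) >= 0 ? a : b);
        }
        return best;
    }
}
```

With a 10-second window, scores for one log arriving at 1 s, 4 s, and 12 s yield two separate results, one per window, so a score that arrives late is never compared against the earlier ones. This is the failure mode noted for option 1.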

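What is the best way to achieve what I am trying to do?

  • I would like to avoid creating any partitioning, disk, or memory bottlenecks.

  • Everything has a unique ID, plus a tuple of the related IDs whenever values from logs and observations are joined.

(Edit: replaced the text description of the topology with a diagram and changed the title.)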

【Comments】:

    Tags: apache-kafka apache-kafka-streams spring-cloud-stream


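    【Answer 1】:

    Solution #2 does seem to work, but it logged warnings because the interactive queries take some time to become ready, so I implemented the same solution with a Transformer instead: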

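    import java.util.function.Function;

    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import lombok.val;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.GlobalKTable;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.Joined;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.TimestampedKeyValueStore;
    import org.apache.kafka.streams.state.ValueAndTimestamp;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    // Imports for the application's own tuple classes (LogEntryRelevantObservationIdTuple etc.) are omitted.
    @Slf4j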
    @Configuration
    @RequiredArgsConstructor
    @SuppressWarnings("unchecked")
    public class LogBestObservationsSelectorProcessorConfig {
        // Name of the state store that backs the scored-observations global table.
        private static final String logScoredObservationsStore = "log-scored-observations-store";
    
        private final Serde<LogEntryRelevantObservationIdTuple> logEntryRelevantObservationIdTupleSerde;
        private final Serde<LogRelevantObservationIdsTuple> logRelevantObservationIdsTupleSerde;
        private final Serde<LogEntryObservationMatchTuple> logEntryObservationMatchTupleSerde;
        private final Serde<LogEntryObservationMatchIdsRelevantObservationsTuple> logEntryObservationMatchIdsRelevantObservationsTupleSerde;
    
        @Bean
        public Function<
                GlobalKTable<LogEntryRelevantObservationIdTuple, LogEntryObservationMatchTuple>,
                    Function<
                        KStream<LogEntryRelevantObservationIdTuple, LogEntryRelevantObservationIdTuple>,
                        Function<
                                KTable<String, LogRelevantObservationIdsTuple>,
                                KStream<String, LogEntryObservationMatchTuple>
                        >
                    >
                >
        logBestObservationSelectorProcessor() {
            return (GlobalKTable<LogEntryRelevantObservationIdTuple, LogEntryObservationMatchTuple> logScoredObservationsTable) ->
                    (KStream<LogEntryRelevantObservationIdTuple, LogEntryRelevantObservationIdTuple> logScoredObservationProcessedStream) ->
                            (KTable<String, LogRelevantObservationIdsTuple> logRelevantObservationIdsTable) -> {
                return logScoredObservationProcessedStream
                        .selectKey((k, v) -> k.getLogId())
                        .leftJoin(
                                logRelevantObservationIdsTable,
                                LogEntryObservationMatchIdsRelevantObservationsTuple::new,
                                Joined.with(
                                        Serdes.String(),
                                        logEntryRelevantObservationIdTupleSerde,
                                        logRelevantObservationIdsTupleSerde
                                )
                        )
                        .transform(() -> new LogEntryObservationMatchTransformer(logScoredObservationsStore))
                        .groupByKey(
                                Grouped.with(
                                    Serdes.String(),
                                    logEntryObservationMatchTupleSerde
                                )
                        )
                        .reduce(
                                // Keep the higher-scoring match; on a tie, keep the one already aggregated.
                                (match1, match2) -> Double.compare(match1.getScore(), match2.getScore()) >= 0 ? match1 : match2,
                                Materialized.with(
                                        Serdes.String(),
                                        logEntryObservationMatchTupleSerde
                                )
                        )
                        .toStream();
            };
        }
    
        @RequiredArgsConstructor
        private static class LogEntryObservationMatchTransformer implements Transformer<String, LogEntryObservationMatchIdsRelevantObservationsTuple, KeyValue<String, LogEntryObservationMatchTuple>> {
            private final String stateStoreName;
            private ProcessorContext context;
            private TimestampedKeyValueStore<LogEntryRelevantObservationIdTuple, LogEntryObservationMatchTuple> kvStore;
    
            @Override
            public void init(ProcessorContext context) {
                this.context = context;
                this.kvStore = (TimestampedKeyValueStore<LogEntryRelevantObservationIdTuple, LogEntryObservationMatchTuple>) context.getStateStore(stateStoreName);
            }
    
            @Override
            public KeyValue<String, LogEntryObservationMatchTuple> transform(String logId, LogEntryObservationMatchIdsRelevantObservationsTuple value) {
                val observationIds = value.getLogEntryRelevantObservationsTuple().getRelevantObservations().getObservationIds();
                val allObservationsProcessed = observationIds.stream()
                        .allMatch((observationId) -> {
                            val key = LogEntryRelevantObservationIdTuple.newBuilder()
                                    .setLogId(logId)
                                    .setRelevantObservationId(observationId)
                                    .build();
                            return kvStore.get(key) != null;
                        });
                if (!allObservationsProcessed) {
                    // Returning null forwards nothing downstream: wait until every relevant observation is scored.
                    return null;
                }
    
                // Getter name matches the builder's setRelevantObservationId used below.
                val observationId = value.getLogEntryRelevantObservationIdTuple().getRelevantObservationId();
                val key = LogEntryRelevantObservationIdTuple.newBuilder()
                        .setLogId(logId)
                        .setRelevantObservationId(observationId)
                        .build();
                ValueAndTimestamp<LogEntryObservationMatchTuple> observationMatchValueAndTimestamp = kvStore.get(key);
                return new KeyValue<>(logId, observationMatchValueAndTimestamp.value());
            }
    
            @Override
            public void close() {
                // Nothing to release: the state store is managed by Kafka Streams.
            }
        }
    }
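
The gating idea behind the Transformer can also be modelled in plain Java. This is a sketch under assumptions rather than the code above: `CompletionGatedSelector` and its methods are hypothetical, a `HashMap` stands in for the state store, and, unlike the Transformer (which forwards the current match and leaves the maximum to the downstream `reduce`), this variant scans all stored scores and returns the best one as soon as the set is complete.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Plain-Java model of "emit only once every relevant observation has been scored". */
final class CompletionGatedSelector {

    // Stand-in for the state store: "logId|observationId" -> score (key format is illustrative).
    private final Map<String, Double> scoredStore = new HashMap<>();

    private static String key(String logId, String observationId) {
        return logId + "|" + observationId;
    }

    /** Records one score, as the scoring step would when it publishes a scored observation. */
    void recordScore(String logId, String observationId, double score) {
        scoredStore.put(key(logId, observationId), score);
    }

    /**
     * Returns the ID of the highest-scoring observation once every expected ID has a score,
     * or Optional.empty() while any score is still missing.
     */
    Optional<String> bestIfComplete(String logId, Collection<String> expectedObservationIds) {
        String bestId = null;
        double bestScore = Double.NEGATIVE_INFINITY;
        for (String observationId : expectedObservationIds) {
            Double score = scoredStore.get(key(logId, observationId));
            if (score == null) {
                return Optional.empty(); // still waiting for this observation
            }
            if (bestId == null || score > bestScore) {
                bestId = observationId;
                bestScore = score;
            }
        }
        return Optional.ofNullable(bestId); // empty when no observations were expected
    }
}
```

Calling `bestIfComplete` after each `recordScore` returns an empty result until the last expected observation arrives, then the ID with the highest score. Because the check is driven by the expected ID list rather than by time, it does not depend on a window size.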
    

    【Comments】:
