【问题标题】:Kafka Stream to sort messages based on timestamp key in json messageKafka Stream根据json消息中的时间戳键对消息进行排序
【发布时间】:2018-06-13 08:12:14
【问题描述】:

我正在使用 JSON 消息发布 Kafka,例如:

"UserID":111,"UpdateTime":06-13-2018 12:13:43.200Z,"Comments":2,"Like":10
"UserID":111,"UpdateTime":06-13-2018 12:13:40.200Z,"Comments":0,"Like":6
"UserID":222,"UpdateTime":06-13-2018 12:13:43.200Z,"Comments":1,"Like":10
"UserID":111,"UpdateTime":06-13-2018 12:13:44.600Z,"Comments":3,"Like":12

我想使用 Kafka Streams 在 10 秒时间窗口内根据 UpdateTime 对消息进行排序,并在另一个 Kafka 主题中推送已排序的消息。 我创建了一个流,它从输入主题中读取数据,然后在 groupByKey() 之后创建 TimeWindowedKStream,其中 UserID 是消息中的键(虽然不需要 groupByKey 然后排序,但我可以不直接获取WindowedBy)。但我无法根据UpdateTime 进一步对 10 秒窗口中的消息进行排序。我的源代码是:

public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-sorting");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("UnsortedMessages");
        TimeWindowedKStream<String, String> countss = source.groupByKey().windowedBy(TimeWindows.of(10000L)
                 .until(10000L));
        /*
        SORTING CODE
            */
        outputMessage.toStream().to("SortedMessages", Produced.with(Serdes.String(), Serdes.Long()));
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        final CountDownLatch latch = new CountDownLatch(1);
        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-sorting-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }

非常感谢。

【问题讨论】:

    标签: java apache-kafka apache-kafka-streams


    【解决方案1】:

    如果您想忽略键对消息进行排序,则仅基于分区进行此操作才有意义,并且仅当输入主题与输出主题具有相同数量的分区时才有意义。对于这种情况,您应该提取分区号并将其用作消息密钥(参见:https://docs.confluent.io/current/streams/faq.html#accessing-record-metadata-such-as-topic-partition-and-offset-information

    对于排序,它更加棘手。请注意,Kafka Streams 遵循“连续输出”模型,并使用 DSL 为每个输入记录发出更新。因此,使用处理器 API 可能会更好。您将使用带有附加存储的Processor 并将记录放入存储中。作为内存结构,您保留一个排序的记录列表。随着时间的推移,您可以发出“已完成”的窗口并从存储中删除相应的记录。

    我认为你不能使用 DSL 来构建它。

    【讨论】:

    猜你喜欢
    • 2017-10-11
    • 2020-11-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-09-11
    • 1970-01-01
    • 2021-08-17
    相关资源
    最近更新 更多