【问题标题】:How to get current Kafka topic inside Kafka stream?如何在 Kafka 流中获取当前的 Kafka 主题?
【发布时间】:2018-01-19 03:29:28
【问题描述】:

我的场景是我使用了很多共享前缀的 Kafka 主题(例如 house.door、house.room ) 并使用 Kafka 流正则表达式主题模式 API 使用所有主题。 一切看起来都不错,我得到了数据的密钥和消息。

为了处理数据,我需要主题名称,这样我就可以根据主题名称加入, 但我不知道如何在 Kafka 流 DSL 中获取主题名称。

解决我的问题的一种可能方法是将主题名称与我的消息一起保存。 但是如果我可以直接获取主题名称会更好。

那么,如何在 Kafka 流中获取当前的 Kafka 主题?

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    为了补充 Matthias J. Sax 的观点,我附上了示例代码来展示它是如何完成的。

    public static void main(final String[] args) {
        try {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streamProcessor");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.STATE_DIR_CONFIG, "state-store");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
            StreamsBuilder streamsBuilder = new StreamsBuilder();
            final KStream<String, String> textLines = streamsBuilder.stream(inputTopicList);
            final KStream<String, String> textLines = builder.stream(inputTopiclist);
            textLines.transform(getTopicDetailsTransformer::new)
            .foreach(new ForeachAction<String, String>() {
                public void apply(String key, String value) {
                    System.out.println(key + ": " + value);
                }
            });
            textLines.to(outputTopic);
        } catch (Exception e) {
            System.out.println(e);
        }
    }
     private static class getTopicDetailsTransformer implements Transformer<String, String, KeyValue<String, String>> {
    
            private ProcessorContext context;
    
            @Override
            public void init(final ProcessorContext context) {
                 this.context = context;
            }
    
            public KeyValue<String, String> transform(final String recordKey, final String recordValue) {
    
              //here i am returning key as topic name.
              return KeyValue.pair(context.topic(), recordValue);
            }
    
            @Override
            public void close() {
              // Not needed.
            }
    
          }
    

    【讨论】:

      【解决方案2】:

      常见问题:https://docs.confluent.io/current/streams/faq.html#accessing-record-metadata-such-as-topic-partition-and-offset-information

      可通过处理器 API 访问记录元数据。由于其处理器 API 集成,它还可以通过 DSL 间接访问。

      使用处理器 API,您可以通过 ProcessorContext 访问记录元数据。您可以在 Processor#init() 期间将对上下文的引用存储在处理器的实例字段中,然后在 Processor#process() 中查询处理器上下文,例如(对于 Transformer 也是如此)。上下文会自动更新以匹配当前正在处理的记录,这意味着诸如 ProcessorContext#partition() 之类的方法总是返回当前记录的元数据。在调度的 punctuate() 函数中调用处理器上下文时有一些注意事项,有关详细信息,请参阅 Javadocs。

      例如,如果您将 DSL 与自定义 Transformer 结合使用,您可以将输入记录的值转换为还包括分区和偏移元数据,随后 DSL 操作(例如 map 或 filter)可以利用此信息。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-06-20
        • 1970-01-01
        • 2015-07-03
        • 2017-08-08
        • 2017-04-07
        • 1970-01-01
        相关资源
        最近更新 更多