【问题标题】:What is the use of Header in Kafka Processor API?Kafka 处理器 API 中的 Header 有什么用?
【发布时间】:2020-07-07 10:32:48
【问题描述】:

我正在学习 Kafka 处理器 API,并在 ProcessorContext 中找到一个方法标头。

headers​()

返回当前输入记录的标题;可能 如果不可用,则为 null

这个方法有什么用?

docs 中只写了一行:

返回当前输入记录的标题;如果它可能为空 不可用

我可以对此执行一些操作吗?

【问题讨论】:

    标签: apache-kafka apache-kafka-streams kafka-producer-api


    【解决方案1】:

    标头是可以附加到每条消息的某种元数据。标头可用于各种场景,例如在过滤记录时附加可使用的信息等。


    您可以通过处理器 API 访问消息的元数据,更准确地说是 process()transform()transformValues()。对于example,为了给记录添加标题,下面的方法可以解决问题:

    public void process(String key, String value) {
    
        // add a header to the elements
        context().headers().add.("key", "value")
    }
    

    【讨论】:

    • 我可以在 ProducerRecord 中添加相同的内容吗?意味着在创建记录时我可以在此处添加此标头并在处理方法中获取标头
    • 得到了解决方案。 KafkaProducer kafkaProducer = new KafkaProducer(props); ProducerRecord 记录 = new ProducerRecord(topicName, "Hello"); record.headers().add("OperationType", "Create".getBytes()); kafkaProducer.send(record);
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-12-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多