【问题标题】:Kafka producer Interceptor卡夫卡生产者拦截器
【发布时间】:2016-09-09 19:10:46
【问题描述】:

我正在尝试添加一个拦截器来验证生产者发布到 Kafka 主题的消息。除了 Kafka 主题执行的模式验证之外,我还需要做一些验证。我遵循的步骤如下。

  1. 我编写了一个扩展 ProducerInterceptor 接口的 Java 类。
  2. 编译了类并创建了一个 jar 文件,该文件放置在类路径中包含的文件夹中。
  3. 在 Kafka 安装内的 producer.properties 中添加了 intercetors.classes= classname。

但是当我向主题发布消息时,我编写的自定义拦截器类不会被调用。 (我也没有收到任何错误。消息完美地发布到主题)。

我已经推荐了https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors

请对此提出建议。

【问题讨论】:

    标签: java apache-kafka interceptor confluent-platform bigdata


    【解决方案1】:

    这个问题已经很老了,所以我假设您在此期间找到了解决方案。但是,以防万一它对其他人有所帮助,我发现我的 ProducerInterceptor 类,它根据消息的内容将消息分派到不同的主题,除非我的流已经具有指定的输出,否则不会被调用。

    我的第一次尝试看起来像这样,因为我认为我不需要指定输出主题。这不起作用:

    val builder: KStreamBuilder = new KStreamBuilder
    val input = builder.stream("input-topic")
    
    val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
    stream.start()
    

    但这确实:

    val builder: KStreamBuilder = new KStreamBuilder
    val input = builder.stream("input-topic").through("dummy-output-topic")
    
    val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
    stream.start()
    

    值得注意的是,在第二个示例中,没有任何内容发布到 dummy-output-topic,而且使用 to 而不是 through 似乎也以相同的方式工作。

    在我的例子中,我调用map 来更改记录,然后使用拦截器将它们分派到不同的主题,所以我的代码实际上看起来更像这样:

    val builder: KStreamBuilder = new KStreamBuilder
    val input = builder.stream("input-topic")
        .map(new CustomKeyValueMapper)
        .through("dummy-output-topic")
    
    val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
    stream.start()
    

    我希望这些示例可以帮助任何与 ProducerInterceptors 一起工作的人,他们犯了同样的错误。

    【讨论】:

      【解决方案2】:

      属性名是interceptor.classes,不是intercetors.classes

      【讨论】:

      • 感谢克里斯的回复。是的,它在 producer.properties 中正确提及为拦截器.类。抱歉打错了。
      猜你喜欢
      • 2020-08-27
      • 2017-02-21
      • 2019-04-09
      • 2021-01-20
      • 2019-09-24
      • 1970-01-01
      • 2020-09-30
      • 2017-11-23
      • 1970-01-01
      相关资源
      最近更新 更多