【问题标题】:How do I declare the topic of a KafkaListener in Scala using a property value如何使用属性值在 Scala 中声明 KafkaListener 的主题
【发布时间】:2020-02-05 21:02:56
【问题描述】:

我有一个创建生产者的简单 Kafka/Scala 项目。现在我正在尝试创建消费者,但是,当我使用以下代码时......

@Service
class KafkaService @Autowired()(producer: KafkaTemplate[String, Array[Byte]]){

  @Value("${spring.kafka.topic}") val topic : String = null

  def sendMessage(msg: String): Unit = {
    System.out.println(s"Writing the message $msg to the topic ${this.topic}")
    producer.send(topic, msg.getBytes());
  }

  @KafkaListener(id="test", topics="${this.topic}")
  def consume(record: ConsumerRecord[String, String]): Unit = {
    System.out.println(s"Consumed Strinsg Message : ${record.value()}")
  }

}

我收到以下错误...

[ERROR] ...\service\KafkaService.scala:26: error: type mismatch;
[ERROR]  found   : String("${this.topic}")
[ERROR]  required: Array[String]
[ERROR]   @KafkaListener(id="test", topics="${this.topic}")

我错过了什么?

我也尝试了以下...

@Configuration
public class CommonConfiguration{
    ...
    @Value("${spring.kafka.topic}")
    public String topic;
    ...
}
@Service
class KafkaService @Autowired()(producer: KafkaTemplate[String, Array[Byte]], config: CommonConfiguration){

  def sendMessage(msg: String): Unit = {
    val topics : Array[String] = config.getTopics();
    println(s"Writing the message $msg ${topics.mkString(" ")}")
    producer.send(config.topic, msg.getBytes());
  }

  @KafkaListener(id="test", topics="#{config.topic.split(',')}")
  def consume(record: ConsumerRecord[String, String]): Unit = {
    System.out.println(s"Consumed Strinsg Message : ${record.value()}")
  }

}

仍然没有运气,但生产者的控制台日志正在获得正确的值。

【问题讨论】:

标签: scala apache-kafka


【解决方案1】:

错误表明主题需要 Array[String] 类型,而您提供了 String 类型。您需要将主题字符串转换为数组。

注解看起来与 Spring 非常相似,您可以在其中执行类似的操作

@Value("#{'${kafka.topic}'.split(',')}")

【讨论】:

  • 感谢您提供帮助,但我在标题中指定 Scala 是有原因的。我试过这种方法,但它不起作用。
  • 我的回答的要点是您需要一种将 String 类型转换为 Array[String] 的方法。我还看到的是 kafka 侦听器正在读取值“${this.topic}”而不是主题的实际值。如果是这种情况,那么您会看到错误消息为 String("topicName")而不是 String("${this.topic}").
  • 要点对我没有帮助我已经根据其他问题尝试过。我需要实际值。查看更新。
  • 看起来这是在使用 Scala 不支持的 SpEL。因此,当我确认 Scala 不支持它时,这不是一个好的答案,我将添加链接,但我也需要投反对票。
  • 根据您的原始代码,您可以尝试@KafkaListener(topics = "#{__listener.topic}") 这就是我们在spring kafka(java)中使用的方式。它在 Scala 中可能有效,也可能无效。
【解决方案2】:
@KafkaListener(id="scala", topics=Array("#{'${spring.kafka.topic}'.split(',')}"))

有关更多信息,请参阅 this questionthis question

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-11-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多