【问题标题】:Data not sent to Kafka with Avro serialization数据未通过 Avro 序列化发送到 Kafka
【发布时间】:2020-11-08 20:35:12
【问题描述】:

我正在尝试使用 Avro 将数据发送到 Kafka,但“消息”仍然为空。我尝试手动将模式添加到 Kafka,但没有结果。我在应用程序的日志、模式注册表日志或 Kafka 的日志中看不到任何错误。你能告诉我我做错了什么吗?

import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
import com.sksamuel.avro4s.{ AvroSchema, Record, RecordFormat, SchemaFor }    

case class User(name: String, age: Int)

val connectConfig: Map[String, Object] =
Map(
  "bootstrap.servers"   -> "localhost:9092",
  "client.id"           -> s"clientID${UUID.randomUUID().toString}",
  "key.serializer"      -> "org.apache.kafka.common.serialization.StringSerializer",
  "value.serializer"    -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
  "schema.registry.url" -> "localhost:8081"
)

implicit val producer = new KafkaProducer[String, Record](connectConfig.asJava)

def sendAvro(msg: DIL): Future[RecordMetadata] = {
    val newUser = User("Joe", 42)
    val schema  = AvroSchema[User]
    implicit val schemaFor = SchemaFor[User]
    val format = RecordFormat[User]

    val record = new ProducerRecord[String, Record]("kafkaLogTopic", format.to(newUser))
    producer.send(record)
}

【问题讨论】:

  • 看起来这段代码实际上从未调用过sendAvro 方法。

标签: scala apache-kafka avro avro4s


【解决方案1】:

Kafka批量发送数据,你的单条记录小于默认批量大小。

使用.send(data).get()producer.flush() 立即等待一条记录发送。

否则,您应该在scala.sys.ShutdownHookThread 上定义一个线程来刷新生产者

【讨论】:

  • 感谢您的回答!但是,不幸的是,它不起作用。
  • 你真的在调用 sendAvro 吗?否则,由于该函数返回一个未来,你真的需要做sendAvro().get(),或者producer.flush
猜你喜欢
  • 2020-05-03
  • 1970-01-01
  • 1970-01-01
  • 2023-03-03
  • 1970-01-01
  • 2016-11-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多