【问题标题】:Kafka producer: send avro as array[byte] without schemaKafka 生产者:将 avro 作为不带模式的数组 [byte] 发送
【发布时间】:2021-01-13 06:34:54
【问题描述】:

我正在尝试在本地设置一个简单的 kafka 堆栈,现在我需要创建一个玩具 Producer。这:https://lombardo-chcg.github.io/tools/2017/09/29/kafka-avro-producer-in-scala.html(我感兴趣的代码见下文)几乎正是我想要的,除了:

这里生产者发送一个 GenericData.Record 对象,所以整个模式都被发送,它不利用模式注册表。我想发送一个 Array[Byte],前几个字节是模式的 id,后面的字节是数据,没有模式(或者我认为这是最好的方法)

我说的那段代码:

import java.util.Properties

import org.apache.avro.Schema.Parser
import org.apache.avro.generic.GenericData
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.slf4j.LoggerFactory

case class User(name: String, favoriteNumber: Int, favoriteColor: String)

class AvroProducer {
  val logger = LoggerFactory.getLogger(getClass)

  val kafkaBootstrapServer = sys.env("KAFKA_BOOTSTRAP_SERVER")
  val schemaRegistryUrl = sys.env("SCHEMA_REGISTRY_URL")

  val props = new Properties()
  props.put("bootstrap.servers", kafkaBootstrapServer)
  props.put("schema.registry.url", schemaRegistryUrl)
  props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("acks", "1")

  val producer = new KafkaProducer[String, GenericData.Record](props)
  val schemaParser = new Parser

  val key = "key1"
  val valueSchemaJson =
  s"""
    {
      "namespace": "com.avro.junkie",
      "type": "record",
      "name": "User2",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "favoriteNumber",  "type": "int"},
        {"name": "favoriteColor", "type": "string"}
      ]
    }
  """
  val valueSchemaAvro = schemaParser.parse(valueSchemaJson)
  val avroRecord = new GenericData.Record(valueSchemaAvro)

  val mary = new User("Mary", 840, "Green")
  avroRecord.put("name", mary.name)
  avroRecord.put("favoriteNumber", mary.favoriteNumber)
  avroRecord.put("favoriteColor", mary.favoriteColor)

  def start = {
    try {
      val record = new ProducerRecord("users", key, avroRecord)
      val ack = producer.send(record).get()
      // grabbing the ack and logging for visibility
      logger.info(s"${ack.toString} written to partition ${ack.partition.toString}")
    }
    catch {
      case e: Throwable => logger.error(e.getMessage, e)
    }
  }
}

问题:

  • 我不知道如何从架构注册表中检索架构的 ID
  • 我不知道如何只发送没有架构的数据 + id 作为 Array[Byte]

我知道如何将整个 avro 写入 Array[Byte]:

    val writer = new SpecificDatumWriter[GenericData.Record](valueSchemaAvro)
    val out = new ByteArrayOutputStream
    val encoder = EncoderFactory.get.binaryEncoder(out, null)
    writer.write(avroRecord, encoder) // but here I am also writing the schema, right?
    encoder.flush
    out.close
    out.toByteArray

非常感谢

【问题讨论】:

    标签: scala apache-kafka avro


    【解决方案1】:

    第一个代码确实使用架构注册表,并计算一个 ID + 替换 KafkaAvroSerializer 内的字节数组中的架构

    如果要绕过 Schema Registry,请使用 ByteArraySerializer 并将第二个代码块中 out.toByteArray 的结果发送给生产者。

    【讨论】:

    • 谢谢。只是为了让我清楚,如果我在生产者级别绕过模式注册表,我什至不需要在属性中指定模式注册表 url?但在这种情况下,我真的在每条消息中发送模式。所以更聪明的方法是使用 kafkaAvroSerializer。
    • 你可以保留props.put("schema.registry.url,但是如果你使用内置的Kafka序列化器,它就不会被使用。注册表提供的主要好处是能够针对您的架构运行向后兼容性检查,但如果您有一个大型架构和许多要发送的消息,那么减少网络使用量也很好
    猜你喜欢
    • 2020-11-09
    • 2018-04-19
    • 1970-01-01
    • 2022-01-15
    • 2016-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-03
    相关资源
    最近更新 更多