【问题标题】:How to process messages in avro format from Kafka? [duplicate]如何处理来自 Kafka 的 avro 格式的消息? [复制]
【发布时间】:2017-06-22 05:17:01
【问题描述】:

我正在尝试使用火花流以程序的形式实现以下 kafka-console-consumer 命令(运行良好并输出预期的 json 数据)功能。

kafka-console-consumer.sh --zookeeper host.xxxx.com:2181,host.xxxx.com:2181 --topic mytopic --formatter CustomAvroMessageFormatter --property "formatter-schema-file= schema.txt" > /var/tmp/myfile.json&

我能够以编程方式使用火花流从上述主题中读取消息,如下所示的 scala 代码运行良好:

object ConsumeTest {

def main(args: Array[String]) {
  val sc = new SparkContext("local[*]", "ConsumeKafkaMsg")
  sc.setLogLevel("ERROR")
  val ssc = new StreamingContext(sc, Seconds(1))

  //To read from server
  val kafkaParams = Map("metadata.broker.list" -> "brokername:9092")
  val topics = List("mytopic").toSet

  val lines = KafkaUtils.createDirectStream[
   String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

  lines.print()

  ssc.start()
  ssc.awaitTermination()
  }

}

然而,上面的程序以类似于下面的二进制格式读取消息:

��Cߣ�ߕ'윺~�_,��M˶/��Ѯ!????�Vcusomtername client
2X3XXXXXX-sasadsad-4673-212c-dsdsadsad
value
,"question"logName
successstԇ���V

针对上述命令,使用自定义 avro 格式化程序使用 avro 模式将二进制格式转换为 json 格式。我无法在上面的程序中找到如何使用命令等效的 avro 消息格式化程序,这对实现很重要。

以下是可能的 avro 架构(schema.txt)供参考(实际上是复杂的可用于处理的内容):

{
  "type" : "record",
  "namespace" : "mynamespace",
  "name" : "myname",
  "fields" : [{
    "name":"field1",
    "type":{
      "type":"record",
      "name":"Eventfield1",
      "fields":[{.....}]
    }]
  ]
}

请帮助实施。

【问题讨论】:

    标签: scala apache-spark apache-kafka spark-streaming avro


    【解决方案1】:

    你有两个选择(都需要相当密集的编码,这没关系,不是吗?:))。

    1. 编写您自己的自定义 Kafka Deserializer 并在示例中使用 StringDecoder 的地方使用它。

    2. 在您加载数据集(用于批处理)后,使用 foreach 运算符或使用 map 转换将转换作为管道的一部分应用。

    您也可以考虑使用spark-avro 库。

    【讨论】:

      猜你喜欢
      • 2019-07-25
      • 1970-01-01
      • 1970-01-01
      • 2017-02-17
      • 2021-11-30
      • 2019-08-03
      • 2015-11-09
      • 2020-05-15
      • 2018-01-18
      相关资源
      最近更新 更多