【发布时间】:2017-06-22 05:17:01
【问题描述】:
我正在尝试使用火花流以程序的形式实现以下 kafka-console-consumer 命令(运行良好并输出预期的 json 数据)功能。
kafka-console-consumer.sh --zookeeper host.xxxx.com:2181,host.xxxx.com:2181 --topic mytopic --formatter CustomAvroMessageFormatter --property "formatter-schema-file= schema.txt" > /var/tmp/myfile.json&
我能够以编程方式使用火花流从上述主题中读取消息,如下所示的 scala 代码运行良好:
object ConsumeTest {
def main(args: Array[String]) {
val sc = new SparkContext("local[*]", "ConsumeKafkaMsg")
sc.setLogLevel("ERROR")
val ssc = new StreamingContext(sc, Seconds(1))
//To read from server
val kafkaParams = Map("metadata.broker.list" -> "brokername:9092")
val topics = List("mytopic").toSet
val lines = KafkaUtils.createDirectStream[
String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
lines.print()
ssc.start()
ssc.awaitTermination()
}
}
然而,上面的程序以类似于下面的二进制格式读取消息:
��Cߣ�ߕ'윺~�_,��M˶/��Ѯ!????�Vcusomtername client
2X3XXXXXX-sasadsad-4673-212c-dsdsadsad
value
,"question"logName
successstԇ���V
针对上述命令,使用自定义 avro 格式化程序使用 avro 模式将二进制格式转换为 json 格式。我无法在上面的程序中找到如何使用命令等效的 avro 消息格式化程序,这对实现很重要。
以下是可能的 avro 架构(schema.txt)供参考(实际上是复杂的可用于处理的内容):
{
"type" : "record",
"namespace" : "mynamespace",
"name" : "myname",
"fields" : [{
"name":"field1",
"type":{
"type":"record",
"name":"Eventfield1",
"fields":[{.....}]
}]
]
}
请帮助实施。
【问题讨论】:
标签: scala apache-spark apache-kafka spark-streaming avro