【发布时间】:2017-07-07 03:42:42
【问题描述】:
我想使用我的 spark 流 API 将我的 AVRO kafka 流存储到文件系统,并使用以下分隔格式的 scala 代码,但在实现这一目标时面临一些挑战
record.write.mode(SaveMode.Append).csv("/Users/Documents/kafka-poc/consumer-out/)
由于记录(通用记录)不是 DF 或 RDD,我不知道如何处理?
代码
val messages = SparkUtilsScala.createCustomDirectKafkaStreamAvro(ssc, kafkaParams, zookeeper_host, kafkaOffsetZookeeperNode, topicsSet)
val requestLines = messages.map(_._2)
requestLines.foreachRDD((rdd, time: Time) => {
rdd.foreachPartition { partitionOfRecords => {
val recordInjection = SparkUtilsJava.getRecordInjection(topicsSet.last)
for (avroLine <- partitionOfRecords) {
val record = recordInjection.invert(avroLine).get
println("Consumer output...."+record)
println("Consumer output schema...."+record.getSchema)
}}}}
以下是输出和架构
{"username": "Str 1-0", "tweet": "Str 2-0", "timestamp": 0}
{"type":"record","name":"twitter_schema","fields":[{"name":"username","type":"string"},{"name":"tweet","type":"string"},{"name":"timestamp","type":"int"}]}
在此先感谢您的帮助
【问题讨论】:
标签: scala apache-kafka spark-streaming