【问题标题】:Issue storing AVRO Kafka streams to File System将 AVRO Kafka 流存储到文件系统的问题
【发布时间】:2017-07-07 03:42:42
【问题描述】:

我想使用我的 spark 流 API 将我的 AVRO kafka 流存储到文件系统,并使用以下分隔格式的 scala 代码,但在实现这一目标时面临一些挑战

record.write.mode(SaveMode.Append).csv("/Users/Documents/kafka-poc/consumer-out/)

由于记录(通用记录)不是 DF 或 RDD,我不知道如何处理?

代码

       val messages = SparkUtilsScala.createCustomDirectKafkaStreamAvro(ssc, kafkaParams, zookeeper_host, kafkaOffsetZookeeperNode, topicsSet)
       val requestLines = messages.map(_._2) 
       requestLines.foreachRDD((rdd, time: Time) => {
       rdd.foreachPartition { partitionOfRecords => {
       val recordInjection = SparkUtilsJava.getRecordInjection(topicsSet.last)
       for (avroLine <- partitionOfRecords) {
       val record = recordInjection.invert(avroLine).get
       println("Consumer output...."+record)                                                                
       println("Consumer output schema...."+record.getSchema)
       }}}}

以下是输出和架构

{"username": "Str 1-0", "tweet": "Str 2-0", "timestamp": 0}
{"type":"record","name":"twitter_schema","fields":[{"name":"username","type":"string"},{"name":"tweet","type":"string"},{"name":"timestamp","type":"int"}]}

在此先感谢您的帮助

【问题讨论】:

    标签: scala apache-kafka spark-streaming


    【解决方案1】:

    我找到了解决方案。

    val jsonStrings: RDD[String] = sc.parallelize(Seq(record.toString())); 
    val result = sqlContext.read.json(jsonStrings).toDF(); 
    result.write.mode("Append").csv("/Users/Documents/kafka-poc/‌​consumer-out/"); 
    

    【讨论】:

      猜你喜欢
      • 2018-10-28
      • 1970-01-01
      • 2019-01-10
      • 1970-01-01
      • 2020-06-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-08-11
      相关资源
      最近更新 更多