【Question Title】: Consuming a Kafka DStream in a Spark Streaming Process
【Posted】: 2018-07-02 12:24:08
【Question Description】:

I am consuming a Kafka topic in a Spark Streaming program like this:

import ...

object KafkaStreaming {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("KafkaStreaming").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))
    val kafkaConf = Map(
      ...
    )

    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("topic"), kafkaConf)
    )
    val lines: DStream[String] = messages.map(_.value)
    val line: DStream[String] = lines.flatMap(_.split("\n"))

    process(line)

    ssc.start()
    ssc.awaitTermination()
  }

  def process(line: DStream[String]): Unit = {
    // here is where I want to convert the DStream to JSON
    var json: Option[Any] = JSON.parseFull(line) // <--
    println(json.getOrElse("json is NULL"))
    if (json.isEmpty == false) {
      println("NOT FALSE")
      var map = json.get.asInstanceOf[Map[String, Any]]

      // use every member of JSON document to access the value
      map.get("any json element").toString
      // do some other manipulation
    }
  }
}

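In the `process` function, I want to take each line of the stream, extract a JSON object from it, and then do further processing and persistence. How can I do this?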

【Question Discussion】:

    Tags: json apache-kafka spark-streaming


    【Solution 1】:

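    Instead of passing a `DStream[String]` to `process` (which cannot work, because `JSON.parseFull` expects a single `String`), define `process` over one line, apply it with `DStream.map`, and then use `foreachRDD`: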

    def process(line: String): String = ??? // parse one line as JSON and return the transformed result
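One possible body for `process`, sketched with only the Scala standard library so it can run outside Spark. It assumes each line is a flat JSON object whose values are strings or plain numbers (no nesting, arrays, booleans, or escaped quotes) and uses a deliberately naive regex rather than a real JSON parser. `LineProcessor`, `parseFlatJson`, and the `"any json element"` lookup (borrowed from the question) are illustrative names only; for real data, a proper JSON library is the safer choice.

```scala
// Naive per-line JSON extraction using only the Scala standard library.
// Assumption: each line is a flat JSON object with string or numeric values,
// no nested objects/arrays and no escaped quotes inside strings.
object LineProcessor {
  // Matches "key": "text"  or  "key": -12.5
  private val Field = """"([^"]+)"\s*:\s*(?:"([^"]*)"|(-?\d+(?:\.\d+)?))""".r

  /** Returns the object's fields (values kept as strings), or None if the line is not wrapped in {}. */
  def parseFlatJson(line: String): Option[Map[String, String]] = {
    val trimmed = line.trim
    if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) None
    else
      Some(
        Field.findAllMatchIn(trimmed).map { m =>
          // group(2) is null when the value was a number, so fall back to group(3)
          m.group(1) -> Option(m.group(2)).getOrElse(m.group(3))
        }.toMap
      )
  }

  // Hypothetical transformation: extract one field, as the question tries to do.
  def process(line: String): String =
    parseFlatJson(line)
      .flatMap(_.get("any json element"))
      .getOrElse("json is NULL")
}
```

Because this `process` takes a single `String`, it can be passed directly to `.map(process)` on a `DStream[String]`, and Spark applies it to every line on the executors.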
    

    Then:

    messages
      .map(_.value)
      .flatMap(_.split("\n"))
      .map(process)
      .foreachRDD { rdd =>
        rdd.foreachPartition { itr =>
          // Do stuff with `Iterator[String]` after JSON transformation
        }
      }
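To make the per-batch behaviour of this chain concrete without a cluster, here is a plain-Scala simulation. It assumes a `Seq[Seq[String]]` stands in for one batch's RDD, with each inner `Seq` being a partition of Kafka record values (what `_.value` yields). `BatchSimulation`, `runBatch`, and the trimming `process` are hypothetical stand-ins, not Spark APIs.

```scala
// Plain-Scala stand-in for one micro-batch of the DStream chain; no Spark involved.
// Assumption: the outer Seq is the batch's RDD, each inner Seq one partition.
object BatchSimulation {
  // Hypothetical stand-in for the real `process`; here it only trims whitespace.
  def process(line: String): String = line.trim

  // Mirrors, for one batch:
  //   .map(_.value).flatMap(_.split("\n")).map(process)
  //   .foreachRDD { rdd => rdd.foreachPartition(sink) }
  def runBatch(partitions: Seq[Seq[String]])(sink: Iterator[String] => Unit): Unit =
    partitions.foreach { values =>
      val itr: Iterator[String] = values.iterator
        .flatMap(_.split("\n").iterator) // one record value may hold several lines
        .map(process)                    // transform each line independently
      sink(itr)                          // one call per partition, like foreachPartition
    }
}
```

The simulation mirrors one practical consequence: the `foreachPartition` body runs once per partition and receives an `Iterator`, so per-partition setup such as opening a database connection for persistence can happen once before iterating over the lines.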
    

    【Discussion】:
