【发布时间】:2018-07-02 12:24:08
【问题描述】:
我在一个 Spark Streaming 程序中消费 Kafka 主题,代码如下:
import ...
object KafkaStreaming {

  /**
   * Entry point: subscribes to the Kafka topic "topic", splits each record value on newlines,
   * hands the resulting line stream to `process`, then starts the streaming context and blocks.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaStreaming").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // Micro-batch interval of 10 seconds.
    val ssc = new StreamingContext(sc, Seconds(10))
    // NOTE(review): the Kafka consumer settings were elided in the original question.
    val kafkaConf = Map(
      ...
    )
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("topic"), kafkaConf)
    )
    val lines: DStream[String] = messages.map(_.value)
    // A single Kafka record value may contain several newline-separated documents.
    val line: DStream[String] = lines.flatMap(_.split("\n"))
    process(line)
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Parses every line in `line` as JSON and processes each document that is a JSON object.
   *
   * `JSON.parseFull` accepts a single `String`, so it cannot be applied to the `DStream`
   * itself. Instead the parse runs per record inside `foreachRDD`, which is also the place
   * where per-batch persistence belongs.
   *
   * @param line stream of text lines, each expected to hold one JSON document
   */
  def process(line: DStream[String]): Unit = {
    line.foreachRDD { rdd =>
      // rdd.foreach runs on the executors; with a local master its output shows in the console.
      rdd.foreach { record =>
        val json: Option[Any] = JSON.parseFull(record)
        println(json.getOrElse("json is NULL"))
        json match {
          // Type arguments are erased at runtime, hence @unchecked; keys of a parsed object are Strings.
          case Some(doc: Map[String, Any] @unchecked) =>
            println("NOT FALSE")
            // Use every member of the JSON document to access its value.
            doc.get("any json element").foreach(println)
            // Do some other manipulation / persistence here.
          case Some(_) =>
            // Valid JSON that is not an object (e.g. an array): no keys to look up.
            ()
          case None =>
            // Unparseable line: already reported above.
            ()
        }
      }
    }
  }
}
在 `process` 函数中,我想逐行处理 DStream 中的字符串:从每一行解析出一个 JSON 对象,再做进一步的处理和持久化。问题是 `JSON.parseFull` 只接受单个 `String`,不能直接传入 `DStream[String]`。我该怎么做?
【问题讨论】:
标签: json apache-kafka spark-streaming