【Posted】: 2017-05-18 15:18:22
【Question】:
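import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val topics = "test"
val zkQuorum = "localhost:2181"
val group = "test-consumer-group"
val numThreads = 1 // not defined in the original snippet; one consumer thread per topic is assumed
val sparkConf = new SparkConf()
  .setAppName("XXXXX")
  .setMaster("local[*]")
  // The Cassandra connector reads its settings from keys prefixed with "spark."
  .set("spark.cassandra.connection.host", "127.0.0.1")
  .set("spark.cassandra.connection.port", "9042")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
// Map each topic name to its number of consumer threads
val topicMap = topics.split(",").map((_, numThreads)).toMap
// Keep only the message value (the JSON string) of each (key, value) pair
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)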
I get a DStream of JSON messages like this:
[{"id":100,"firstName":"Beulah","lastName":"Fleming","gender":"female","ethnicity":"SpEd","height":167,"address":27,"createdDate":1494489672243,"lastUpdatedDate":1494489672244,"isDeleted":0},{"id":101,"firstName":"Traci","lastName":"Summers","gender":"female","ethnicity":"Frp","height":181,"address":544,"createdDate":1494510639611,"lastUpdatedDate":1494510639611,"isDeleted":0}]
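With the program above I receive JSON data in a DStream. How can I process this DStream and store the data in Cassandra or Elasticsearch? Specifically, how do I extract the records from the DStream (they arrive as JSON) and save them to Cassandra?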
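For reference, here is a minimal sketch of one way the parse-and-store step could look. It is not a definitive implementation. It assumes that json4s (which ships with Spark) and the spark-cassandra-connector are on the classpath, that each Kafka message is a JSON array like the one shown above, and that a Cassandra table already exists with columns matching the case class fields. The names `Person`, `PersonSink`, `test_ks` and `person` are hypothetical and chosen only for illustration.

```scala
import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._
import org.apache.spark.streaming.dstream.DStream
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Hypothetical case class mirroring one element of the JSON array above.
case class Person(
  id: Int, firstName: String, lastName: String, gender: String,
  ethnicity: String, height: Int, address: Int,
  createdDate: Long, lastUpdatedDate: Long, isDeleted: Int)

object PersonSink {
  // Parses one Kafka message (a JSON array) into Person records.
  // Formats are created inside the method so nothing extra is captured
  // when Spark serializes the closure that calls it.
  def parsePeople(json: String): List[Person] = {
    implicit val formats: Formats = DefaultFormats
    parse(json).extract[List[Person]]
  }

  // Flattens every message into individual records and writes each batch
  // to Cassandra. Keyspace and table names are assumptions.
  def save(lines: DStream[String]): Unit =
    lines.flatMap(json => parsePeople(json)).saveToCassandra("test_ks", "person")
}
```

With this in place, `PersonSink.save(lines)` would be called before `ssc.start()` and `ssc.awaitTermination()`, since a streaming job only begins processing once the context is started. A malformed message makes `extract` throw, so real code would probably wrap the parse in `scala.util.Try` and drop or log bad input.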
【Discussion】:
Tags: json spark-streaming kafka-consumer-api spark-cassandra-connector