【问题标题】:How to store Dstream data(json) into cassandra?如何将 Dstream 数据(json)存储到 cassandra 中?
【发布时间】:2017-05-18 15:18:22
【问题描述】:
       val topics= "test"
       val zkQuorum="localhost:2181"
       val group="test-consumer-group"    
       val sparkConf = new org.apache.spark.SparkConf()
          .setAppName("XXXXX")
          .setMaster("local[*]")
          .set("cassandra.connection.host", "127.0.0.1")
          .set("cassandra.connection.port", "9042")

        val ssc = new StreamingContext(sparkConf, Seconds(2))
        ssc.checkpoint("checkpoint")
        val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

我得到这样的 DStream (json)

[{"id":100,"firstName":"Beulah","lastName":"Fleming","gender":"female","ethnicity":"SpEd","height":167,"address":27,"createdDate":1494489672243,"lastUpdatedDate":1494489672244,"isDeleted":0},{"id":101,"firstName":"Traci","lastName":"Summers","gender":"female","ethnicity":"Frp","height":181,"address":544,"createdDate":1494510639611,"lastUpdatedDate":1494510639611,"isDeleted":0}]

通过上述程序,我在 DStream 中获取 json 数据。 我将如何处理这些 Dstream 数据并存储到 Cassandra 或弹性搜索中?那么我将如何从 DStream 中检索数据(以 json 格式)并存储在 Cassandra 中?

【问题讨论】:

    标签: json spark-streaming kafka-consumer-api spark-cassandra-connector


    【解决方案1】:

    您需要导入com.datastax.spark.connector._,将流的元素转换为适当的案例类

    case class Record(id: String, firstName: String, ...)
    val colums = SomeColums("id", "first_name", ...)
    val mapped = lines.map(whateverDataYouHave => fuctionThatReutrnsARecordObject)
    

    并使用隐式函数 saveToCassandra 保存

    mapped.saveToCassandra(KEYSPACE_NAME, TABLE_NAME, columns)
    

    更多信息请查看文档https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-09-06
      • 1970-01-01
      • 2019-01-11
      • 1970-01-01
      • 1970-01-01
      • 2019-09-14
      • 1970-01-01
      相关资源
      最近更新 更多