【问题标题】:How to store Kafka direct stream JSON into Cassandra?如何将 Kafka 直接流 JSON 存储到 Cassandra 中?
【发布时间】:2018-10-27 19:54:44
【问题描述】:

我必须将 Spark 流数据保存到 Cassandra。 Stream 来自 Kafka,Kafka 消息为 JSON 格式,如下所示。

{
  "status": "NOT_AVAILABLE",
  "itemid": "550672332",
  "qty": 0,
  "lmts": "2017-11-18T10:39:21-08:00",
  "timestamp": 1511030361000
}

我在 Spark 2.2.0 中编写了以下代码。

case class NliEvents(itemid: String, status: String, qty: String)

def main(args: Array[String]): Unit = {
 .....
  val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )


    val valueStream = stream.map(_.value())
    val cassandraCrud = new CassandraOperations
    import com.datastax.spark.connector._

    val columns = SomeColumns("itemid", "status", "qty")
    val keySpace = configuration.getString(env + ".cassandra.keyspace")
    val gson = new Gson()
    import org.json4s._
    import org.json4s.jackson.JsonMethods._
    implicit val formats = DefaultFormats
    valueStream.foreachRDD((rdd, time) => {
      if (!rdd.isEmpty()) {
        val mapped = rdd.map(records => {
          val json = parse(records)
          val events = json.extract[NliEvents]
          events
        }
        )
        mapped.saveToCassandra(keySpace, "nli_events", columns)
      }
    })
}

当我运行这段代码时,我得到了

java.io.NotSerializableException: org.json4s.DefaultFormats$

错误。可能是我做得不对。

【问题讨论】:

    标签: apache-spark spark-streaming spark-cassandra-connector


    【解决方案1】:

    你能用下面的代码替换你的 foreach 语句吗?

    valueStream.mapPartitions(x => {
      val lst = scala.collection.mutable.ListBuffer[NliEvents]()
      while (x.hasNext) {
        val json = parse(x.next())
        val events = json.extract[NliEvents]
        lst += events
    
      }
      lst.toList.iterator
      }
    ).saveToCassandra(keySpace, "nli_events",columns)
    

    它应该工作。如果您遇到任何错误,请告诉我。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-12-06
      • 2015-01-11
      • 1970-01-01
      • 2019-08-15
      • 2020-06-23
      • 2022-12-16
      • 2023-03-10
      相关资源
      最近更新 更多