【Posted】: 2018-10-27 19:54:44
【Question】:
I need to save Spark Streaming data to Cassandra. The stream comes from Kafka, and each Kafka message is JSON in the format shown below.
{
  "status": "NOT_AVAILABLE",
  "itemid": "550672332",
  "qty": 0,
  "lmts": "2017-11-18T10:39:21-08:00",
  "timestamp": 1511030361000
}
I wrote the following code with Spark 2.2.0.
case class NliEvents(itemid: String, status: String, qty: String)

def main(args: Array[String]): Unit = {
  .....
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )
  val valueStream = stream.map(_.value())
  val cassandraCrud = new CassandraOperations
  import com.datastax.spark.connector._
  val columns = SomeColumns("itemid", "status", "qty")
  val keySpace = configuration.getString(env + ".cassandra.keyspace")
  val gson = new Gson()
  import org.json4s._
  import org.json4s.jackson.JsonMethods._
  implicit val formats = DefaultFormats
  valueStream.foreachRDD((rdd, time) => {
    if (!rdd.isEmpty()) {
      val mapped = rdd.map(records => {
        val json = parse(records)
        val events = json.extract[NliEvents]
        events
      })
      mapped.saveToCassandra(keySpace, "nli_events", columns)
    }
  })
}
When I run this code, I get the following error:
java.io.NotSerializableException: org.json4s.DefaultFormats$
I may be doing something wrong.
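One possible reading of the error, offered as a guess rather than a confirmed diagnosis: the function passed to rdd.map uses the implicit formats value defined in main, so Spark tries to serialize DefaultFormats along with the closure, and the exception says that object is not serializable in the json4s version on the classpath. A minimal sketch of one workaround is to create the formats value inside the code that runs on the executors. NliEventParser and parseEvent are hypothetical names introduced here for illustration; they are not part of the original code.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Same shape as the case class in the question.
case class NliEvents(itemid: String, status: String, qty: String)

// Hypothetical helper (not in the original code). A top-level object is
// referenced statically, so it is not shipped as part of a Spark closure.
object NliEventParser {
  def parseEvent(record: String): NliEvents = {
    // The formats value is created inside the function body, so the
    // driver never has to serialize a DefaultFormats instance.
    implicit val formats: Formats = DefaultFormats
    // Fields not present in NliEvents (lmts, timestamp) are ignored.
    parse(record).extract[NliEvents]
  }
}
```

Under these assumptions, the map step in the question could become rdd.map(NliEventParser.parseEvent), with the implicit val formats line removed from main. Moving the implicit val formats = DefaultFormats line inside the rdd.map closure should have the same effect without a helper object.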
【Discussion】:
Tags: apache-spark spark-streaming spark-cassandra-connector