【发布时间】:2016-08-02 14:54:28
【问题描述】:
目标:使用 Spark Streaming 读取 kafka 并将数据存储在 cassandra 作者:Java Spark cassandra 连接器 1.6 数据输入:简单json行对象{"id":"1","field1":"value1}
我有一个 java 类,可以通过 spark 流从 kafka 读取,处理读取的数据,然后将其存储在 cassandra 中。
这里是主要代码:
**JavaPairReceiverInputDStream**<String, String> messages =
KafkaUtils.createStream(ssc,
targetKafkaServerPort, targetTopic, topicMap);
**JavaDStream** list = messages.map(new Function<Tuple2<String,String>,List<Object>>(){
public List<Object> call( Tuple2<String,String> tuple2){
List<Object> **list**=new ArrayList<Object>();
Gson gson = new Gson();
MyClass myclass = gson.fromJson(tuple2._2(), MyClass.class);
myclass.setNewData("new_data");
String jsonInString = gson.toJson(myclass);
list.add(jsonInString);
return list;
}
});
下一个代码不正确:
**javaFunctions**(list)
.writerBuilder("schema", "table", mapToRow(JavaDStream.class))
.saveToCassandra();
因为“javaFunctions”方法需要一个 JavaRDD 对象,而“list”是一个 JavaDStream...
我需要将 JavaDStream 转换为 JavaRDD,但我找不到正确的方法...
有什么帮助吗?
【问题讨论】:
标签: spark-streaming spark-cassandra-connector