【发布时间】:2016-12-17 22:24:45
【问题描述】:
我正在使用 Spark Kafka 连接器从 Kafka 集群中获取数据。从中,我以JavaDStream<String> 的形式获取数据。如何以JavaDStream<EventLog> 的形式获取数据,其中EventLog 是Java bean?
public static JavaDStream<EventLog> fetchAndValidateData(String zkQuorum, String group, Map<String, Integer> topicMap) {
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
jssc.start();
jssc.awaitTermination();
return lines;
}
我的目标是将这些数据保存到 Cassandra 中,其中的表与 EventLog 具有相同的规格。 Spark Cassandra 连接器在插入语句中接受JavaRDD<EventLog>,如下所示:javaFunctions(rdd).writerBuilder("ks", "event", mapToRow(EventLog.class)).saveToCassandra();。我想从 Kafka 那里得到这些JavaRDD<EventLog>。
【问题讨论】:
-
您的意思是您仍然希望将消息作为字符串对并进行转换吗?还是您希望使用 JavaReceiverInputDStream
?您究竟希望在哪里引入 EventLog 类型?您是否尝试定义一个接受 EventLog 类型并从中构建 JavaDStream 的接收器? -
@Sunny 我的目标是将数据写入 Cassandra。 Spark Cassandra 连接器在插入语句中接受
JavaRDD<EventLog>,如下所示:javaFunctions(rdd).writerBuilder("ks", "event", mapToRow(EventLog.class)).saveToCassandra();。我想从卡夫卡那里得到这些JavaRDD<EventLog>。 -
您是否也有权访问将这些事件日志写入 kafka 的代码?是否实现了自定义序列化程序,是否将 EventLogs 序列化并作为 EventLogs 写入 Kafka?
-
@Sunny 有自定义序列化程序正在发送到 Kafka。
-
我相信实现一个自定义接收器是最简单的,它将从 org.apache.spark.streaming.receiver 扩展 Receiver
。在 onStart() 方法中,您应该开始使用来自 Kafka 的这些事件并添加一个侦听器。如果你有一个活跃的接收者并且有一个 JavaStreamingContext 你可以做 jssc.receiverStream(receiver) 来获取 JavaReceiverInputDStream
标签: java apache-spark cassandra apache-kafka