【发布时间】:2018-07-31 00:41:26
【问题描述】:
我只是想知道 Spark 中是否有方法,所以我可以将 JavaInputDStream 保存为 Json 文件,或者通常保存为任何文件。 如果没有,是否还有其他可能保存的内容 一个 kafka 主题作为 Spark 中的文件。
非常感谢!
【问题讨论】:
标签: java json apache-spark apache-kafka spark-streaming
我只是想知道 Spark 中是否有方法,所以我可以将 JavaInputDStream 保存为 Json 文件,或者通常保存为任何文件。 如果没有,是否还有其他可能保存的内容 一个 kafka 主题作为 Spark 中的文件。
非常感谢!
【问题讨论】:
标签: java json apache-spark apache-kafka spark-streaming
当您将 JavaInputDStream 映射到 stream 时,您可以执行以下操作:
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
rdd.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
@Override
public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
return new Tuple2<>(record.key(), record.value());
}
}).foreachPartition(partition -> {
OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
System.out.println(o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
if (partition.hasNext()) {
PrintWriter out = new PrintWriter("filename.txt");;
out.println(text);
try {
while (partition.hasNext()) {
Tuple2<String, String> message = partition.next();
out.println(message);
}
} catch (Exception e) {
e.printStackTrace(
}
});
});
ssc.start();
ssc.awaitTermination();
请不要忘记,如果您的 Kafka 主题中有多个分区,您将按照上述方法为每个分区写入一个文件。
【讨论】: