【发布时间】:2020-09-08 17:16:06
【问题描述】:
全部,
我正在使用从 Kafka 转储到 HDFS 的数据。我能够使用数据并希望从 Kafka 获取记录的总数并将其作为文件保存到 HDFS 中,以便我可以使用该文件进行验证。我可以在控制台中打印记录,但我不确定如何创建总计数文件?
从 Kafka 中提取记录的查询:
Dataset ds1=ds.filter(args[5]);
StreamingQuery query = ds1
.coalesce(10)
.writeStream()
.format("parquet")
.option("path", path.toString())
.option("checkpointLocation", args[6] + "/checkpoints" + args[2])
.trigger(Trigger.Once())
.start();
try {
query.awaitTermination();
} catch (StreamingQueryException e) {
e.printStackTrace();
System.exit(1);
}
以及我为获取记录并在控制台中打印而编写的代码:
Dataset stream=ds1.groupBy("<column_name>").count(); // 实际上,我想不使用 GroupBy 来获取计数,我尝试过 long stream=ds1.count() 但我遇到了错误。
StreamingQuery query1=stream.coalesce(1)
.writeStream()
.format("csv")
.option("path", path + "/record")
.start();
try {
query1.awaitTermination();
} catch (StreamingQueryException e) {
e.printStackTrace();
System.exit(1);
}
这不起作用,你能帮我解决这个问题吗?
【问题讨论】:
标签: java apache-spark hadoop apache-kafka spark-streaming-kafka