【问题标题】:How to get Total count of Records from Kafka Topic and Save into HDFS?如何从 Kafka 主题获取记录总数并保存到 HDFS?
【发布时间】:2020-09-08 17:16:06
【问题描述】:

全部,

我正在使用从 Kafka 转储到 HDFS 的数据。我能够使用数据并希望从 Kafka 获取记录的总数并将其作为文件保存到 HDFS 中,以便我可以使用该文件进行验证。我可以在控制台中打印记录,但我不确定如何创建总计数文件?

从 Kafka 中提取记录的查询:

Dataset ds1=ds.filter(args[5]);
 StreamingQuery query = ds1
                   .coalesce(10)
                   .writeStream()
                   .format("parquet")
                   .option("path", path.toString())
                   .option("checkpointLocation", args[6] + "/checkpoints" + args[2])
                   .trigger(Trigger.Once())
                   .start();

          try {
                query.awaitTermination();
            } catch (StreamingQueryException e) {
                e.printStackTrace();
                System.exit(1);
            }   

以及我为获取记录并在控制台中打印而编写的代码:

Dataset stream=ds1.groupBy("<column_name>").count(); // 实际上,我想不使用 GroupBy 来获取计数,我尝试过 long stream=ds1.count() 但我遇到了错误。

 StreamingQuery query1=stream.coalesce(1)
                        .writeStream()
                        .format("csv")
                       .option("path", path + "/record")
                       .start();

               try {
                    query1.awaitTermination();
                 } catch (StreamingQueryException e) {
                     e.printStackTrace();
                    System.exit(1);
                } 

这不起作用,你能帮我解决这个问题吗?

【问题讨论】:

    标签: java apache-spark hadoop apache-kafka spark-streaming-kafka


    【解决方案1】:

    一个主题中任何时候的记录数是一个移动的目标。

    您需要使用旧的 Spark Streaming 来查找每个 Spark 分区批次的记录数,然后使用 Accumulator 来计算所有已处理的记录,但这将是您能得到的最接近的记录数。

    Spark + Kafka 声称只有一次处理语义,因此我建议您专注于错误捕获和监控,而不是仅计数验证。

    【讨论】:

      猜你喜欢
      • 2017-01-08
      • 1970-01-01
      • 2020-08-19
      • 1970-01-01
      • 2016-05-27
      • 1970-01-01
      • 2017-10-10
      • 2016-06-13
      • 2019-07-04
      相关资源
      最近更新 更多