【问题标题】:Spark save Kafka InputDStream as Json fileSpark 将 Kafka InputDStream 保存为 Json 文件
【发布时间】:2018-07-31 00:41:26
【问题描述】:

我只是想知道 Spark 中是否有方法,所以我可以将 JavaInputDStream 保存为 Json 文件,或者通常保存为任何文件。 如果没有,是否还有其他可能保存的内容 一个 kafka 主题作为 Spark 中的文件。

非常感谢!

【问题讨论】:

    标签: java json apache-spark apache-kafka spark-streaming


    【解决方案1】:

    当您将 JavaInputDStream 映射到 stream 时,您可以执行以下操作:

    stream.foreachRDD(rdd -> {
                OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    
                rdd.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
                    @Override
                    public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
                        return new Tuple2<>(record.key(), record.value());
                    }
                }).foreachPartition(partition -> {
    
                    OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
                    System.out.println(o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
    
                    if (partition.hasNext()) {
    
                        PrintWriter out = new PrintWriter("filename.txt");;
                        out.println(text);
    
                        try {
                            while (partition.hasNext()) {
    
                                Tuple2<String, String> message = partition.next();
                                out.println(message);
                            }
    
                        } catch (Exception e) {
                            e.printStackTrace(
                    }
    
                });
            });
            ssc.start();
            ssc.awaitTermination();
    

    请不要忘记,如果您的 Kafka 主题中有多个分区,您将按照上述方法为每个分区写入一个文件。

    【讨论】:

    • 非常感谢,这对我很有帮助。
    猜你喜欢
    • 2017-07-02
    • 1970-01-01
    • 2020-09-15
    • 2021-09-29
    • 1970-01-01
    • 2015-07-06
    • 2020-02-10
    • 2014-12-31
    • 1970-01-01
    相关资源
    最近更新 更多