【问题标题】:Write avro file in HDFS - exists在 HDFS 中写入 avro 文件 - 存在
【发布时间】:2020-10-16 12:41:15
【问题描述】:

目前我正在学习 Spark Streaming 和 avro,所以我的第一个示例是,读取 Spark RDD 并构建通用记录,创建 avro 文件,这个文件我应该在 HDFS 中写入。现在我可以打开 avro 文件并且我确实附加到 HDFS 文件存在吗?

这段代码编写了一个 avro 文件,但是当我尝试添加或追加时,它失败了。我为此使用 java 8

public static void saveAvro(GenericRecord record, Schema schema) throws IOException {

        DatumWriter<GenericRecord> bdPersonDatumWriter = new GenericDatumWriter<>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(bdPersonDatumWriter);

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://sandbox-hdp.hortonworks.com:8020/tmp/poc/ResultHDFSTest.avro"),
                conf);
        Path F = new Path("hdfs://sandbox-hdp.hortonworks.com:8020/tmp/poc/ResultHDFSTest.avro");
        fs.setReplication(F, (short) 1);

        if (!fs.exists(F)) {
            System.out.println("File not exists.. creating....");
            OutputStream out = fs.create(F, (short) 1);
            System.out.println("OutputStream create.");
            dataFileWriter.create(schema, out);
            System.out.println("dataFileWriter create.");
            dataFileWriter.append(record);
            System.out.println("dataFileWriter append OK {0} .");

        } else {
            //Here fail, not open file.. avro stored in HDFS
            System.out.println("File exists....");
           // I want to add information to an existing avro file.
            dataFileWriter.append(record);
            System.out.println("dataFileWriter append OK {1} .");
        }
        dataFileWriter.close();
        System.out.println("dataFileWriter closed.");

    }
    

追加存在文件 avro HDFS 的堆栈跟踪:

线程“主”org.apache.avro.AvroRuntimeException 中的异常:不是 打开 在 org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:88) 在 org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:311) 在 com.test.avro.App.saveAvro(App.java:83) 在 com.test.avro.App.main(App.java:55)

DataFileWriter appendTo 方法只接受 File java.nio。我正在尝试做的事情是正确的还是有其他方法?

编辑 1. 我想向现有文件添加信息。

第一个代码 sn-p 显示了您尝试创建 avro 文件的实现。这是我的火花流的框架代码:

JavaStreamingContext jssc = sparkConfigurationBuilder
                .buildJSC(sparkConfigurationBuilder.buildSparkConfiguration());
    
    jssc.sparkContext().checkpointFile("c:\\tmp");
    Map<String, Object> kafkaParams = sparkDriverUtils.getKafkaProperties();        
    Collection<String> topics = Arrays.asList(sparkDriverUtils.getTopics().trim().split(","));// 1 o more topics        
    LOGGER.warn("Lista de Topics: " + topics.toString());
    

...

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
//This DSTream resulto to avro..
JavaDStream<Transactions> transactionsDS = transactions.map(f-> {
            Transactions txn = jsonMapperUtil.rowToTransaction(f);
            LOGGER.warn("Retornar  : JavaDStream<Transactions>");
            return  txn;
        });

现在 transactionsDS 结果我想在 HDFS 中保存为 avro 文件。我有一个问题,我可以获取 JavaStreamingContext 来为数据集创建 SparkSession,还是应该更改订阅 kafka 代理的方式?

问候。

【问题讨论】:

  • 您为什么使用 RDD 或低级 Java API? Spark 具有内置的 Avro 支持 ...spark.apache.org/docs/latest/sql-data-sources-avro.html
  • 谢谢,我看到了 url,SparkSession 它可以订阅 kafka 代理,流与 Java InputStream 相同?或与 Java InputStream 相同?获得 SparkSession?
  • 你在使用 Kafka 吗?好像您在这里只使用 HDFS。你在使用 Confluent Schema Registry 吗?或者阅读任何关于 Spark/Flink with Kafka + Avro 的 Cloudera 博客?
  • 嗨,是的,我正在使用 Cassandra、Kafka + Kafka 连接器(镜头作为源)和 Spark 流,我想将我的结果保存为 HDFS 中的 avro。现在我正在使用解决方法,首先保存到本地系统并在移动到 hdfs 之后,但我将使用流媒体查看 Sql Spark。
  • 我会使用 Streamsets 或 Nifi 而不是为此编写 Spark 代码

标签: java apache-kafka hdfs avro


【解决方案1】:

DataFileWriter appendTo 方法只接受一个 File java.nio

正确。 Avro 没有连接到 HDFS 路径。

为了“附加到 HDFS 文件”,您需要将它们下载到本地,然后覆盖它们的全部内容


除此之外,您还提到了 Spark Streaming,但显示的代码中没有任何部分实际使用 Spark API 调用

【讨论】:

  • 编辑我的问题,我添加了 spark sn-p。问候。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-11-07
  • 1970-01-01
  • 1970-01-01
  • 2014-01-03
相关资源
最近更新 更多