【问题标题】:Spark - Java - Create Parquet/Avro Without Using Dataframes of Spark SQLSpark - Java - 在不使用 Spark SQL 的数据帧的情况下创建 Parquet/Avro
【发布时间】:2019-01-24 07:44:58
【问题描述】:

我想将 Spark 应用程序的输出(我们只使用核心 Spark,从事该项目的人不想将其更改为 Spark SQL)作为 Parquet 或 Avro 文件。

当我查找这两种文件类型时,我找不到任何没有 DataFrames 或一般 Spark SQL 的示例。我可以在不使用 SparkSQL 的情况下实现这一目标吗?

我的数据是表格的,它有列,但在处理过程中,将使用所有数据,而不是单个列。它的列是在运行时决定的,所以没有“名称,ID,地址”有点通用的列。它看起来像这样:

No f1       f2       f3       ...
1, 123.456, 123.457, 123.458, ...
2, 123.789, 123.790, 123.791, ...
...

【问题讨论】:

  • 能否请您详细说明您想要实现的目标
  • 我的输出变大了,因为我使用了更多的输入数据。目前,它是 3.5Gb。它的大小应该更小(我可以通过 Snappy 压缩来实现),但他们也问我是否有另一种输出类型可用于减小大小、读/写时间等。目前它只是人类可读的文本文件。
  • 好的,如果你想在没有数据框和 spark sql 的情况下将输出保存在 Avro 中,那么你可以使用 rdd
  • Rdd.toDF().write.parquet(文件路径)
  • 好的,这意味着无论如何我都需要使用 SparkSQL。至少只在最后。因为,我看不到我的 Pair 和普通 JavaRDD 的 toDF() 方法。

标签: java apache-spark avro parquet


【解决方案1】:

如果不将 rdd 转换为数据框,则无法将其保存在 parquet 中。 Rdd 没有 schema,但是 parquet 文件是列格式,需要 schema,所以我们需要将其转换为 dataframe。

你可以使用createdataframe api

【讨论】:

    【解决方案2】:

    我试过了,它就像一个冠军......

    public class ParquetHelper{
    
        static ParquetWriter<GenericData.Record> writer = null;
        private static Schema schema;
    
        public ParquetHelper(Schema schema, String pathName){
    
            try {
                Path path = new Path(pathName);
                writer = AvroParquetWriter.
                        <GenericData.Record>builder(path)
                        .withRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE)
                        .withPageSize(ParquetWriter.DEFAULT_PAGE_SIZE)
                        .withSchema(schema)
                        .withConf(new Configuration())
                        .withCompressionCodec(CompressionCodecName.SNAPPY)
                        .withValidation(true)
                        .withDictionaryEncoding(false)
                        .build();
                this.schema = schema;
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
         /*
         * 
         */
        public static void writeToParquet(JavaRDD<Record> empRDDRecords) throws IOException {
    
            empRDDRecords.foreach(record -> {
                if(null != record && new RecordValidator().validate(record, schema).isEmpty()){
                    writeToParquet(record);
                }// TODO collect bad records here
            });
    
            writer.close();
        }
    
    }
    

    【讨论】:

      猜你喜欢
      • 2019-09-25
      • 1970-01-01
      • 2016-11-22
      • 1970-01-01
      • 2020-07-22
      • 2020-06-07
      • 1970-01-01
      • 2015-08-30
      • 2022-12-09
      相关资源
      最近更新 更多