【Title】: Flink ParquetSinkWriter FileAlreadyExistsException
【Posted】: 2020-05-23 23:35:18
【Question】:

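I am trying to write Parquet files to HDFS with Apache Flink, using BucketingSink and a custom ParquetSinkWriter.

The code is included here. The error indicates that when checkpointing is enabled (snapshotState() is called in the BucketingSink class), the flush method does not quite work. Even though the writer is closed with "writer.close();", I still get the error from "writer = createWriter();". Any ideas? Thanks.

The error looks like this: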

org.apache.hadoop.fs.FileAlreadyExistsException: /user/hive/flink_parquet_fils_with_checkingpoint/year=20/month=2/day=1/hour=17/_part-4-9.in-progress for client 192.168.56.202 already exists
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:3003)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2890)
    ....
    at flink.untils.ParquetSinkWriter.flush(ParquetSinkWriterForecast.java:81)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.snapshotState(BucketingSink.java:749)

import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.flink.util.Preconditions;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;

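/**
 * Writer for BucketingSink that stores Avro GenericRecord elements as Parquet.
 *
 * @param <T> record type, a subtype of GenericRecord
 */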
public class ParquetSinkWriter<T extends GenericRecord> implements Writer<T> {

    private static final long serialVersionUID = -975302556515811398L;

    private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
    private final int pageSize = 64 * 1024;

    private final String schemaRepresentation;

    private transient Schema schema;
    private transient ParquetWriter<GenericRecord> writer;
    private transient Path path;

    private int position;

    public ParquetSinkWriter(String schemaRepresentation) {
        this.schemaRepresentation = Preconditions.checkNotNull(schemaRepresentation);
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        this.position = 0;
        this.path = path;

        if (writer != null) {
            writer.close();
        }

        writer = createWriter();
    }

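    @Override
    public long flush() throws IOException {
        Preconditions.checkNotNull(writer);
        position += writer.getDataSize();
        // Called from BucketingSink.snapshotState() on every checkpoint.
        writer.close();
        // Re-creates the writer at the same path; this is the call that
        // throws FileAlreadyExistsException in the stack trace.
        writer = createWriter();

        return position;
    }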

    @Override
    public long getPos() throws IOException {
        Preconditions.checkNotNull(writer);
        return position + writer.getDataSize();
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
            writer = null;
        }
    }

    @Override
    public void write(T element) throws IOException {
        Preconditions.checkNotNull(writer);
        writer.write(element);
    }

    @Override
    public Writer<T> duplicate() {
        return new ParquetSinkWriter<>(schemaRepresentation);
    }

    private ParquetWriter<GenericRecord> createWriter() throws IOException {
        if (schema == null) {
            schema = new Schema.Parser().parse(schemaRepresentation);
        }

        return AvroParquetWriter.<GenericRecord>builder(path)
            .withSchema(schema)
            .withDataModel(new GenericData())
            .withCompressionCodec(compressionCodecName)
            .withPageSize(pageSize)
            .build();
    }
}


【Comments】:

    Tags: java apache-kafka hdfs apache-flink parquet


    【Answer 1】:

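    It looks like the file you are trying to create already exists. That is because you are using the default write mode, CREATE, which fails when the file exists. You can try changing your code to use OVERWRITE mode instead (this requires importing org.apache.parquet.hadoop.ParquetFileWriter). For example, change the createWriter() method to return something like this: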

    return AvroParquetWriter.<GenericRecord>builder(path)
                .withSchema(schema)
                .withDataModel(new GenericData())
                .withCompressionCodec(compressionCodecName)
                .withPageSize(pageSize)
                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                .build();
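
The difference between the two write modes can be illustrated with a local-filesystem analogy. This is only a sketch using java.nio, not the Parquet or HDFS API: the class WriteModeSketch and its methods are hypothetical names, with CREATE_NEW standing in for CREATE and CREATE plus TRUNCATE_EXISTING standing in for OVERWRITE. It mirrors the close-then-recreate pattern of flush() on a single path.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Local-filesystem analogy for create-only vs overwrite modes (hypothetical helper, not Parquet code). */
public class WriteModeSketch {

    /** Create-only mode: opening fails if the path already exists. */
    public static OutputStream openCreate(Path path) throws IOException {
        return Files.newOutputStream(path,
                StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
    }

    /** Overwrite mode: an existing file is truncated to zero bytes before writing. */
    public static OutputStream openOverwrite(Path path) throws IOException {
        return Files.newOutputStream(path,
                StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING,
                StandardOpenOption.WRITE);
    }

    /** Writes, closes, then re-creates at the same path in create-only mode; true if that fails. */
    public static boolean recreateFailsInCreateMode(Path path) throws IOException {
        try (OutputStream out = openCreate(path)) {
            out.write("first".getBytes(StandardCharsets.UTF_8));
        }
        // Same pattern as flush(): the old stream is closed, but the file is still there.
        try (OutputStream out = openCreate(path)) {
            return false;
        } catch (FileAlreadyExistsException e) {
            return true;
        }
    }

    /** Writes, closes, then re-creates in overwrite mode; returns what the file contains afterwards. */
    public static String contentAfterOverwrite(Path path) throws IOException {
        try (OutputStream out = openOverwrite(path)) {
            out.write("first".getBytes(StandardCharsets.UTF_8));
        }
        try (OutputStream out = openOverwrite(path)) {
            out.write("second".getBytes(StandardCharsets.UTF_8));
        }
        return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("write-mode-sketch");
        System.out.println(recreateFailsInCreateMode(dir.resolve("a.in-progress"))); // prints "true"
        System.out.println(contentAfterOverwrite(dir.resolve("b.in-progress")));     // prints "second"
    }
}
```

In this analogy the overwrite variant no longer throws, but the bytes written before the re-open are gone. Whether the same happens to records written before a checkpoint with the Parquet writer is not addressed in the thread, so it is worth verifying before relying on OVERWRITE.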
    
    
