【问题标题】:Unzip File in Dataflow Before Reading读取前在数据流中解压缩文件
【发布时间】:2015-10-06 08:00:03
【问题描述】:

我们的客户端正在将文件上传到 GCS,但它们被压缩了。有什么方法可以使用 Java Dataflow SDK,我们可以在其中运行所有压缩文件,解压缩文件,将所有生成的 .csv 文件合并到一个文件中,然后只进行TextIO 转换?

编辑

回答jkffs的问题,

  1. 好吧,我真的需要将它们全部合并到一个文件中,从阅读的角度来看会容易得多。
  2. 它们是 ZIP 文件,而不是 GZ 或 BZ 或其他任何文件。每个 ZIP 包含多个文件。文件名并不重要,是的,我实际上更喜欢 TextIO 在每个存档的基础上透明地解压缩和连接所有文件。

希望有帮助!

【问题讨论】:

  • 您能否解释一下为什么要将所有 .csv 文件合并为一个,然后再进一步处理它们 - 您是否需要按顺序而不是并行处理文件?之后是否有任何可并行化的处理?
  • 另外,您能详细介绍一下您的 zip 文件吗?它们真的是 .zip 还是 .gz/.bz2? (TextIO 目前支持 gzip 和 bzip2 压缩 - cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/…,但不支持 .zip 文件)如果是 .zip,每个 .zip 文件中有很多文件,还是只有一个?档案中的文件名是否重要?例如。如果 TextIO 透明地解压缩并连接 zip 存档中的所有文件,这对你有用吗?
  • @jkff 我用一些答案更新了这个问题!

标签: google-cloud-dataflow


【解决方案1】:

因为我遇到了同样的问题,并且只找到了这个 1 岁且非常不完整的解决方案。以下是有关如何在 google 数据流上解压缩文件的完整示例:

public class SimpleUnzip {

private static final Logger LOG = LoggerFactory.getLogger(SimpleUnzip.class);

public static void main(String[] args){
    Pipeline p = Pipeline.create(
            PipelineOptionsFactory.fromArgs(args).withValidation().create());

    GcsUtilFactory factory = new GcsUtilFactory();
    GcsUtil util = factory.create(p.getOptions());
    try{
        List<GcsPath> gcsPaths = util.expand(GcsPath.fromUri("gs://tlogdataflow/test/*.zip"));
        List<String> paths = new ArrayList<String>();

        for(GcsPath gcsp: gcsPaths){
            paths.add(gcsp.toString());
        }
        p.apply(Create.of(paths))
            .apply(ParDo.of(new UnzipFN()));
        p.run();

        }
    catch(Exception e){
        LOG.error(e.getMessage());
        }


}

public static class UnzipFN extends DoFn<String,Long>{
    private static final long serialVersionUID = 2015166770614756341L;
    private long filesUnzipped=0;
    @Override
    public void processElement(ProcessContext c){
        String p = c.element();
        GcsUtilFactory factory = new GcsUtilFactory();
        GcsUtil u = factory.create(c.getPipelineOptions());
        byte[] buffer = new byte[100000000];
        try{
            SeekableByteChannel sek = u.open(GcsPath.fromUri(p));
            InputStream is = Channels.newInputStream(sek);
            BufferedInputStream bis = new BufferedInputStream(is);
            ZipInputStream zis = new ZipInputStream(bis);
            ZipEntry ze = zis.getNextEntry();
            while(ze!=null){
                LOG.info("Unzipping File {}",ze.getName());
                WritableByteChannel wri = u.create(GcsPath.fromUri("gs://tlogdataflow/test/" + ze.getName()), getType(ze.getName()));
                OutputStream os = Channels.newOutputStream(wri);
                int len;
                while((len=zis.read(buffer))>0){
                    os.write(buffer,0,len);
                }
                os.close();
                filesUnzipped++;
                ze=zis.getNextEntry();

            }
            zis.closeEntry();
            zis.close();

        }
        catch(Exception e){
            e.printStackTrace();
        }
    c.output(filesUnzipped);
    }

    private String getType(String fName){
        if(fName.endsWith(".zip")){
            return "application/x-zip-compressed";
        }
        else {
            return "text/plain";
        }
    }
}

}

【讨论】:

    【解决方案2】:

    Dataflow / Apache Beam 自动支持 TextIO 中的 ZIP 压缩文件:TextIO.read().from(filepattern) 将根据其扩展名自动解压缩与文件模式匹配的文件,.zip 是支持的格式之一 - 在这种情况下,它将隐含将.zip 中的所有文件连接到一个文件中,并从中解析文本行。

    如果文件没有扩展名,您还可以使用 TextIO.read().from(filepattern).withCompressionType(...) 显式指定压缩类型。

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-06-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-12-18
    • 1970-01-01
    相关资源
    最近更新 更多