【问题标题】:Can you detect object/file name using Cloud Dataflow您可以使用 Cloud Dataflow 检测对象/文件名吗
【发布时间】:2018-08-04 11:48:00
【问题描述】:

我的用例是将文件以不同格式上传到 Cloud Storage。我的数据流管道会将文件/对象从 csv/xml 转换为 json 格式。是否可以在 Dataflow 中确定文件格式类型(即 csv 或 xml),然后触发不同的逻辑来相应地处理它?或者,是否可以确定文件名,然后我可以解析后缀以确定文件类型?

谢谢

【问题讨论】:

    标签: google-cloud-platform google-cloud-dataflow


    【解决方案1】:

    使用FileIO.match() 将返回ReadableFile 对象。如果我们记录一个匹配项,我们可以看到元数据包含 resourceId 键值对中的文件名:

    ReadableFile{metadata=Metadata{resourceId=gs://BUCKET_NAME/different/test1.csv, sizeBytes=30,isReadSeekEfficient=true},压缩=UNCOMPRESSED}

    使用类似下面的 sn-p 我们可以创建键值对,其中键是文件格式:

    PCollection<KV<String, String>> filenames = p.apply("Read files", FileIO.match().filepattern(input))
            .apply(FileIO.readMatches())
            .apply(ParDo.of(new DoFn<ReadableFile, KV<String, String>>() {
                @ProcessElement
                public void process(ProcessContext c) {
                    // we'll output a KV where the file suffix is the key
                    String filename = c.element().getMetadata().resourceId().toString();
                    c.output(KV.of(filename.substring(filename.lastIndexOf('.') + 1), filename));
                }
            }));
    

    文件名是从ProcessContext.getMetadata().resourceId() 检索到的。 input 是匹配模式(即"gs://BUCKET_NAME/path/to/input/files/folder/*"

    然后我们可以根据密钥在管道中以不同的方式处理文件。在我的示例中,我只是根据数据类型编写了不同的字符串:

    filenames //
    .apply("Process according to type", ParDo.of(new DoFn<KV<String, String>, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String key = c.element().getKey();
            String value = c.element().getValue();
            if (key.equals("csv")) {c.output("CSV - " + value.substring(value.lastIndexOf('/') + 1));}
            else {c.output("XML - " + value.substring(value.lastIndexOf('/') + 1));}
        }
    }))//
    .apply(TextIO.write().to(output).withoutSharding());
    

    在我的情况下,结果文件将包含这样的内容:

    CSV - test1.csv
    CSV - test2.csv
    XML - test2.xml
    XML - test1.xml
    CSV - test3.csv
    

    或者,您可以使用不同的匹配模式读取它们,应用所需的转换,然后将结果展平。

    pcl = pcl.and(
                    p.apply("Read CSV files", FileIO.match().filepattern(input + "*.csv"))
                            .apply(FileIO.readMatches())
                            .apply(ParDo.of(new DoFn<ReadableFile, KV<String, String>>() {
                                @ProcessElement
                                public void process(ProcessContext c) {
                                    c.output(KV.of("csv", c.element().getMetadata().resourceId().toString()));
                                }
                            })))
    
                    .and(
                    p.apply("Read XML files", FileIO.match().filepattern(input + "*.xml"))
                            .apply(FileIO.readMatches())
                            .apply(ParDo.of(new DoFn<ReadableFile, KV<String, String>>() {
                                @ProcessElement
                                public void process(ProcessContext c){
                                    c.output(KV.of("xml", c.element().getMetadata().resourceId().toString()));
                                }
                            })));
    
    // combine/flatten all the PCollections together
    PCollection<KV<String, String>> flattenedPCollection = pcl.apply(Flatten.pCollections());
    

    在这种情况下,input 是包含文件的文件夹的路径(不包括 *

    【讨论】:

    • 谢谢!这正是我想要的。
    猜你喜欢
    • 2016-08-05
    • 2018-12-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-08-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多