【发布时间】:2018-08-04 11:48:00
【问题描述】:
我的用例是将文件以不同格式上传到 Cloud Storage。我的数据流管道会将文件/对象从 csv/xml 转换为 json 格式。是否可以在 Dataflow 中确定文件格式类型(即 csv 或 xml),然后触发不同的逻辑来相应地处理它?或者,是否可以确定文件名,然后我可以解析后缀以确定文件类型?
谢谢
【问题讨论】:
标签: google-cloud-platform google-cloud-dataflow
我的用例是将文件以不同格式上传到 Cloud Storage。我的数据流管道会将文件/对象从 csv/xml 转换为 json 格式。是否可以在 Dataflow 中确定文件格式类型(即 csv 或 xml),然后触发不同的逻辑来相应地处理它?或者,是否可以确定文件名,然后我可以解析后缀以确定文件类型?
谢谢
【问题讨论】:
标签: google-cloud-platform google-cloud-dataflow
使用FileIO.match() 将返回ReadableFile 对象。如果我们记录一个匹配项,我们可以看到元数据包含 resourceId 键值对中的文件名:
ReadableFile{metadata=Metadata{resourceId=gs://BUCKET_NAME/different/test1.csv, sizeBytes=30,isReadSeekEfficient=true},压缩=UNCOMPRESSED}
使用类似下面的 sn-p 我们可以创建键值对,其中键是文件格式:
PCollection<KV<String, String>> filenames = p.apply("Read files", FileIO.match().filepattern(input))
.apply(FileIO.readMatches())
.apply(ParDo.of(new DoFn<ReadableFile, KV<String, String>>() {
@ProcessElement
public void process(ProcessContext c) {
// we'll output a KV where the file suffix is the key
String filename = c.element().getMetadata().resourceId().toString();
c.output(KV.of(filename.substring(filename.lastIndexOf('.') + 1), filename));
}
}));
文件名是从ProcessContext.getMetadata().resourceId() 检索到的。 input 是匹配模式(即"gs://BUCKET_NAME/path/to/input/files/folder/*")
然后我们可以根据密钥在管道中以不同的方式处理文件。在我的示例中,我只是根据数据类型编写了不同的字符串:
filenames //
.apply("Process according to type", ParDo.of(new DoFn<KV<String, String>, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
String key = c.element().getKey();
String value = c.element().getValue();
if (key.equals("csv")) {c.output("CSV - " + value.substring(value.lastIndexOf('/') + 1));}
else {c.output("XML - " + value.substring(value.lastIndexOf('/') + 1));}
}
}))//
.apply(TextIO.write().to(output).withoutSharding());
在我的情况下,结果文件将包含这样的内容:
CSV - test1.csv
CSV - test2.csv
XML - test2.xml
XML - test1.xml
CSV - test3.csv
或者,您可以使用不同的匹配模式读取它们,应用所需的转换,然后将结果展平。
pcl = pcl.and(
p.apply("Read CSV files", FileIO.match().filepattern(input + "*.csv"))
.apply(FileIO.readMatches())
.apply(ParDo.of(new DoFn<ReadableFile, KV<String, String>>() {
@ProcessElement
public void process(ProcessContext c) {
c.output(KV.of("csv", c.element().getMetadata().resourceId().toString()));
}
})))
.and(
p.apply("Read XML files", FileIO.match().filepattern(input + "*.xml"))
.apply(FileIO.readMatches())
.apply(ParDo.of(new DoFn<ReadableFile, KV<String, String>>() {
@ProcessElement
public void process(ProcessContext c){
c.output(KV.of("xml", c.element().getMetadata().resourceId().toString()));
}
})));
// combine/flatten all the PCollections together
PCollection<KV<String, String>> flattenedPCollection = pcl.apply(Flatten.pCollections());
在这种情况下,input 是包含文件的文件夹的路径(不包括 *)
【讨论】: