【发布时间】:2017-12-23 17:28:35
【问题描述】:
我很难理解 TextIO.write() 的 .withFileNamePolicy 的概念。提供 FileNamePolicy 的要求对于做一些简单的事情来说似乎非常复杂,比如指定一个 GCS 存储桶来写入流文件。
概括地说,我将 JSON 消息流式传输到 PubSub 主题,我想将这些原始消息写入 GCS 中的文件以进行永久存储(我还将对消息进行其他处理)。我最初是从这个管道开始的,认为它会很简单:
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);
p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
.apply("Write to GCS", TextIO.write().to(gcs_bucket);
p.run();
}
我收到关于需要 WindowedWrites 的错误,我应用了它,然后需要 FileNamePolicy。这就是事情变得棘手的地方。
我去了 Beam 文档并查看了 FilenamePolicy。看起来我需要扩展这个类,然后还需要扩展其他抽象类来完成这项工作。不幸的是,Apache 上的文档有点少,我找不到任何 Dataflow 2.0 的示例,除了 The Wordcount Example,它甚至使用在帮助程序类中实现这些细节。
所以我可以通过复制大部分 WordCount 示例来完成这项工作,但我试图更好地理解其中的细节。我有几个问题:
1) 是否有任何路线图项目可以抽象出很多这种复杂性?似乎我应该能够像在 nonWindowedWrite 中那样提供 GCS 存储桶,然后只提供一些基本选项,如时间和文件命名规则。我知道将流式窗口数据写入文件比仅打开文件指针(或对象存储等效项)更复杂。
2) 看起来要完成这项工作,我需要创建一个 WindowedContext 对象,该对象需要提供一个 BoundedWindow 抽象类和 PaneInfo 对象类,然后是一些分片信息。这些可用的信息非常简单,我很难知道所有这些实际需要什么,特别是考虑到我的简单用例。有没有很好的例子可以实现这些?此外,看起来我还需要将# of shards 设置为 TextIO.write 的一部分,然后还提供# shards 作为 fileNamePolicy 的一部分?
感谢您帮助我了解这背后的细节,希望能学到一些东西!
编辑 7/20/17 所以我终于让这个管道通过扩展 FilenamePolicy 来运行。我的挑战是需要定义来自 PubSub 的流数据的窗口。这是代码的非常接近的表示:
public class ReadData {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);
p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
.apply("Write to GCS", TextIO.write().to("gcs_bucket")
.withWindowedWrites()
.withFilenamePolicy(new TestPolicy())
.withNumShards(10));
p.run();
}
}
class TestPolicy extends FileBasedSink.FilenamePolicy {
@Override
public ResourceId windowedFilename(
ResourceId outputDirectory, WindowedContext context, String extension) {
IntervalWindow window = (IntervalWindow) context.getWindow();
String filename = String.format(
"%s-%s-%s-%s-of-%s.json",
"test",
window.start().toString(),
window.end().toString(),
context.getShardNumber(),
context.getShardNumber()
);
return outputDirectory.resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
}
@Override
public ResourceId unwindowedFilename(
ResourceId outputDirectory, Context context, String extension) {
throw new UnsupportedOperationException("Unsupported.");
}
}
【问题讨论】:
标签: java google-cloud-platform google-cloud-storage google-cloud-dataflow