【Question Title】: GCP Dataflow 2.0 PubSub to GCS
【Posted】: 2017-12-23 17:28:35
【Question Description】:

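I'm having a hard time understanding the concept of `.withFilenamePolicy` on `TextIO.write()`. Having to supply a FilenamePolicy seems very complicated for something as simple as specifying a GCS bucket to write streaming files to.

At a high level, I'm streaming JSON messages to a PubSub topic, and I want to write those raw messages to files in GCS for permanent storage (I will also do other processing on the messages). I initially started with this pipeline, thinking it would be simple: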

public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

        Pipeline p = Pipeline.create(options); 

        // topic and gcs_bucket are String values defined elsewhere (not shown)
        p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
            .apply("Write to GCS", TextIO.write().to(gcs_bucket));

        p.run();

    }

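I got an error saying that windowed writes were required. I applied `withWindowedWrites()`, and then got an error saying a FilenamePolicy was required. This is where things get tricky.

I went to the Beam docs and looked at FilenamePolicy. It looks like I need to extend this class, and then also extend other abstract classes to make it work. Unfortunately the documentation on the Apache site is a bit sparse, and I couldn't find any Dataflow 2.0 examples except the WordCount example, which even hides these details in a helper class.

So I could make this work by copying most of the WordCount example, but I'm trying to understand the details better. I have a few questions: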

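1) Are there any roadmap items to abstract away much of this complexity? It seems like I should be able to supply a GCS bucket, as I can for a non-windowed write, and then just a few basic options such as timing and a file naming rule. I know that writing streaming, windowed data to files is more complicated than just opening a file pointer (or its object-store equivalent).

2) It looks like, to make this work, I need to create a WindowedContext object, which requires a BoundedWindow (an abstract class), a PaneInfo object, and then some shard information. The documentation available for these is very thin, and I'm having a hard time knowing what is actually required, especially given my simple use case. Are there any good examples that implement these? Also, it looks like I need to set the number of shards as part of TextIO.write and then also supply the number of shards as part of the FilenamePolicy?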
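At its core, a windowed filename policy only has to turn a window, a shard index and a shard count into a name; the WindowedContext is just the object that carries those values. The plain-Java sketch below (no Beam classes; the class and method names are hypothetical) models that mapping, with `java.time.Instant` standing in for the window bounds. In the code from the first answer, the shard count is set once with `withNumShards` and read back inside the policy via `getNumShards()`, so it does not have to be configured twice.

```java
import java.time.Instant;

/**
 * Plain-Java model of a windowed filename policy: maps window bounds plus
 * shard index and shard count to a file name. No Beam classes are used.
 */
final class WindowedNameSketch {

  private WindowedNameSketch() {}

  /**
   * Builds "prefix-start-end-SSS-of-NNN.json" for one shard of one window.
   *
   * @param prefix      file prefix, e.g. "test"
   * @param windowStart inclusive start of the window
   * @param windowEnd   exclusive end of the window
   * @param shardNumber zero-based index of this shard
   * @param numShards   total number of shards written for the window
   */
  static String fileName(String prefix, Instant windowStart, Instant windowEnd,
                         int shardNumber, int numShards) {
    if (shardNumber < 0 || shardNumber >= numShards) {
      throw new IllegalArgumentException(
          "shard " + shardNumber + " is out of range for " + numShards + " shards");
    }
    // Index and count are both included so every shard of a window gets a
    // distinct, self-describing name.
    return String.format("%s-%s-%s-%03d-of-%03d.json",
        prefix, windowStart, windowEnd, shardNumber, numShards);
  }

  public static void main(String[] args) {
    Instant start = Instant.parse("2017-07-20T10:00:00Z");
    // prints test-2017-07-20T10:00:00Z-2017-07-20T10:01:00Z-003-of-010.json
    System.out.println(fileName("test", start, start.plusSeconds(60), 3, 10));
  }
}
```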

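Thanks for helping me understand the details behind this; I'm hoping to learn something!

Edit 7/20/17: I finally got this pipeline running by extending FilenamePolicy. My challenge was that I needed to define windows over the streaming data coming from PubSub. Here is a very close representation of the code: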

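import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class ReadData {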
    public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

        Pipeline p = Pipeline.create(options);

        p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
            .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
            .apply("Write to GCS", TextIO.write().to("gcs_bucket")
                .withWindowedWrites()
                .withFilenamePolicy(new TestPolicy())
                .withNumShards(10));

        p.run();

    }
}

class TestPolicy extends FileBasedSink.FilenamePolicy {
    @Override
    public ResourceId windowedFilename(
        ResourceId outputDirectory, WindowedContext context, String extension) {
        IntervalWindow window = (IntervalWindow) context.getWindow();
        String filename = String.format(
            "%s-%s-%s-%s-of-%s.json",
            "test",
            window.start().toString(),
            window.end().toString(),
            context.getShardNumber(),
            context.getNumShards()
        );
        return outputDirectory.resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
    }

    @Override
    public ResourceId unwindowedFilename(
        ResourceId outputDirectory, Context context, String extension) {
        throw new UnsupportedOperationException("Unsupported.");
    }
}
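The `window.start()` and `window.end()` values that `TestPolicy` prints come from `FixedWindows`: each element is placed in the window whose start is its timestamp rounded down to a multiple of the window size. The sketch below reproduces that arithmetic in plain Java for a zero-offset window; the helper is hypothetical, not a Beam API. The window end is exclusive, so a one-minute window starting at 10:00:00 ends at 10:01:00.

```java
import java.time.Duration;
import java.time.Instant;

/** Plain-Java sketch of zero-offset fixed-window assignment. */
final class FixedWindowSketch {

  private FixedWindowSketch() {}

  /** Returns {start, end} of the fixed window that contains {@code timestamp}. */
  static Instant[] windowFor(Instant timestamp, Duration size) {
    long sizeMillis = size.toMillis();
    if (sizeMillis <= 0) {
      throw new IllegalArgumentException("window size must be positive");
    }
    long ts = timestamp.toEpochMilli();
    // Round down to a multiple of the window size; floorMod keeps the
    // remainder non-negative for timestamps before the epoch.
    long start = ts - Math.floorMod(ts, sizeMillis);
    // The end is exclusive: the next window starts exactly here.
    return new Instant[] {Instant.ofEpochMilli(start), Instant.ofEpochMilli(start + sizeMillis)};
  }

  public static void main(String[] args) {
    Instant[] w = windowFor(Instant.parse("2017-07-20T10:00:42.500Z"), Duration.ofMinutes(1));
    // prints 2017-07-20T10:00:00Z .. 2017-07-20T10:01:00Z
    System.out.println(w[0] + " .. " + w[1]);
  }
}
```

Because the end is exclusive, Beam's `IntervalWindow.maxTimestamp()` is one millisecond before `end()`.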

【Question Discussion】:

    Tags: java google-cloud-platform google-cloud-storage google-cloud-dataflow


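    【Solution 1】:

    Here is an example for Beam 2.0 that writes the raw messages from PubSub to windowed files on GCS. The pipeline is fairly configurable: it lets you specify the window duration through a parameter, plus a sub-directory policy if you want logical subsections of your data for easier reprocessing or archiving. Note that this has an additional dependency on Apache Commons Lang 3.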

    PubsubToGcs

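    import com.google.common.base.Preconditions;
    import java.util.Locale;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.StreamingOptions;
    import org.apache.beam.sdk.options.Validation.Required;
    import org.apache.beam.sdk.options.ValueProvider;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.joda.time.DateTime;
    import org.joda.time.Duration;
    import org.joda.time.MutablePeriod;
    import org.joda.time.format.PeriodFormatterBuilder;
    import org.joda.time.format.PeriodParser;

    /**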
     * This pipeline ingests incoming data from a Cloud Pub/Sub topic and
     * outputs the raw data into windowed files at the specified output
     * directory.
     */
    public class PubsubToGcs {
    
      /**
       * Options supported by the pipeline.
       * 
       * <p>Inherits standard configuration options.</p>
       */
      public static interface Options extends DataflowPipelineOptions, StreamingOptions {
        @Description("The Cloud Pub/Sub topic to read from.")
        @Required
        ValueProvider<String> getTopic();
        void setTopic(ValueProvider<String> value);
    
        @Description("The directory to output files to. Must end with a slash.")
        @Required
        ValueProvider<String> getOutputDirectory();
        void setOutputDirectory(ValueProvider<String> value);
    
        @Description("The filename prefix of the files to write to.")
        @Default.String("output")
        @Required
        ValueProvider<String> getOutputFilenamePrefix();
        void setOutputFilenamePrefix(ValueProvider<String> value);
    
        @Description("The shard template of the output file. Specified as repeating sequences "
            + "of the letters 'S' or 'N' (example: SSS-NNN). These are replaced with the "
            + "shard number, or number of shards respectively")
        // Non-empty default so that the shards of one window never share a file name.
        @Default.String("-SSS-of-NNN")
        ValueProvider<String> getShardTemplate();
        void setShardTemplate(ValueProvider<String> value);
    
        @Description("The suffix of the files to write.")
        @Default.String("")
        ValueProvider<String> getOutputFilenameSuffix();
        void setOutputFilenameSuffix(ValueProvider<String> value);
    
        @Description("The sub-directory policy which files will use when output per window.")
        @Default.Enum("NONE")
        WindowedFilenamePolicy.SubDirectoryPolicy getSubDirectoryPolicy();
        void setSubDirectoryPolicy(WindowedFilenamePolicy.SubDirectoryPolicy value);
    
        @Description("The window duration in which data will be written. Defaults to 5m. "
            + "Allowed formats are: "
            + "Ns (for seconds, example: 5s), "
            + "Nm (for minutes, example: 12m), "
            + "Nh (for hours, example: 2h).")
        @Default.String("5m")
        String getWindowDuration();
        void setWindowDuration(String value);
    
        @Description("The maximum number of output shards produced when writing.")
        @Default.Integer(10)
        Integer getNumShards();
        void setNumShards(Integer value);
      }
    
      /**
       * Main entry point for executing the pipeline.
       * @param args  The command-line arguments to the pipeline.
       */
      public static void main(String[] args) {
    
        Options options = PipelineOptionsFactory
            .fromArgs(args)
            .withValidation()
            .as(Options.class);
    
        run(options);
      }
    
      /**
       * Runs the pipeline with the supplied options.
       * 
       * @param options The execution parameters to the pipeline.
       * @return  The result of the pipeline execution.
       */
      public static PipelineResult run(Options options) {
        // Create the pipeline
        Pipeline pipeline = Pipeline.create(options);
    
        /**
         * Steps:
         *   1) Read string messages from PubSub
         *   2) Window the messages into minute intervals specified by the executor.
         *   3) Output the windowed files to GCS
         */
        pipeline
          .apply("Read PubSub Events",
            PubsubIO
              .readStrings()
              .fromTopic(options.getTopic()))
          .apply(options.getWindowDuration() + " Window", 
              Window
                .into(FixedWindows.of(parseDuration(options.getWindowDuration()))))
          .apply("Write File(s)",
              TextIO
                .write()
                .withWindowedWrites()
                .withNumShards(options.getNumShards())
                .to(options.getOutputDirectory())
                .withFilenamePolicy(
                    new WindowedFilenamePolicy(
                        options.getOutputFilenamePrefix(),
                        options.getShardTemplate(),
                        options.getOutputFilenameSuffix())
                    .withSubDirectoryPolicy(options.getSubDirectoryPolicy())));
    
        // Execute the pipeline and return the result.
        PipelineResult result = pipeline.run();
    
        return result;
      }
    
      /**
       * Parses a duration from a period formatted string. Values
       * are accepted in the following formats:
       * <p>
       * Ns - Seconds. Example: 5s<br>
       * Nm - Minutes. Example: 13m<br>
       * Nh - Hours. Example: 2h
       * 
       * <pre>
       * parseDuration(null) = NullPointerException()
       * parseDuration("")   = Duration.standardSeconds(0)
       * parseDuration("2s") = Duration.standardSeconds(2)
       * parseDuration("5m") = Duration.standardMinutes(5)
       * parseDuration("3h") = Duration.standardHours(3)
       * </pre>
       * 
       * @param value The period value to parse.
       * @return  The {@link Duration} parsed from the supplied period string.
       */
      private static Duration parseDuration(String value) {
        Preconditions.checkNotNull(value, "The specified duration must be a non-null value!");
    
        PeriodParser parser = new PeriodFormatterBuilder()
          .appendSeconds().appendSuffix("s")
          .appendMinutes().appendSuffix("m")
          .appendHours().appendSuffix("h")
          .toParser();
    
        MutablePeriod period = new MutablePeriod();
        parser.parseInto(period, value, 0, Locale.getDefault());
    
        Duration duration = period.toDurationFrom(new DateTime(0));
        return duration;
      }
    }
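`parseDuration` ignores the return value of `parseInto`, so a malformed value such as `5x` is not reported as an error at parse time. If you would rather fail fast, a stricter sketch using only `java.time` and a regular expression could look like the following. It accepts a single `<number><unit>` value (unit `s`, `m` or `h`), and unlike the original it rejects the empty string; the class name is hypothetical.

```java
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Stricter, java.time-based alternative to parseDuration (sketch). */
final class DurationParseSketch {

  // One number followed by a single unit letter: s, m or h.
  private static final Pattern FORMAT = Pattern.compile("(\\d+)([smh])");

  private DurationParseSketch() {}

  static Duration parse(String value) {
    if (value == null) {
      throw new NullPointerException("The specified duration must be a non-null value!");
    }
    Matcher m = FORMAT.matcher(value.trim());
    if (!m.matches()) {
      // Fail fast instead of continuing with an unparsed value.
      throw new IllegalArgumentException("Unsupported duration format: '" + value + "'");
    }
    long amount = Long.parseLong(m.group(1));
    switch (m.group(2)) {
      case "s":
        return Duration.ofSeconds(amount);
      case "m":
        return Duration.ofMinutes(amount);
      default: // "h" is the only remaining unit the pattern allows
        return Duration.ofHours(amount);
    }
  }
}
```

Beam 2.x windowing takes a Joda `Duration`, so the result would be converted with something like `org.joda.time.Duration.millis(parse(value).toMillis())` before being passed to `FixedWindows.of`.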
    


    WindowedFilenamePolicy

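    import org.apache.beam.sdk.io.DefaultFilenamePolicy;
    import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
    import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
    import org.apache.beam.sdk.io.fs.ResourceId;
    import org.apache.beam.sdk.options.ValueProvider;
    import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
    import org.apache.commons.lang3.StringUtils;
    import org.joda.time.DateTimeZone;
    import org.joda.time.Instant;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;
    import org.joda.time.format.ISODateTimeFormat;

    /**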
     * The {@link WindowedFilenamePolicy} class will output files
     * to the specified location with a format of output-yyyyMMdd'T'HHmmssZ-001-of-100.txt.
     */
    @SuppressWarnings("serial")
    public class WindowedFilenamePolicy extends FilenamePolicy {
    
        /**
         * Possible sub-directory creation modes.
         */
        public static enum SubDirectoryPolicy {
            NONE("."),
            PER_HOUR("yyyy-MM-dd/HH"),
            PER_DAY("yyyy-MM-dd");
    
            private final String subDirectoryPattern;
    
            private SubDirectoryPolicy(String subDirectoryPattern) {
                this.subDirectoryPattern = subDirectoryPattern;
            }
    
            public String getSubDirectoryPattern() {
                return subDirectoryPattern;
            }
    
            public String format(Instant instant) {
                DateTimeFormatter formatter = DateTimeFormat.forPattern(subDirectoryPattern);
                return formatter.print(instant);
            }
        }
    
        /**
         * The formatter used to format the window timestamp for outputting to the filename.
         */
        private static final DateTimeFormatter formatter = ISODateTimeFormat
                .basicDateTimeNoMillis()
                .withZone(DateTimeZone.getDefault());
    
        /**
         * The filename prefix.
         */
        private final ValueProvider<String> prefix;
    
        /**
         * The filename suffix.
         */
        private final ValueProvider<String> suffix;
    
        /**
         * The shard template used during file formatting.
         */
        private final ValueProvider<String> shardTemplate;
    
        /**
         * The policy which dictates when or if sub-directories are created
         * for the windowed file output.
         */
        private ValueProvider<SubDirectoryPolicy> subDirectoryPolicy = StaticValueProvider.of(SubDirectoryPolicy.NONE);
    
        /**
         * Constructs a new {@link WindowedFilenamePolicy} with the
         * supplied prefix used for output files.
         * 
         * @param prefix    The prefix to append to all files output by the policy.
         * @param shardTemplate The template used to create uniquely named sharded files.
         * @param suffix    The suffix to append to all files output by the policy.
         */
        public WindowedFilenamePolicy(String prefix, String shardTemplate, String suffix) {
            this(StaticValueProvider.of(prefix), 
                    StaticValueProvider.of(shardTemplate),
                    StaticValueProvider.of(suffix));
        }
    
        /**
         * Constructs a new {@link WindowedFilenamePolicy} with the
         * supplied prefix used for output files.
         * 
         * @param prefix    The prefix to append to all files output by the policy.
         * @param shardTemplate The template used to create uniquely named sharded files.
         * @param suffix    The suffix to append to all files output by the policy.
         */
        public WindowedFilenamePolicy(
                ValueProvider<String> prefix, 
                ValueProvider<String> shardTemplate, 
                ValueProvider<String> suffix) {
            this.prefix = prefix;
            this.shardTemplate = shardTemplate;
            this.suffix = suffix; 
        }
    
        /**
         * The subdirectory policy will create sub-directories on the
         * filesystem based on the window which has fired.
         * 
         * @param policy    The subdirectory policy to apply.
         * @return The filename policy instance.
         */
        public WindowedFilenamePolicy withSubDirectoryPolicy(SubDirectoryPolicy policy) {
            return withSubDirectoryPolicy(StaticValueProvider.of(policy));
        }
    
        /**
         * The subdirectory policy will create sub-directories on the
         * filesystem based on the window which has fired.
         * 
         * @param policy    The subdirectory policy to apply.
         * @return The filename policy instance.
         */
        public WindowedFilenamePolicy withSubDirectoryPolicy(ValueProvider<SubDirectoryPolicy> policy) {
            this.subDirectoryPolicy = policy;
            return this;
        }
    
        /**
         * The windowed filename method will construct filenames per window in the
         * format of output-yyyyMMdd'T'HHmmssZ-001-of-100.txt.
         */
        @Override
        public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext c, String extension) {
            Instant windowInstant = c.getWindow().maxTimestamp();
            String datetimeStr = formatter.print(windowInstant.toDateTime());
    
            // Remove the prefix when it is null so we don't append the literal 'null'
            // to the start of the filename
            String filenamePrefix = prefix.get() == null ? datetimeStr : prefix.get() + "-" + datetimeStr;
            String filename = DefaultFilenamePolicy.constructName(
                    filenamePrefix, 
                    shardTemplate.get(), 
                    StringUtils.defaultIfBlank(suffix.get(), extension),  // Ignore the extension in favor of the suffix.
                    c.getShardNumber(), 
                    c.getNumShards());
    
            String subDirectory = subDirectoryPolicy.get().format(windowInstant);
            return outputDirectory
                    .resolve(subDirectory, StandardResolveOptions.RESOLVE_DIRECTORY)
                    .resolve(filename, StandardResolveOptions.RESOLVE_FILE);
        }
    
        /**
         * Unwindowed writes are unsupported by this filename policy so an {@link UnsupportedOperationException}
         * will be thrown if invoked.
         */
        @Override
        public ResourceId unwindowedFilename(ResourceId outputDirectory, Context c, String extension) {
            throw new UnsupportedOperationException("There is no windowed filename policy for unwindowed file"
                + " output. Please use the WindowedFilenamePolicy with windowed writes or switch filename policies.");
        }
    }
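To check which object names `WindowedFilenamePolicy` produces for a given window, the path assembly can be modeled in plain Java. This is an approximation under stated assumptions: it uses `java.time` instead of Joda, formats in UTC rather than the JVM default zone, takes the shard part already expanded (for example `-001-of-010`), and simply omits the sub-directory level for `NONE` instead of resolving `"."`. All names are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Plain-Java approximation of the object path WindowedFilenamePolicy builds. */
final class WindowedPathSketch {

  /** Mirrors the patterns of the answer's SubDirectoryPolicy enum. */
  enum SubDir {
    NONE(null),
    PER_HOUR("yyyy-MM-dd/HH"),
    PER_DAY("yyyy-MM-dd");

    private final DateTimeFormatter formatter;

    SubDir(String pattern) {
      // Assumption: format in UTC instead of the JVM default time zone.
      this.formatter = pattern == null
          ? null
          : DateTimeFormatter.ofPattern(pattern).withZone(ZoneOffset.UTC);
    }
  }

  // Basic ISO-style timestamp without millis; "XX" prints "Z" for UTC.
  private static final DateTimeFormatter TIMESTAMP =
      DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmssXX").withZone(ZoneOffset.UTC);

  private WindowedPathSketch() {}

  /**
   * Joins directory, optional sub-directory, "prefix-timestamp", an already
   * expanded shard part such as "-001-of-010", and the suffix.
   */
  static String objectPath(String outputDirectory, SubDir policy, String prefix,
                           Instant windowMaxTimestamp, String shardPart, String suffix) {
    StringBuilder path = new StringBuilder(outputDirectory);
    if (!outputDirectory.endsWith("/")) {
      path.append('/');
    }
    if (policy.formatter != null) {
      // NONE adds no directory level at all.
      path.append(policy.formatter.format(windowMaxTimestamp)).append('/');
    }
    return path.append(prefix).append('-')
        .append(TIMESTAMP.format(windowMaxTimestamp))
        .append(shardPart)
        .append(suffix)
        .toString();
  }
}
```

For a window ending at 10:05:00 UTC, `maxTimestamp()` is 10:04:59.999, so with `PER_HOUR` the sketch yields `gs://my-bucket/raw/2017-07-20/10/output-20170720T100459Z-001-of-010.txt` (the bucket name is a placeholder).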
    

    【Discussion】:

    • After a bug about temporary files was filed, this code changed somewhat in Beam 2.2. Here is a good example of the code as it is used now.

    【Solution 2】:

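    Beam's DefaultFilenamePolicy now supports windowed writes, so there is no need to write a custom FilenamePolicy. You can control the output filenames by placing W and P placeholders (for the window and the pane, respectively) in the filename template. This is in the head of the Beam repository and will also be in the upcoming Beam 2.1 release (which is being released as we speak).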
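Assuming Beam 2.1 or later, the template would be supplied to `TextIO.write()` through `withShardNameTemplate(...)` together with `withWindowedWrites()`. The toy sketch below is not Beam's implementation; it only models the placeholder substitution described above, so you can see how a template such as `-W-P-SSSSS-of-NNNNN` expands. Runs of `S` and `N` become the zero-padded shard index and shard count. The window and pane strings are hypothetical caller-supplied inputs, because the exact text Beam substitutes for `W` and `P` is not shown here.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Toy expansion of a shard-name template: runs of S and N become the
 * zero-padded shard index and shard count; W and P are replaced by the
 * caller-supplied window and pane strings. Not Beam's implementation.
 */
final class ShardTemplateSketch {

  private static final Pattern TOKEN = Pattern.compile("S+|N+|W|P");

  private ShardTemplateSketch() {}

  static String expand(String template, int shard, int numShards, String window, String pane) {
    Matcher m = TOKEN.matcher(template);
    StringBuffer out = new StringBuffer();
    while (m.find()) {
      String token = m.group();
      String replacement;
      switch (token.charAt(0)) {
        case 'S': replacement = pad(shard, token.length()); break;
        case 'N': replacement = pad(numShards, token.length()); break;
        case 'W': replacement = window; break;
        default:  replacement = pane; break; // 'P'
      }
      // quoteReplacement keeps '$' or '\' in window strings literal.
      m.appendReplacement(out, Matcher.quoteReplacement(replacement));
    }
    m.appendTail(out);
    return out.toString();
  }

  // Zero-pads to at least `width` digits, e.g. pad(3, 5) -> "00003".
  private static String pad(int value, int width) {
    return String.format("%0" + width + "d", value);
  }
}
```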

    【Discussion】:

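    • Hi Reuven, thanks for the info. I think I tried this earlier without luck; when I looked at the documentation it said it was only for unwindowed writes. I'm also using Google Cloud Dataflow and have been on version 2.0. It sounds like this is coming soon? beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/…
    • This will be released in Beam 2.1. After that, Google Cloud Dataflow 2.1, which includes this feature, will be released. You can try it now by building directly from the latest Beam snapshot.
    • Another note: writing a custom FilenamePolicy isn't hard. You only need to override the windowedFilename method and map the input arguments to a filename. Be aware, though, that this API will change slightly in the upcoming release.
    • Thanks again, Reuven. I tried writing a custom FilenamePolicy, but I couldn't work out how to override the windowedFilename method. It seems to take a ResourceId and a Context (in addition to a String), and the Javadocs on these are also very sparse. My problem is that these are all abstract classes, which in turn require other Beam-specific abstract classes. It isn't clear to me where, in all of this, I actually supply the GCS bucket and the filename format.
    • Correct. Writing to GCS requires writing whole files, so some form of grouping is needed. Note that if you don't need the semantic correctness of windows, you can also simply use a processing-time or count-based trigger for this. For example, if you just want each of your 10 shards to write a file every 10,000 elements, you could instead use Window.<String>into(new GlobalWindows()).triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(10000))).discardingFiredPanes();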
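The count-based alternative in the last comment relies on a repeating element-count trigger with discarding panes. The single-threaded toy model below (hypothetical class, no Beam dependency) illustrates the idea: buffer elements and emit them as one pane each time the count threshold is reached. Real Beam triggers fire with at least that many elements and operate per key and window, so actual pane sizes can be larger than the threshold.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy, single-threaded model of a repeating count trigger with discarding
 * panes: every time the buffer reaches the threshold, its contents are
 * emitted as one pane and the buffer starts empty again.
 */
final class CountTriggerSketch<T> {

  private final int threshold;
  private final List<T> buffer = new ArrayList<>();
  private final List<List<T>> firedPanes = new ArrayList<>();

  CountTriggerSketch(int threshold) {
    if (threshold <= 0) {
      throw new IllegalArgumentException("threshold must be positive");
    }
    this.threshold = threshold;
  }

  /** Adds one element and fires a pane if the threshold is reached. */
  void add(T element) {
    buffer.add(element);
    if (buffer.size() >= threshold) {
      // Discarding mode: the pane takes the buffered elements; nothing is
      // carried over into the next pane.
      firedPanes.add(new ArrayList<>(buffer));
      buffer.clear();
    }
  }

  /** Panes fired so far, in firing order. */
  List<List<T>> firedPanes() {
    return firedPanes;
  }

  /** Elements still waiting for the next firing. */
  int pending() {
    return buffer.size();
  }
}
```

With a threshold of 10, adding 25 elements fires two panes of 10 and leaves 5 pending until more data arrives.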