【问题标题】:For google cloud dataflow, is it possible to start another pipeline from a pipeline.对于谷歌云数据流,是否可以从管道启动另一个管道。
【发布时间】:2017-05-23 06:40:46
【问题描述】:

我正在尝试设置一个谷歌云数据流管道(流模式),它读取 pubsub 主题消息,从发布的消息中提取信息(谷歌云存储中的对象名称),然后启动另一个管道(批处理模式)来处理存储在谷歌云存储。

是否可以在管道中启动另一个管道???

【问题讨论】:

  • 您能否编辑您的问题以详细说明您的用例?或许可以通过一条管道来实现,这样管理起来会简单得多。
  • 这是我的用例。我每小时生成一次日志并存储在云存储中。我为该存储桶配置了对象更改通知,并将通知发布到我在 GAE 上开发的应用程序。 GAE 应用程序收到对象更改通知 POST 后,我提取了新生成的日志名称,并使用云 pub/sub 发布到主题。然后我的数据流程序使用 PUBSUBIO 从主题中读取(以流模式)消息并提取日志名称和存储桶信息。然后,我希望启动另一个管道来批处理指定的日志。
  • 如果您启动另一个管道的唯一原因是在到达的日志文件名上应用 TextIO.Read.from(),那么我建议更改此设置并使用简单的手动 ParDo 读取日志。例如。见stackoverflow.com/questions/32277968/…
  • 感谢您的回复。

标签: google-cloud-dataflow google-cloud-pubsub


【解决方案1】:

没有技术原因禁止这样做。您需要确保将 Pipeline 对象分开,并有足够的 Compute Engine 配额来启动您需要的所有作业。

【讨论】:

    【解决方案2】:

    我们让它工作了。这样做:

    private static class ExecuteUpdateTaskFroNamespace extends DoFn<String, String> {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            String namespace = c.element();
            LOG.info("Processing namespace: " + namespace);
    
            BasicOptions options = c.getPipelineOptions().cloneAs(BasicOptions.class);
    
            EntityOptions entityOptions = PipelineOptionsFactory.as(EntityOptions.class); // important to NOT use .create()
            entityOptions.setNamespace(namespace);
            entityOptions.setProject(options.getProject());
            entityOptions.setRunner(DataflowPipelineRunner.class);
            entityOptions.setStagingLocation(options.getStagingLocation());
            entityOptions.setKind("DocsAsset");
            try {
                Pipeline p = Pipeline.create(entityOptions);
                p.apply("Read from Datastore", BcDatastoreReadFactory.getEntitySource(entityOptions))
                        .apply("Find Old Site Entities", ParDo.of(new FindEntities()))
                        .apply("Transform Entities", ParDo.of(new TransformEntities()))
                        .apply("Save", DatastoreIO.v1().write().withProjectId(entityOptions.getProject()));
                p.run();
    
                LOG.info("Submitted UpdateAssetsSitesMimeType job for namespace: " + namespace);
                c.output("Submitted UpdateAssetsSitesMimeType job for namespace: " + namespace);
    
            } catch (Exception e) {
                LOG.warn("Unable to create pipeline for namespace: " + namespace, e);
            }
    
        }
    }
    

    问题:你不能一次生成超过 25 个而不达到配额,要绕过这个,你可以将 setRunner(DataflowPipelineRunner.class) 更改为 setRunner(BlockingDataflowPipelineRunner.class)。但是 BlockingDataflowPipelineRunner 在 2.0.0 中被删除

    EntityOptions 和 BasicOptions 是 PipelineOptions 的扩展。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-01-14
      • 2021-03-15
      • 2022-11-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多