【问题标题】:GCP Dataflow Streaming Template : Not able to customize google provided java based PubSubToBQ templateGCP 数据流流模板:无法自定义谷歌提供的基于 java 的 PubSubToBQ 模板
【发布时间】:2019-07-16 08:55:25
【问题描述】:

问题陈述是我们正在自定义 Google 提供的 PubSubToBQ 数据流流式 java 模板,我们在其中配置多个订阅/主题以读取并将数据推送到多个 Bigquery 表中,这需要作为单个执行数据流管道,用于从源读取所有流并推送到 Bigquery 表中。但是当我们从 Eclipse 执行模板时,我们必须传递订阅/主题和 BQ 详细信息,以及 gcs 存储桶上的 tempalte 阶段,然后当我们使用具有不同订阅和 BQ 详细信息的 gcloud 命令运行模板时。数据流作业不会被新的订阅或 BQ 表覆盖。

目标:我的目标是使用 Google 提供的 PubSubTOBQ.java 类模板,并通过相应的 Bigquery 表传递订阅列表,并创建一个传递每个表订阅的管道。所以在一个 Job 中有 n-n, n 个管道。

我正在使用 Google 提供的 PubSubTOBQ.java 类模板,该模板将输入作为单个订阅或单个主题以及相应的大查询表详细信息。

现在我需要自定义它以将输入作为主题列表或订阅列表作为逗号分隔。我可以使用 ValueProvider> 并在 main 或 run 方法中迭代字符串数组并将订阅/主题和 bq 表作为字符串传递。查看下面的代码了解更多信息。

我在 gcp 文档上看到的是,如果我们想在 rumtime 期间覆盖或使用 value 来创建动态 Piepline,我们不能在 DoFn 之外传递 ValueProvider 变量。不确定我们是否可以在 DoFn 中读取消息。

**PubsubIO.readMessagesWithAttributes().fromSubscription(providedSubscriptionArray[i])** 

如果是,请告诉我。这样我的目的就达到了。

代码:

public static void main(String[] args) {
        StreamingDataflowOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(StreamingDataflowOptions.class);

    List<String> listOfSubStr = new ArrayList<String>();
    List<String> listOfTopicStr = new ArrayList<String>();
    List<String> listOfTableStr = new ArrayList<String>();

    String[] providedSubscriptionArray = null;
    String[] providedTopicArray = null;
    String[] providedTableArray = null;

    if (options.getInputSubscription().isAccessible()) {
        listOfSubStr = options.getInputSubscription().get();
        providedSubscriptionArray = new String[listOfSubStr.size()];
        providedSubscriptionArray = createListOfProvidedStringArray(listOfSubStr);
    }

    if (options.getInputTopic().isAccessible()) {
        listOfTopicStr = options.getInputTopic().get();
        providedTopicArray = new String[listOfSubStr.size()];
        providedTopicArray = createListOfProvidedStringArray(listOfTopicStr);
    }

    if (options.getOutputTableSpec().isAccessible()) {
        listOfTableStr = options.getOutputTableSpec().get();
        providedTableArray = new String[listOfSubStr.size()];
        providedTableArray = createListOfProvidedStringArray(listOfTableStr);
    }

    Pipeline pipeline = Pipeline.create(options);

    PCollection<PubsubMessage> readPubSubMessage = null;

    for (int i = 0; i < providedSubscriptionArray.length; i++) {

        if (options.getUseSubscription()) {
            readPubSubMessage = pipeline
                    .apply(PubsubIO.readMessagesWithAttributes().fromSubscription(providedSubscriptionArray[i]));
        } else {
            readPubSubMessage = pipeline.apply(PubsubIO.readMessagesWithAttributes().fromTopic(providedTopicArray[i]));
        }

        readPubSubMessage
                /*
                 * Step #2: Transform the PubsubMessages into TableRows
                 */
                .apply("Convert Message To TableRow", ParDo.of(new PubsubMessageToTableRow()))
                .apply("Insert Data To BigQuery",
                        BigQueryIO.writeTableRows().to(providedTableArray[i])
                                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    }

    pipeline.run().waitUntilFinish();
}

应该能够将单个 Dataflow PubSubTOBQ 模板用于单个 Dataflow Streaming Job 中与 bigquery 模板数量相对应的订阅数量的多个管道。

【问题讨论】:

    标签: java for-loop google-cloud-platform google-cloud-dataflow google-cloud-pubsub


    【解决方案1】:

    问题在于,截至目前,Dataflow 模板需要在暂存/创建时了解管道图,因此它在运行时不会有所不同。如果您仍想使用非模板化管道并将逗号分隔的 Pub/Sub 主题列表作为 --topicList 选项参数传递,那么您可以执行以下操作:

    String[] listOfTopicStr = options.getTopicList().split(",");
    
    PCollection[] p = new PCollection[listOfTopicStr.length];
    
    for (int i = 0; i < listOfTopicStr.length; i++) {
        p[i] = pipeline
            .apply(PubsubIO.readStrings().fromTopic(listOfTopicStr[i]))
            .apply(ParDo.of(new DoFn<String, Void>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws Exception {
                    Log.info(String.format("Message=%s", c.element()));
                }
            }));
    }
    

    完整代码here.

    如果我们用 3 个主题对其进行测试,例如:

    mvn -Pdataflow-runner compile -e exec:java \
     -Dexec.mainClass=com.dataflow.samples.MultipleTopics \
          -Dexec.args="--project=$PROJECT \
          --topicList=projects/$PROJECT/topics/topic1,projects/$PROJECT/topics/topic2,projects/$PROJECT/topics/topic3 \
          --stagingLocation=gs://$BUCKET/staging/ \
          --runner=DataflowRunner"
    
    gcloud pubsub topics publish topic1 --message="message 1"
    gcloud pubsub topics publish topic2 --message="message 2"
    gcloud pubsub topics publish topic3 --message="message 3"
    

    输出和数据流图将如预期的那样:

    将这种方法强制用于模板的一种可能解决方法是在最坏的情况下拥有足够多的主题N。当我们使用n 主题(满足n &lt;= N)执行模板时,我们需要指定N - n 未使用/虚拟主题来填写。

    【讨论】:

    • 感谢 Guillem 的回复!您已发布传递逗号分隔主题或 bq 列表的方法,我也在以相同的方式做 & 正如您所说的“数据流模板,截至目前,需要在暂存/创建时知道管道图,所以它不能在运行时不同。”所以我无法实现在管道执行运行时传递主题列表的目标?您建议的一种解决方法是使用 N 主题部署模板并启动具有 n 主题的作业。不确定,需要和我的团队讨论,我们是否可以使用这个。如果你有什么,请给我更多建议。
    • 如果没有更改模板的工作方式或从多个主题/订阅中读取的自定义/新 PubSubIO 源,我不这么认为。其他解决方法可以是为每个可能的输入数量使用一个模板,但是将它们维护或合并到一个主题中然后使用 DynamicDestinations 相应地路由到输出表会很痛苦......
    猜你喜欢
    • 2021-01-13
    • 2021-04-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多