【发布时间】:2019-07-16 08:55:25
【问题描述】:
问题陈述是我们正在自定义 Google 提供的 PubSubToBQ 数据流流式 java 模板,我们在其中配置多个订阅/主题以读取并将数据推送到多个 Bigquery 表中,这需要作为单个执行数据流管道,用于从源读取所有流并推送到 Bigquery 表中。但是当我们从 Eclipse 执行模板时,我们必须传递订阅/主题和 BQ 详细信息,以及 gcs 存储桶上的 tempalte 阶段,然后当我们使用具有不同订阅和 BQ 详细信息的 gcloud 命令运行模板时。数据流作业不会被新的订阅或 BQ 表覆盖。
目标:我的目标是使用 Google 提供的 PubSubTOBQ.java 类模板,并通过相应的 Bigquery 表传递订阅列表,并创建一个传递每个表订阅的管道。所以在一个 Job 中有 n-n, n 个管道。
我正在使用 Google 提供的 PubSubTOBQ.java 类模板,该模板将输入作为单个订阅或单个主题以及相应的大查询表详细信息。
现在我需要自定义它以将输入作为主题列表或订阅列表作为逗号分隔。我可以使用 ValueProvider> 并在 main 或 run 方法中迭代字符串数组并将订阅/主题和 bq 表作为字符串传递。查看下面的代码了解更多信息。
我在 gcp 文档上看到的是,如果我们想在 rumtime 期间覆盖或使用 value 来创建动态 Piepline,我们不能在 DoFn 之外传递 ValueProvider 变量。不确定我们是否可以在 DoFn 中读取消息。
**PubsubIO.readMessagesWithAttributes().fromSubscription(providedSubscriptionArray[i])**
如果是,请告诉我。这样我的目的就达到了。
代码:
public static void main(String[] args) {
StreamingDataflowOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(StreamingDataflowOptions.class);
List<String> listOfSubStr = new ArrayList<String>();
List<String> listOfTopicStr = new ArrayList<String>();
List<String> listOfTableStr = new ArrayList<String>();
String[] providedSubscriptionArray = null;
String[] providedTopicArray = null;
String[] providedTableArray = null;
if (options.getInputSubscription().isAccessible()) {
listOfSubStr = options.getInputSubscription().get();
providedSubscriptionArray = new String[listOfSubStr.size()];
providedSubscriptionArray = createListOfProvidedStringArray(listOfSubStr);
}
if (options.getInputTopic().isAccessible()) {
listOfTopicStr = options.getInputTopic().get();
providedTopicArray = new String[listOfSubStr.size()];
providedTopicArray = createListOfProvidedStringArray(listOfTopicStr);
}
if (options.getOutputTableSpec().isAccessible()) {
listOfTableStr = options.getOutputTableSpec().get();
providedTableArray = new String[listOfSubStr.size()];
providedTableArray = createListOfProvidedStringArray(listOfTableStr);
}
Pipeline pipeline = Pipeline.create(options);
PCollection<PubsubMessage> readPubSubMessage = null;
for (int i = 0; i < providedSubscriptionArray.length; i++) {
if (options.getUseSubscription()) {
readPubSubMessage = pipeline
.apply(PubsubIO.readMessagesWithAttributes().fromSubscription(providedSubscriptionArray[i]));
} else {
readPubSubMessage = pipeline.apply(PubsubIO.readMessagesWithAttributes().fromTopic(providedTopicArray[i]));
}
readPubSubMessage
/*
* Step #2: Transform the PubsubMessages into TableRows
*/
.apply("Convert Message To TableRow", ParDo.of(new PubsubMessageToTableRow()))
.apply("Insert Data To BigQuery",
BigQueryIO.writeTableRows().to(providedTableArray[i])
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
}
pipeline.run().waitUntilFinish();
}
应该能够将单个 Dataflow PubSubTOBQ 模板用于单个 Dataflow Streaming Job 中与 bigquery 模板数量相对应的订阅数量的多个管道。
【问题讨论】:
标签: java for-loop google-cloud-platform google-cloud-dataflow google-cloud-pubsub