【发布时间】:2023-04-04 01:52:01
【问题描述】:
目标
我的目标是创建一个指定 Apache Beam 管道的 Dataflow 模板。管道以批处理模式运行,从 BigQuery 读取,然后在其他地方执行转换和写入。最重要的是,我用于读取 BigQuery 的查询必须是运行时提供的。
预期行为
预期结果是管道将使用运行时参数指定 BigQuery 查询、执行查询,然后继续处理管道的其余部分。
实际行为
实际行为是忽略我传入的运行时参数,而是使用我在创建 GCS 模板时必须指定的参数。
相关代码
下面是我如何指定读操作,以及查询参数是如何定义和传入的。
public interface MyOptions extends PipelineOptions, StreamingOptions {
@Description("Query String")
ValueProvider<String> getQueryString();
void setQueryString(ValueProvider<String> value);
}
public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(MyOptions.class);
Pipeline p = Pipeline.create(options);
PCollection<TableRow> tableRows =
p.apply(BigQueryIO.readTableRows()
.fromQuery(options.getQueryString())
.withTemplateCompatibility()
.withoutValidation());
// Add this point I run my transformations and loading
}
要实际构建模板并推送到 GCS,我执行以下操作
mvn compile -Pdataflow-runner exec:java -Dexec.mainClass=com.Pipeline "-Dexec.args=--runner=DataflowRunner --queryString='SELECT time,type FROM [my-project:timeseries.my-data] where time between TIMESTAMP(\"2020-02-13T00:00:00Z\") and TIMESTAMP(\"2020-02-15T00:00:00Z\")'"
最后,我使用 Dataflow Web UI 从 GCS 中选择模板并进行部署。在 Web UI 的底部,我指定了我的运行时参数,我在其中设置了 queryString 和我想要使用的运行时查询。
注意:当我在 Dataflow 中运行模板时,我指定了 queryString 并且我知道它正在被传入。我将我的第一个转换重写为打印出 queryString 并正确打印指定的运行时选项。问题是“从 BigQuery 读取”queryString 仍然是我制作模板时使用的原始字符串。
【问题讨论】:
标签: java google-cloud-platform google-bigquery apache-beam