【发布时间】:2019-01-15 11:11:39
【问题描述】:
从 BigQuery 读取和过滤数据,我有两种方式
从 Dataflow 中的 BigQuery 读取(使用 BigqueryIO.readTableRow.from(ValueProvider))整个数据,然后根据 max Date 等条件进行过滤
使用 NestedValueProvider 从 Dataflow 中的 BigQuery 中读取数据会慢得多。
因为如果我读取整个数据并且我的表处于附加模式会出现问题,这将增加读取数据的时间作为一天的通行证。
但如果我只读取特定日期数据,这将使我的管道读取时间一致。
但是对于 200 条记录,嵌套值提供程序比使用 BigqueryIO.readTableRow.from(ValueProvider) 读取整个数据所花费的时间要多得多。
我错过了什么有人可以帮忙吗?
我的代码片段在下面请查找。
Snippet:
PCollection<TableRow> targetTable = input.apply("Read TRUSTED_LAYER_TABLE_DESCRIPTION", BigQueryIO
.readTableRows()
.withoutValidation()
.withTemplateCompatibility()
.fromQuery(NestedValueProvider.of(options.get(Constants.TABLE_DESCRIPTION.toString())
, new QueryTranslator(options.get(Constants.ETL_BATCH_ID.toString())))).usingStandardSql());
嵌套值提供程序类片段:
public class QueryTranslator implements SerializableFunction{
/**
* Read data with max etlbatchid from query
*/
ValueProvider<String> etlbatchid;
public QueryTranslator(ValueProvider<String> etlbatchid){
this.etlbatchid = etlbatchid;
}
private static final long serialVersionUID = -2754362391392873056L;
@Override
public String apply(String input) {
String batchId = this.etlbatchid.get();
if(batchId.equals("-1"))
return String.format("SELECT * from `%s`", input);
else
return String.format("SELECT * from `%s` where etlbatchid = %s;", input,batchId);
}
}
【问题讨论】:
-
不确定你在问什么。你能试着澄清一下吗?
-
@GrahamPolley 我再次更新了可能对您有帮助的问题。
标签: google-cloud-platform google-bigquery google-cloud-dataflow apache-beam