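【Posted】: 2021-09-30 03:31:55
【Problem description】:
I am reading data over a JDBC connection, converting it to TableRows, and then uploading the data to BigQuery using the code below.
The data I expect has multiple columns, one of which is called "phone number". I want to take the rows whose phone number is shorter than 8 digits, store them in a separate table, and remove them from the main data. Then I want to write both tables to BigQuery: one with the invalid phone numbers, and one with the remaining rows that have valid phone numbers. What approach can I use to do this after reading the JDBC data?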
  private static PipelineResult run(JdbcConverters.JdbcToBigQueryOptions options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);
    /*
     * Steps: 1) Read records via JDBC and convert to TableRow via RowMapper
     *        2) Filter data
     *        3) Append TableRow to BigQuery via BigQueryIO
     */
    pipeline
        /*
         * Step 1: Read records via JDBC and convert to TableRow
         *         via {@link org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper}
         */
        .apply(
            "Read from JdbcIO",
            DynamicJdbcIO.<TableRow>read()
                .withDataSourceConfiguration(
                    DynamicJdbcIO.DynamicDataSourceConfiguration.create(
                            options.getDriverClassName(),
                            maybeDecrypt(options.getConnectionURL(), options.getKMSEncryptionKey()))
                        .withUsername(
                            maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()))
                        .withPassword(
                            maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()))
                        .withDriverJars(options.getDriverJars())
                        .withConnectionProperties(options.getConnectionProperties()))
                .withQuery(options.getQuery())
                .withCoder(TableRowJsonCoder.of())
                .withRowMapper(JdbcConverters.getResultSetToTableRow()))
        // Step 2 (filtering) is not implemented yet; this is what the question is about.
        .apply(
            "Write to BigQuery",
            BigQueryIO.writeTableRows()
                .withoutValidation()
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
                .to(options.getOutputTable()));
    // Execute the pipeline and return the result.
    return pipeline.run();
  }
}
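To make the requested split concrete, here is a minimal sketch of the row-level rule in plain Java, without any Beam dependency. It is not a finished pipeline step. The class name `PhoneSplitSketch`, the column key `phone_number`, and the choice to treat a missing number as invalid are all assumptions made for illustration; the real column name depends on the JDBC query.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PhoneSplitSketch {

    // Hypothetical column key; the actual name depends on the JDBC query.
    public static final String PHONE_COLUMN = "phone_number";

    /** Returns true when the row's phone number is missing or has fewer than 8 digits. */
    public static boolean hasInvalidPhone(Map<String, Object> row) {
        Object value = row.get(PHONE_COLUMN);
        if (value == null) {
            return true; // assumption: a missing number counts as invalid
        }
        // Count only digit characters, so separators such as '-' or ' ' are ignored.
        long digits = value.toString().chars().filter(Character::isDigit).count();
        return digits < 8;
    }

    /** Splits rows into two lists: key true = invalid phone, key false = valid phone. */
    public static Map<Boolean, List<Map<String, Object>>> split(List<Map<String, Object>> rows) {
        return rows.stream()
            .collect(Collectors.partitioningBy(PhoneSplitSketch::hasInvalidPhone));
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = List.of(
            Map.of("name", "a", PHONE_COLUMN, "1234567"),
            Map.of("name", "b", PHONE_COLUMN, "12345678"));
        Map<Boolean, List<Map<String, Object>>> parts = split(rows);
        System.out.println("invalid rows: " + parts.get(true).size());
        System.out.println("valid rows: " + parts.get(false).size());
    }
}
```

Since `TableRow` is itself a `Map<String, Object>`, a predicate of this shape could plausibly be reused inside the pipeline, for example in Beam's `Partition` transform or in a `ParDo` with two `TupleTag` outputs, with each resulting `PCollection<TableRow>` written by its own `BigQueryIO.writeTableRows()` step. The second output table would need its own pipeline option, which the code in the question does not define.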
【Discussion】:
Tags: google-bigquery transform pipeline apache-beam tablerow