【Question title】: Write multiple tables to BigQuery by filtering
【Posted】: 2021-09-30 03:31:55
【Question description】:

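I am reading data from a JDBC connection, converting it to TableRows, and then uploading the data to BigQuery with the code below.

The data has multiple columns, one of which is called "phone number". I want to move the rows whose phone number has fewer than 8 digits into a separate table, remove them from the main data, and then write both tables to BigQuery: one with the invalid phone numbers, the other with the remaining rows that have valid phone numbers. Which method can I use to do this after reading the JDBC data?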

  private static PipelineResult run(JdbcConverters.JdbcToBigQueryOptions options) {

    
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    /*
     * Steps: 1) Read records via JDBC and convert to TableRow via RowMapper
     *        2) Filter data
     *        3) Append TableRow to BigQuery via BigQueryIO
     */
    pipeline
        /*
         * Step 1: Read records via JDBC and convert to TableRow
         *         via {@link org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper}
         */
        .apply(
            "Read from JdbcIO",
            DynamicJdbcIO.<TableRow>read()
                .withDataSourceConfiguration(
                    DynamicJdbcIO.DynamicDataSourceConfiguration.create(
                            options.getDriverClassName(),
                            maybeDecrypt(options.getConnectionURL(), options.getKMSEncryptionKey()))
                        .withUsername(
                            maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()))
                        .withPassword(
                            maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()))
                        .withDriverJars(options.getDriverJars())
                        .withConnectionProperties(options.getConnectionProperties()))
                .withQuery(options.getQuery())
                .withCoder(TableRowJsonCoder.of())
                .withRowMapper(JdbcConverters.getResultSetToTableRow()))

        .apply(
            "Write to BigQuery",
            BigQueryIO.writeTableRows()
                .withoutValidation()
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
                .to(options.getOutputTable()));

    // Execute the pipeline and return the result.
    return pipeline.run();
  }
}
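One possible approach is to decide, per row, which table it belongs to and then split the `PCollection<TableRow>` produced by the JdbcIO step into two collections. Below is a minimal sketch of just the routing rule, under a few assumptions that are not in the question: the column is named `phone_number` (hypothetical; use the real column name from the JDBC query), and "fewer than 8 digits" means counting only the ASCII digit characters in the value, so spaces or dashes are ignored. Because `TableRow` extends `GenericJson`, which is a `Map<String, Object>`, the check is written against `Map<String, Object>` and can be tested without Beam or BigQuery on the classpath. The class name `PhoneNumberRouting` is likewise hypothetical.

```java
import java.util.Map;

/** Hypothetical helper that decides which output table a row belongs to. */
final class PhoneNumberRouting {
    // Assumed column name; replace with the actual column from the JDBC query.
    static final String PHONE_COLUMN = "phone_number";
    // Rows with fewer digits than this are treated as invalid.
    static final int MIN_DIGITS = 8;

    private PhoneNumberRouting() {}

    /** Counts ASCII digits in the phone value; a missing (null) value counts as 0. */
    static int digitCount(Map<String, Object> row) {
        Object value = row.get(PHONE_COLUMN);
        if (value == null) {
            return 0;
        }
        String text = value.toString();
        int digits = 0;
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (c >= '0' && c <= '9') {
                digits++;
            }
        }
        return digits;
    }

    /** True if the row belongs in the main table, false if it belongs in the "invalid" table. */
    static boolean hasValidPhone(Map<String, Object> row) {
        return digitCount(row) >= MIN_DIGITS;
    }
}
```

Inside the pipeline, the same rule could drive Beam's `Partition` transform, for example `Partition.of(2, (TableRow row, int n) -> PhoneNumberRouting.hasValidPhone(row) ? 0 : 1)` applied to the output of the "Read from JdbcIO" step. That yields a `PCollectionList<TableRow>` whose `get(0)` (valid rows) and `get(1)` (invalid rows) would each get their own `BigQueryIO.writeTableRows()` step with a different `.to(...)` table. The second table name is not part of the original options and would need its own pipeline option; with `CREATE_NEVER`, both tables must already exist. Two `Filter.by(...)` steps on the same input, one with the predicate and one with its negation, would also work, at the cost of evaluating the rule twice.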

【Question discussion】:

    Tags: google-bigquery transform pipeline apache-beam tablerow


    【Solution 1】:

    【Discussion】:
