【问题标题】:BigqueryIO schema from table row表行中的 BigqueryIO 架构
【发布时间】:2020-12-18 14:43:43
【问题描述】:

我有一个 TableRow 的 PCollection,其表行中的表名和架构如下所示

我需要在提到的行中将 col1、col2 和 col3 值插入到带有 Schema 的表中 BigQueryIO.writeTableRows()

我们可以使用 lambda 从行中访问表名,如下所示 BigQueryIO.writeTableRows().to(tablerow->tablerow.get("Table").toString()) 但是我们如何使用 BigQueryIO.writeTableRows().withSchema() 从表行访问模式

请帮忙

【问题讨论】:

    标签: google-bigquery google-cloud-dataflow


    【解决方案1】:

    您需要使用DynamicDestination 为每个元素选择要应用的表名称和架构。

    分解文档的代码示例

    events.apply(BigQueryIO.<UserEvent>write()
      .to(new DynamicDestinations<UserEvent, String>() {
    // Here you extract the "key" value for the selection of schema and table. 
    // If you need the values of the 2 first column, you can create your own structure
    // Example: <schema>|<table>
            public String getDestination(ValueInSingleWindow<UserEvent> element) {
              return element.getValue().getUserId();
            }
    // Here return the Table destination (dataset and table) according to the "Key"
    // If you have the value in your key, you can do key.split("|")[1], according to my previous example
            public TableDestination getTable(String user) {
              return new TableDestination(tableForUser(user), "Table for user " + user);
            }
    //Same for the schema here.
            public TableSchema getSchema(String user) {
              return tableSchemaForUser(user);
            }
          })
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-02-15
      • 2022-01-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-06-22
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多