【问题标题】:Dataflow job to write into BigQuery with schema autodetect使用架构自动检测写入 BigQuery 的数据流作业
【发布时间】:2020-03-06 16:57:13
【问题描述】:

目前,我们正在寻找将原始数据转换为通用结构以供进一步分析的最佳方法。我们的数据是 JSON 文件,有些文件有更多的字段,有的更少,有些可能有数组,但总的来说它的结构几乎相同。

为此,我正在尝试用 Java 构建 Apache Beam 管道。我所有的管道都基于这个模板:TextIOToBigQuery.java

第一种方法是将整个 JSON 作为字符串加载到一个列中,然后使用JSON Functions in Standard SQL 转换为通用结构。这在这里有很好的描述:How to manage/handle schema changes while loading JSON file into BigQuery table

第二种方法是将数据加载到适当的列中。所以现在可以通过标准 SQL 查询数据。它还需要知道架构。可以通过控制台、UI 和其他方式检测到它:Using schema auto-detection,但是我没有找到任何关于如何通过 Java 和 Apache Beam 管道实现这一点的信息。

我分析了BigQueryIO,看起来没有架构就无法工作(有一个例外,如果表已经创建)

正如我之前提到的,新文件可能会带来新字段,因此应该相应地更新架构。

假设我有三个 JSON 文件:

1. { "field1": "value1" }
2. { "field2": "value2" }
3. { "field1": "value3", "field10": "value10" }

第一个创建新表,其中包含一个字符串类型的字段“field1”。 所以我的表格应该是这样的:

|field1  |
----------
|"value1"|

Second 做同样的事情,但添加新字段“field2”。现在我的表格应该是这样的:

|field1  |field2  |
-------------------
|"value1"|null    |
-------------------
|null    |"value2"|

第三个 JSON 应该将另一个字段“field10”添加到架构中,依此类推。真正的 JSON 文件可能有 200 个或更多字段。处理这种情况会有多难?

哪种方式更好地进行这种转换?

【问题讨论】:

  • 这是打算作为 TextIO 的批处理过程,还是您将来也希望以流模式执行此操作。
  • @RezaRokni 我不确定,流式插入 BigQuery 不是免费的。所以目前它将是来自 FileIO 的批处理。
  • 请注意,您还可以通过选择FILE_LOADS 方法并设置.withTriggeringFrequency (docs) 来在流式传输管道中使用加载作业

标签: java google-bigquery google-cloud-dataflow database-schema apache-beam


【解决方案1】:

我做了一些测试来模拟典型的自动检测模式:首先我运行所有数据以构建所有可能字段和类型的Map(这里我只考虑了StringInteger简单)。我使用stateful 管道来跟踪已经看到的字段并将其保存为PCollectionView。这样我就可以使用.withSchemaFromView(),因为架构在管道构造中是未知的。请注意,此方法仅对批处理作业有效。

首先,我创建了一些没有严格模式的虚拟数据,其中每一行可能包含也可能不包含任何字段:

PCollection<KV<Integer, String>> input = p
  .apply("Create data", Create.of(
        KV.of(1, "{\"user\":\"Alice\",\"age\":\"22\",\"country\":\"Denmark\"}"),
        KV.of(1, "{\"income\":\"1500\",\"blood\":\"A+\"}"),
        KV.of(1, "{\"food\":\"pineapple pizza\",\"age\":\"44\"}"),
        KV.of(1, "{\"user\":\"Bob\",\"movie\":\"Inception\",\"income\":\"1350\"}"))
  );

我们将读取输入数据并构建我们在数据中看到的不同字段名称的Map,并进行基本类型检查以确定它是否包含INTEGERSTRING。当然,如果需要,这可以扩展。请注意,之前创建的所有数据都分配给同一个键,以便将它们组合在一起,我们可以构建完整的字段列表,但这可能是性能瓶颈。我们将输出具体化,以便我们可以将其用作侧输入:

PCollectionView<Map<String, String>> schemaSideInput = input  
  .apply("Build schema", ParDo.of(new DoFn<KV<Integer, String>, KV<String, String>>() {

    // A map containing field-type pairs
    @StateId("schema")
    private final StateSpec<ValueState<Map<String, String>>> schemaSpec =
        StateSpecs.value(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

    @ProcessElement
    public void processElement(ProcessContext c,
                               @StateId("schema") ValueState<Map<String, String>> schemaSpec) {
      JSONObject message = new JSONObject(c.element().getValue());
      Map<String, String> current = firstNonNull(schemaSpec.read(), new HashMap<String, String>());

      // iterate through fields
      message.keySet().forEach(key ->
      {
          Object value = message.get(key);

          if (!current.containsKey(key)) {
              String type = "STRING";

              try {
                  Integer.parseInt(value.toString());
                  type = "INTEGER";
              }
              catch(Exception e) {}

              // uncomment if debugging is needed
              // LOG.info("key: "+ key + " value: " + value + " type: " + type);

              c.output(KV.of(key, type));
              current.put(key, type); 
              schemaSpec.write(current);
          }
      });
    }
  })).apply("Save as Map", View.asMap());

现在我们可以使用之前的 Map 来构建包含 BigQuery 表架构的 PCollectionView

PCollectionView<Map<String, String>> schemaView = p
  .apply("Start", Create.of("Start"))
  .apply("Create Schema", ParDo.of(new DoFn<String, Map<String, String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Map<String, String> schemaFields = c.sideInput(schemaSideInput);  
        List<TableFieldSchema> fields = new ArrayList<>();  

        for (Map.Entry<String, String> field : schemaFields.entrySet()) 
        { 
            fields.add(new TableFieldSchema().setName(field.getKey()).setType(field.getValue()));
            // LOG.info("key: "+ field.getKey() + " type: " + field.getValue());
        }

        TableSchema schema = new TableSchema().setFields(fields);

        String jsonSchema;
        try {
            jsonSchema = Transport.getJsonFactory().toString(schema);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        c.output(ImmutableMap.of("PROJECT_ID:DATASET_NAME.dynamic_bq_schema", jsonSchema));

      }}).withSideInputs(schemaSideInput))
  .apply("Save as Singleton", View.asSingleton());

相应地更改完全限定的表名PROJECT_ID:DATASET_NAME.dynamic_bq_schema

最后,在我们的管道中,我们读取数据,将其转换为 TableRow,然后使用 .withSchemaFromView(schemaView) 将其写入 BigQuery:

input
  .apply("Convert to TableRow", ParDo.of(new DoFn<KV<Integer, String>, TableRow>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
          JSONObject message = new JSONObject(c.element().getValue());
          TableRow row = new TableRow();

          message.keySet().forEach(key ->
          {
              Object value = message.get(key);
              row.set(key, value);
          });

        c.output(row);
      }}))
  .apply( BigQueryIO.writeTableRows()
      .to("PROJECT_ID:DATASET_NAME.dynamic_bq_schema")
      .withSchemaFromView(schemaView)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

完整代码here.

管道创建的 BigQuery 表架构:

以及由此产生的稀疏数据:

【讨论】:

  • 经过大量研究和测试,我们决定使用预定义的表模式。因此,原始数据经过转换,输出始终与表模式匹配。但是,现在我明白了如何解决这种问题。很抱歉回答太长了。
【解决方案2】:

如果您的数据基于架构(avro、protobuf 等)进行序列化,您可以在流式作业中创建/更新表架构。从这个意义上说,它是预定义的,但仍会在处理过程中更新表模式。

【讨论】:

    猜你喜欢
    • 2021-08-10
    • 2020-12-02
    • 1970-01-01
    • 1970-01-01
    • 2017-09-27
    • 1970-01-01
    • 2021-12-13
    • 2016-08-02
    • 1970-01-01
    相关资源
    最近更新 更多