我做了一些测试来模拟典型的自动检测模式:首先我运行所有数据以构建所有可能字段和类型的Map(这里我只考虑了String 或Integer简单)。我使用stateful 管道来跟踪已经看到的字段并将其保存为PCollectionView。这样我就可以使用.withSchemaFromView(),因为架构在管道构造中是未知的。请注意,此方法仅对批处理作业有效。
首先,我创建了一些没有严格模式的虚拟数据,其中每一行可能包含也可能不包含任何字段:
PCollection<KV<Integer, String>> input = p
.apply("Create data", Create.of(
KV.of(1, "{\"user\":\"Alice\",\"age\":\"22\",\"country\":\"Denmark\"}"),
KV.of(1, "{\"income\":\"1500\",\"blood\":\"A+\"}"),
KV.of(1, "{\"food\":\"pineapple pizza\",\"age\":\"44\"}"),
KV.of(1, "{\"user\":\"Bob\",\"movie\":\"Inception\",\"income\":\"1350\"}"))
);
我们将读取输入数据并构建我们在数据中看到的不同字段名称的Map,并进行基本类型检查以确定它是否包含INTEGER 或STRING。当然,如果需要,这可以扩展。请注意,之前创建的所有数据都分配给同一个键,以便将它们组合在一起,我们可以构建完整的字段列表,但这可能是性能瓶颈。我们将输出具体化,以便我们可以将其用作侧输入:
PCollectionView<Map<String, String>> schemaSideInput = input
.apply("Build schema", ParDo.of(new DoFn<KV<Integer, String>, KV<String, String>>() {
// A map containing field-type pairs
@StateId("schema")
private final StateSpec<ValueState<Map<String, String>>> schemaSpec =
StateSpecs.value(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
@ProcessElement
public void processElement(ProcessContext c,
@StateId("schema") ValueState<Map<String, String>> schemaSpec) {
JSONObject message = new JSONObject(c.element().getValue());
Map<String, String> current = firstNonNull(schemaSpec.read(), new HashMap<String, String>());
// iterate through fields
message.keySet().forEach(key ->
{
Object value = message.get(key);
if (!current.containsKey(key)) {
String type = "STRING";
try {
Integer.parseInt(value.toString());
type = "INTEGER";
}
catch(Exception e) {}
// uncomment if debugging is needed
// LOG.info("key: "+ key + " value: " + value + " type: " + type);
c.output(KV.of(key, type));
current.put(key, type);
schemaSpec.write(current);
}
});
}
})).apply("Save as Map", View.asMap());
现在我们可以使用之前的 Map 来构建包含 BigQuery 表架构的 PCollectionView:
PCollectionView<Map<String, String>> schemaView = p
.apply("Start", Create.of("Start"))
.apply("Create Schema", ParDo.of(new DoFn<String, Map<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) {
Map<String, String> schemaFields = c.sideInput(schemaSideInput);
List<TableFieldSchema> fields = new ArrayList<>();
for (Map.Entry<String, String> field : schemaFields.entrySet())
{
fields.add(new TableFieldSchema().setName(field.getKey()).setType(field.getValue()));
// LOG.info("key: "+ field.getKey() + " type: " + field.getValue());
}
TableSchema schema = new TableSchema().setFields(fields);
String jsonSchema;
try {
jsonSchema = Transport.getJsonFactory().toString(schema);
} catch (IOException e) {
throw new RuntimeException(e);
}
c.output(ImmutableMap.of("PROJECT_ID:DATASET_NAME.dynamic_bq_schema", jsonSchema));
}}).withSideInputs(schemaSideInput))
.apply("Save as Singleton", View.asSingleton());
相应地更改完全限定的表名PROJECT_ID:DATASET_NAME.dynamic_bq_schema。
最后,在我们的管道中,我们读取数据,将其转换为 TableRow,然后使用 .withSchemaFromView(schemaView) 将其写入 BigQuery:
input
.apply("Convert to TableRow", ParDo.of(new DoFn<KV<Integer, String>, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
JSONObject message = new JSONObject(c.element().getValue());
TableRow row = new TableRow();
message.keySet().forEach(key ->
{
Object value = message.get(key);
row.set(key, value);
});
c.output(row);
}}))
.apply( BigQueryIO.writeTableRows()
.to("PROJECT_ID:DATASET_NAME.dynamic_bq_schema")
.withSchemaFromView(schemaView)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
完整代码here.
管道创建的 BigQuery 表架构:
以及由此产生的稀疏数据: