【发布时间】:2018-03-13 10:51:25
【问题描述】:
我正在将数据写入 BigQuery 并成功写入其中。但我担心它的编写格式。
以下是我在 BigQuery 中执行任何查询时显示数据的格式:
检查第一行,SalesComponent 的值为 CPS_H,但它显示 'BeamRecord [dataValues=[CPS_H'' 并且在 ModelIteration 中,该值以方括号结束。
以下是用于将数据从 BeamSql 推送到 BigQuery 的代码:
TableSchema tableSchema = new TableSchema().setFields(ImmutableList.of(
new TableFieldSchema().setName("SalesComponent").setType("STRING").setMode("REQUIRED"),
new TableFieldSchema().setName("DuetoValue").setType("STRING").setMode("REQUIRED"),
new TableFieldSchema().setName("ModelIteration").setType("STRING").setMode("REQUIRED")
));
TableReference tableSpec = BigQueryHelpers.parseTableSpec("beta-194409:data_id1.tables_test");
System.out.println("Start Bigquery");
final_out.apply(MapElements.into(TypeDescriptor.of(TableRow.class)).via(
(MyOutputClass elem) -> new TableRow().set("SalesComponent", elem.SalesComponent).set("DuetoValue", elem.DuetoValue).set("ModelIteration", elem.ModelIteration)))
.apply(BigQueryIO.writeTableRows()
.to(tableSpec)
.withSchema(tableSchema)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
p.run().waitUntilFinish();
编辑
我已使用以下代码将 BeamRecord 转换为 MyOutputClass 类型,但这也不起作用:
PCollection<MyOutputClass> final_out = join_query.apply(ParDo.of(new DoFn<BeamRecord, MyOutputClass>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
BeamRecord record = c.element();
String[] strArr = record.toString().split(",");
MyOutputClass moc = new MyOutputClass();
moc.setSalesComponent(strArr[0]);
moc.setDuetoValue(strArr[1]);
moc.setModelIteration(strArr[2]);
c.output(moc);
}
}));
【问题讨论】:
标签: google-bigquery apache-beam