【问题标题】:Data is written to BigQuery but not in proper format数据已写入 BigQuery,但格式不正确
【发布时间】:2018-03-13 10:51:25
【问题描述】:

我正在将数据写入 BigQuery 并成功写入其中。但我担心它的编写格式。

以下是我在 BigQuery 中执行任何查询时显示数据的格式:

检查第一行,SalesComponent 的值为 CPS_H,但它显示 'BeamRecord [dataValues=[CPS_H'' 并且在 ModelIteration 中,该值以方括号结束。

以下是用于将数据从 BeamSql 推送到 BigQuery 的代码:

TableSchema tableSchema = new TableSchema().setFields(ImmutableList.of(
    new TableFieldSchema().setName("SalesComponent").setType("STRING").setMode("REQUIRED"),
    new TableFieldSchema().setName("DuetoValue").setType("STRING").setMode("REQUIRED"),
    new TableFieldSchema().setName("ModelIteration").setType("STRING").setMode("REQUIRED")
));

TableReference tableSpec = BigQueryHelpers.parseTableSpec("beta-194409:data_id1.tables_test");
System.out.println("Start Bigquery");
final_out.apply(MapElements.into(TypeDescriptor.of(TableRow.class)).via(
    (MyOutputClass elem) -> new TableRow().set("SalesComponent", elem.SalesComponent).set("DuetoValue", elem.DuetoValue).set("ModelIteration", elem.ModelIteration)))
        .apply(BigQueryIO.writeTableRows()
        .to(tableSpec)
        .withSchema(tableSchema)
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

p.run().waitUntilFinish();

编辑

我已使用以下代码将 BeamRecord 转换为 MyOutputClass 类型,但这也不起作用:

 PCollection<MyOutputClass> final_out = join_query.apply(ParDo.of(new DoFn<BeamRecord, MyOutputClass>() {
        private static final long serialVersionUID = 1L;
        @ProcessElement
        public void processElement(ProcessContext c) {
             BeamRecord record = c.element();
               String[] strArr = record.toString().split(",");
            MyOutputClass moc = new MyOutputClass();
            moc.setSalesComponent(strArr[0]);
            moc.setDuetoValue(strArr[1]);
            moc.setModelIteration(strArr[2]);
            c.output(moc);
        }
    }));

【问题讨论】:

    标签: google-bigquery apache-beam


    【解决方案1】:

    您的 MyOutputClass 似乎构造不正确(值不正确)。如果你看一下,BigQueryIO 能够创建具有正确字段的行就好了。但是这些字段的值是错误的。这意味着当您调用.set("SalesComponent", elem.SalesComponent) 时,您在elem 中已经有不正确的数据。

    我的猜测是问题出在上一步,当您从 BeamRecord 转换为 MyOutputClass 时。如果你做了这样的事情(或者其他一些转换逻辑在幕后为你做了这个),你会得到类似于你所看到的结果:

    • 通过调用beamRecord.toString()BeamRecord转换为字符串;
      • 如果您查看 BeamRecord.toString() 实现,您会发现您得到的正是该字符串格式;
    • 通过,分割这个字符串得到一个字符串数组;
    • 从该数组构造MyOutputClass

    伪代码类似于:

    PCollection<MyOutputClass> final_out = 
      beamRecords
        .apply(
          ParDo.of(new DoFn() {
    
            @ProcessElement
            void processElement(Context c) {
               BeamRecord record = c.elem();
               String[] fields = record.toString().split(",");
               MyOutputClass elem = new MyOutputClass();
               elem.SalesComponent = fields[0];
               elem.DuetoValue = fields[1];
               ...
               c.output(elem);
            }
          })
        );
    

    这样做的正确方法是在记录上调用 getter,而不是按照这些行(伪代码)拆分其字符串表示:

    PCollection<MyOutputClass> final_out = 
          beamRecords
            .apply(
              ParDo.of(new DoFn() {
    
                @ProcessElement
                void processElement(Context c) {
                   BeamRecord record = c.elem();
                   MyOutputClass elem = new MyOutputClass();
    
                   //get field value by name
                   elem.SalesComponent = record.getString("CPS_H..."); 
    
                   // get another field value by name
                   elem.DuetoValue = record.getInteger("...");
                   ...
                   c.output(elem);
                }
              })
            );
    

    您可以通过添加一个简单的ParDo 来验证类似情况,您可以在其中放置断点并查看调试器中的元素,或者将元素输出到其他地方(例如控制台)。

    【讨论】:

    • 看起来类似于我的第一个示例,这是从BeamRecord 中获取字段的错误方法。您应该获取和设置moc.setSalesComponent(record.getString("CPS_H...") 之类的字段,而不是字符串拆分,请参见我的第二个示例
    【解决方案2】:

    我能够使用以下方法解决此问题:

     PCollection<MyOutputClass> final_out = record40.apply(ParDo.of(new DoFn<BeamRecord, MyOutputClass>() {
            private static final long serialVersionUID = 1L;
            @ProcessElement
            public void processElement(ProcessContext c) throws ParseException {
                 BeamRecord record = c.element();
                   String strArr = record.toString();
                   String strArr1 = strArr.substring(24);
                   String xyz = strArr1.replace("]","");
                   String[] strArr2 = xyz.split(",");
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2022-01-16
      • 1970-01-01
      • 2017-06-03
      • 2012-06-09
      • 1970-01-01
      • 2018-06-03
      • 1970-01-01
      相关资源
      最近更新 更多