【Question Title】: HBase serialization error while inserting data from an RDD
【Posted】: 2017-07-18 23:05:26
【Question】:

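I am running into a problem when trying to insert data into HBase. I am running Scala code in the Spark shell on Google Cloud and trying to insert data from an RDD into HBase (Bigtable).

Type of hbaseRDD: RDD[(String, Map[String, String])]

The String is the row ID, and the Map holds that row's column names and their values.

The code looks like this: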

val tableName: String = "omniture";

val connection = BigtableConfiguration.connect("*******", "**********")   
val admin = connection.getAdmin();
val table = connection.getTable(TableName.valueOf(tableName));

TRY 1:
hbaseRDD.foreach { w =>
  val put = new Put(Bytes.toBytes(w._1))
  val ColumnValue = w._2

  ColumnValue.foreach { x =>
    put.addColumn(Bytes.toBytes("u"), Bytes.toBytes(x._1), Bytes.toBytes(x._2))
  }
  table.put(put)
}

TRY 2:
hbaseRDD.map { w =>
  val put = new Put(Bytes.toBytes(w._1))
  val ColumnValue = w._2

  ColumnValue.map { x =>
    put.addColumn(Bytes.toBytes("u"), Bytes.toBytes(x._1), Bytes.toBytes(x._2))
  }
  table.put(put)
}

Below is the error I get:

org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: com.google.cloud.bigtable.hbase.BigtableTable
Serialization stack:
        - object not serializable (class: com.google.cloud.bigtable.hbase.BigtableTable, value: BigtableTable{hashCode=0x7d96618, project=cdp-dev-201706-01, instance=cdp-dev-cl-hbase-instance, table=omniture, host=bigtable.googleapis.com})
        - field (class: logic.ingestion.Ingestion$$anonfun$insertTransactionData$1, name: table$1, type: interface org.apache.hadoop.hbase.client.Table)
        - object (class logic.ingestion.Ingestion$$anonfun$insertTransactionData$1, <function1>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
        ... 27 more

Any help would be greatly appreciated. Thanks in advance.

【Comments】:

    Tags: scala google-app-engine apache-spark google-cloud-datastore hbase


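    【Solution 1】:

    Reference: Writing to HBase via Spark: Task not serializable

    In the question, the table handle is created on the driver and then used inside the closure passed to foreach/map. Spark has to serialize that closure to ship it to the executors, and BigtableTable is not serializable, hence the error. Creating the connection and table inside foreachPartition builds them on the executor, once per partition, so nothing non-serializable is captured: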

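    hbaseRDD.foreachPartition { partition =>
      // Create the connection inside the closure so that it is built on the
      // executor, once per partition, instead of being serialized from the driver.
      val tableName: String = "omniture"
      val connection = BigtableConfiguration.connect("cdp-dev-201706-01", "cdp-dev-cl-hbase-instance")
      val table = connection.getTable(TableName.valueOf(tableName))

      try {
        partition.foreach { case (rowKey, columns) =>
          val put = new Put(Bytes.toBytes(rowKey))
          columns.foreach { case (qualifier, value) =>
            // "u" is the column family.
            put.addColumn(Bytes.toBytes("u"), Bytes.toBytes(qualifier), Bytes.toBytes(value))
          }
          table.put(put)
        }
      } finally {
        // Release the resources once the partition has been written.
        table.close()
        connection.close()
      }
    }
    // foreachPartition is itself an action, so no collect() is needed to trigger the write.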
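
As a variation on the per-partition approach, the puts can be sent in batches rather than one at a time. The following is only a sketch under assumptions: `BigtableWriter`, `toPut`, `write`, and the `batchSize` default of 500 are hypothetical names and values introduced here for illustration, not part of the original answer. It relies on `Table.put` accepting a `java.util.List[Put]` and assumes the Bigtable HBase client and Spark are on the classpath.

```scala
import scala.collection.JavaConverters._

import com.google.cloud.bigtable.hbase.BigtableConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

// Hypothetical helper object; the names are illustrative only.
object BigtableWriter {
  // Column family "u" is taken from the code in this thread.
  private val Family: Array[Byte] = Bytes.toBytes("u")

  // Builds one Put for a row: the key is the row ID, the map holds qualifier -> value.
  def toPut(rowKey: String, columns: Map[String, String]): Put = {
    val put = new Put(Bytes.toBytes(rowKey))
    columns.foreach { case (qualifier, value) =>
      put.addColumn(Family, Bytes.toBytes(qualifier), Bytes.toBytes(value))
    }
    put
  }

  // batchSize = 500 is an illustrative default, not a tuned value.
  def write(rdd: RDD[(String, Map[String, String])],
            projectId: String,
            instanceId: String,
            tableName: String,
            batchSize: Int = 500): Unit =
    rdd.foreachPartition { partition =>
      // Built on the executor, once per partition; only strings and an Int are captured.
      val connection = BigtableConfiguration.connect(projectId, instanceId)
      val table = connection.getTable(TableName.valueOf(tableName))
      try {
        partition
          .map { case (rowKey, columns) => toPut(rowKey, columns) }
          .grouped(batchSize)
          .foreach(batch => table.put(batch.asJava)) // Table.put(java.util.List[Put])
      } finally {
        table.close()
        connection.close()
      }
    }
}
```

It would be called as `BigtableWriter.write(hbaseRDD, projectId, instanceId, "omniture")`, with the project and instance IDs supplied by the caller. Keeping `toPut` separate makes the row-to-Put mapping checkable without a live cluster.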
    

    The linked question above explains this in detail.
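
The underlying cause can be reproduced without Spark or Bigtable. The stack trace in the question shows Spark Java-serializing the closure (`ClosureCleaner.ensureSerializable` calling `JavaSerializerInstance.serialize`), so the sketch below does roughly the same with a plain `ObjectOutputStream`. `FakeTable` and `isSerializable` are hypothetical stand-ins invented for this example; `FakeTable` plays the role of `BigtableTable` only in that it does not implement `Serializable`.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a client handle such as BigtableTable:
// it deliberately does not extend Serializable.
class FakeTable {
  def put(row: String): Unit = println(s"put $row")
}

object ClosureSerializationDemo {
  // Approximates Spark's check: try to Java-serialize the function object.
  def isSerializable(closure: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(closure) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val table = new FakeTable // created up front, like the table on the driver

    // Captures `table`, so serializing the closure has to serialize the table too.
    val capturing: String => Unit = row => table.put(row)

    // Creates its own table when it runs, so it captures nothing.
    val selfContained: String => Unit = row => new FakeTable().put(row)

    println(s"capturing closure serializable:      ${isSerializable(capturing)}")
    println(s"self-contained closure serializable: ${isSerializable(selfContained)}")
  }
}
```

The first closure mirrors the two attempts in the question, and the second mirrors the foreachPartition version, where the handle is created inside the function body.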

    【Comments】:
