Pitfalls of Implementing a Custom Spark SQL Data Source (HBase)


When implementing a custom Spark SQL data source, the schema of the Spark SQL table has to be reconciled with the schema of the HBase table.

For Spark, to build a custom data source you can implement these three interfaces (simplified signatures are sketched after the list):

BaseRelation represents an abstract data source: rows of data with a known schema (a relational table).
TableScan scans the whole table and returns the data as an RDD[Row].
RelationProvider, as the name suggests, builds a data source (BaseRelation) from user-supplied parameters.
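
For reference, these interfaces live in org.apache.spark.sql.sources; in Spark 1.x their definitions look roughly like this (simplified, default members omitted):

abstract class BaseRelation {
  def sqlContext: SQLContext
  def schema: StructType
}

trait TableScan {
  def buildScan(): RDD[Row]
}

trait RelationProvider {
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}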

TableScan is the coarsest-grained query: it scans the entire table in one pass. If you need to filter data at the source with finer granularity, you can implement one of the following (signatures sketched below):

PrunedScan: column pruning

PrunedFilteredScan: column pruning plus filter pushdown
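
Simplified from the same sources package:

trait PrunedScan {
  // Only the requested columns need to be produced.
  def buildScan(requiredColumns: Array[String]): RDD[Row]
}

trait PrunedFilteredScan {
  // Pruning plus pushed-down predicates (EqualTo, GreaterThan, ...);
  // Spark re-applies any filter the source does not fully handle.
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}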

So, to plug in HBase, define an HBase relation:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types._

import scala.collection.mutable.ArrayBuffer

class DefaultSource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]) = {
    HBaseRelation(parameters)(sqlContext)
  }
}
case class HBaseRelation(@transient val hbaseProps: Map[String, String])(@transient val sqlContext: SQLContext)
  extends BaseRelation with Serializable with TableScan {

  val hbaseTableName      = hbaseProps.getOrElse("hbase_table_name", sys.error("no valid hbase_table_name"))
  val hbaseTableSchema    = hbaseProps.getOrElse("hbase_table_schema", sys.error("no valid hbase_table_schema"))
  val registerTableSchema = hbaseProps.getOrElse("sparksql_table_schema", sys.error("no valid sparksql_table_schema"))
  val rowRange = hbaseProps.getOrElse("row_range", "->")
  // get start row and end row
  val range = rowRange.split("->", -1)
  val startRowKey = range(0).trim
  val endRowKey   = range(1).trim

  // Temporary fields: parsed from the HBase schema without types; the types are
  // filled in from the registered (Spark SQL) schema by feedTypes below.
  val tempHBaseFields     = extractHBaseSchema(hbaseTableSchema)
  val registerTableFields = extractRegisterSchema(registerTableSchema)
  val tempFieldRelation   = tableSchemaFieldMapping(tempHBaseFields, registerTableFields)
  val hbaseTableFields    = feedTypes(tempFieldRelation)
  val fieldsRelations     = tableSchemaFieldMapping(hbaseTableFields, registerTableFields)
  val queryColumns        = getQueryTargetColumns(hbaseTableFields)
  // Copy the field types declared in the registered schema onto the HBase fields.
  def feedTypes(mapping: Map[HBaseSchemaField, RegisteredSchemaField]): Array[HBaseSchemaField] = {
    mapping.map { case (k, v) => k.copy(fieldType = v.fieldType) }.toArray
  }

  // The rowkey pseudo-column is written as ":key" (empty column family, column "key").
  def isRowKey(field: HBaseSchemaField): Boolean = {
    val cfColArray = field.fieldName.split(":", -1)
    cfColArray(0) == "" && cfColArray(1) == "key"
  }

  
  // Build the space-separated "cf:col" list that TableInputFormat.SCAN_COLUMNS expects;
  // the rowkey pseudo-column is excluded because it is not a real HBase column.
  def getQueryTargetColumns(hbaseTableFields: Array[HBaseSchemaField]): String = {
    val cols = ArrayBuffer[String]()
    hbaseTableFields.foreach { field =>
      if (!isRowKey(field)) cols.append(field.fieldName)
    }
    cols.mkString(" ")
  }
  lazy val schema = {
    val fields = hbaseTableFields.map { field =>
      val name = fieldsRelations.getOrElse(field, sys.error("table schema does not match the definition.")).fieldName
      // Match case-insensitively so 'string' in the DDL and "String" both work,
      // and fail loudly on an unsupported type instead of throwing a MatchError.
      val relatedType = field.fieldType.toLowerCase match {
        case "string" => SchemaType(StringType, nullable = false)
        case "int"    => SchemaType(IntegerType, nullable = false)
        case "long"   => SchemaType(LongType, nullable = false)
        case "double" => SchemaType(DoubleType, nullable = false)
        case other    => sys.error(s"unsupported field type: $other")
      }
      StructField(name, relatedType.dataType, relatedType.nullable)
    }
    StructType(fields)
  }
  

  def tableSchemaFieldMapping(externalHBaseTable: Array[HBaseSchemaField], registerTable: Array[RegisteredSchemaField]): Map[HBaseSchemaField, RegisteredSchemaField] = {
    if (externalHBaseTable.length != registerTable.length) sys.error("column counts do not match in definition!")
    externalHBaseTable.zip(registerTable).toMap
  }


  /**
    * Parse the Spark SQL schema to register, e.g.
    *   sparksql_table_schema  '(rowkey string, value string, column_a string)'
    */
  def extractRegisterSchema(registerTableSchema: String): Array[RegisteredSchemaField] = {
    val fieldsStr = registerTableSchema.trim.drop(1).dropRight(1) // strip surrounding parentheses
    val fieldsArray = fieldsStr.split(",").map(_.trim)
    fieldsArray.map { fieldString =>
      val splitedField = fieldString.split("\\s+", -1)
      RegisteredSchemaField(splitedField(0), splitedField(1))
    }
  }

  
  // Parse the HBase-side schema, e.g. '(:key, cf:field_1, cf:field_2)';
  // types are left empty here and filled in later by feedTypes.
  def extractHBaseSchema(externalTableSchema: String): Array[HBaseSchemaField] = {
    val fieldsStr = externalTableSchema.trim.drop(1).dropRight(1)
    val fieldsArray = fieldsStr.split(",").map(_.trim)
    fieldsArray.map(fieldString => HBaseSchemaField(fieldString, ""))
  }

  // By making this a lazy val we keep the RDD around, amortizing the cost of locating splits.
  lazy val buildScan = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", GlobalConfigUtils.hbaseQuorem) // the author's config helper
    hbaseConf.set(TableInputFormat.INPUT_TABLE, hbaseTableName)
    hbaseConf.set(TableInputFormat.SCAN_COLUMNS, queryColumns)
    hbaseConf.set(TableInputFormat.SCAN_ROW_START, startRowKey)
    hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, endRowKey)

    val hbaseRdd = sqlContext.sparkContext.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )

    hbaseRdd.map(_._2).map { result =>
      // Resolve every declared field (including the rowkey) from the Result, in schema order.
      val values = hbaseTableFields.map(field => Resolver.resolve(field, result))
      Row.fromSeq(values.toSeq)
    }
  }

  private case class SchemaType(dataType: DataType, nullable: Boolean)
}
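
The listing references two small case classes and a Resolver helper that are not shown above. A minimal sketch of what they presumably look like, inferred from how they are used (the byte-to-value conversions are assumptions; adjust if values were written with Bytes.toBytes(int) and the like rather than as UTF-8 strings):

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes

// Inferred definitions; constructor parameter names come from how they are used above.
case class HBaseSchemaField(fieldName: String, fieldType: String)       // e.g. ("cf:column_a", "string")
case class RegisteredSchemaField(fieldName: String, fieldType: String)  // e.g. ("column_a", "string")

object Resolver extends Serializable {
  // Extract one field from an HBase Result and convert it to the declared type.
  def resolve(field: HBaseSchemaField, result: Result): Any = {
    val cfCol = field.fieldName.split(":", -1)
    val bytes =
      if (cfCol(0).isEmpty && cfCol(1) == "key") result.getRow // rowkey pseudo-column ":key"
      else result.getValue(Bytes.toBytes(cfCol(0)), Bytes.toBytes(cfCol(1)))
    if (bytes == null) return null
    field.fieldType.toLowerCase match {
      case "string" => Bytes.toString(bytes)
      case "int"    => Bytes.toString(bytes).toInt
      case "long"   => Bytes.toString(bytes).toLong
      case "double" => Bytes.toString(bytes).toDouble
      case other    => sys.error(s"unsupported field type: $other")
    }
  }
}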
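With the relation in place, the source can be wired up from SQL. A usage sketch: the package com.example.hbase (where DefaultSource would live), the table name, columns, and row range below are all made up for illustration.

val sqlContext = new SQLContext(sc)

sqlContext.sql(
  """CREATE TEMPORARY TABLE hbase_people
    |USING com.example.hbase
    |OPTIONS (
    |  hbase_table_name      'people',
    |  hbase_table_schema    '(:key, cf:name, cf:age)',
    |  sparksql_table_schema '(rowkey string, name string, age int)',
    |  row_range             'row001->row999'
    |)""".stripMargin)

sqlContext.sql("SELECT rowkey, name, age FROM hbase_people").show()

Spark resolves the USING clause by loading the DefaultSource class in that package, which is why the provider class must carry exactly that name.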
