在自定义 Spark SQL 数据源的过程中,需要将 Spark SQL 表的 schema 与 HBase 表的 schema 进行整合。
对于spark来说,要想自定义数据源,你可以实现这3个接口:
BaseRelation 代表了一个抽象的数据源。该数据源由一行行有着已知schema的数据组成(关系表)。
TableScan 用于扫描整张表,将数据返回成RDD[Row]。
RelationProvider 顾名思义,根据用户提供的参数返回一个数据源(BaseRelation)。
当然,TableScan 是最粗粒度的查询,代表一次性扫描整张表;如果需要在数据源端进行更细粒度的数据过滤,可以实现:
PrunedScan:可以列剪枝
PrunedFilteredScan:列剪枝 + 过滤
所以,如果要对接 HBase,就定义一个 HBase 的 relation:
/**
 * Spark SQL `RelationProvider` for HBase-backed tables.
 *
 * The user-supplied options are forwarded unchanged as the properties of an [[HBaseRelation]].
 */
class DefaultSource extends RelationProvider {

  /**
   * Creates the relation for one table registration.
   *
   * @param sqlContext the SQL context that will run scans over the relation
   * @param parameters the data-source options (table name, schemas, optional row range)
   * @return a new [[HBaseRelation]] configured from `parameters`
   */
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): HBaseRelation = {
    val relation = HBaseRelation(parameters)(sqlContext)
    relation
  }
}
/**
 * A Spark SQL relation that reads an HBase table through `TableInputFormat`.
 *
 * Options read from `hbaseProps`:
 *  - `hbase_table_name` (required): the HBase table to scan.
 *  - `hbase_table_schema` (required): parenthesised, comma-separated HBase columns,
 *    e.g. `(:key, cf:a, cf:b)`; `:key` (empty family, qualifier `key`) marks the row key.
 *  - `sparksql_table_schema` (required): parenthesised, comma-separated `name Type` pairs,
 *    e.g. `(rowkey String, a Int, b Double)`. The i-th entry names and types the i-th HBase
 *    column. Supported types (case-sensitive): String, Int, Long, Double.
 *  - `row_range` (optional, default `->`): `startRow->endRow`; both halves are trimmed and
 *    passed as the scan's start/stop rows (the default passes empty strings for both).
 *
 * @param hbaseProps the data-source options described above
 * @param sqlContext the SQL context whose SparkContext runs the scan
 */
case class HBaseRelation(@transient val hbaseProps: Map[String, String])(@transient val sqlContext: SQLContext)
  extends BaseRelation with Serializable with TableScan {

  val hbaseTableName = hbaseProps.getOrElse("hbase_table_name", sys.error("not valid schema"))
  val hbaseTableSchema = hbaseProps.getOrElse("hbase_table_schema", sys.error("not valid schema"))
  val registerTableSchema = hbaseProps.getOrElse("sparksql_table_schema", sys.error("not valid schema"))
  val rowRange = hbaseProps.getOrElse("row_range", "->")

  // Split "start->end" into its two halves; the -1 limit keeps empty halves (e.g. the default "->").
  val range = {
    val parts = rowRange.split("->", -1)
    if (parts.length != 2)
      sys.error(s"row_range must have the form 'startRow->endRow', got '$rowRange'")
    parts
  }
  val startRowKey = range(0).trim
  val endRowKey = range(1).trim

  // HBase columns in declaration order, types not yet known. Do not use outside initialisation.
  val tempHBaseFields = extractHBaseSchema(hbaseTableSchema)
  val registerTableFields = extractRegisterSchema(registerTableSchema)
  // Kept for compatibility; building it also checks that both schemas have the same column count.
  val tempFieldRelation = tableSchemaFieldMapping(tempHBaseFields, registerTableFields)
  // Pair columns by position. Going through the Map above would lose declaration order once
  // there are more than four columns, attaching Spark SQL names to the wrong HBase columns.
  val hbaseTableFields: Array[HBaseSchemaField] =
    tempHBaseFields.zip(registerTableFields).map { case (hbaseField, registered) =>
      hbaseField.copy(fieldType = registered.fieldType)
    }
  val fieldsRelations = tableSchemaFieldMapping(hbaseTableFields, registerTableFields)
  val queryColumns = getQueryTargetCloumns(hbaseTableFields)

  /**
   * Copies each registered field's type onto its HBase field.
   *
   * The result follows `mapping`'s iteration order, which for an immutable Map with more than
   * four entries is not guaranteed to match declaration order. Pair the arrays positionally
   * when order matters.
   */
  def feedTypes(mapping: Map[HBaseSchemaField, RegisteredSchemaField]): Array[HBaseSchemaField] =
    mapping.map { case (k, v) => k.copy(fieldType = v.fieldType) }.toArray

  /** True when `field` is the row-key pseudo-column `:key` (empty family, qualifier `key`). */
  def isRowKey(field: HBaseSchemaField): Boolean = {
    val cfColArray = field.fieldName.split(":", -1)
    // Guard the index: a bare family name such as "cf" contains no ':' and is not the row key.
    cfColArray.length > 1 && cfColArray(0) == "" && cfColArray(1) == "key"
  }

  /**
   * Returns the non-row-key HBase columns joined by single spaces.
   *
   * This is the value [[buildScan]] sets for `TableInputFormat.SCAN_COLUMNS`.
   */
  def getQueryTargetCloumns(hbaseTableFields: Array[HBaseSchemaField]): String =
    hbaseTableFields.filterNot(isRowKey).map(_.fieldName).mkString(" ")

  /**
   * Spark SQL schema: one non-nullable column per declared column, in declaration order.
   *
   * Each column is named after the registered field and typed from its declared type.
   */
  lazy val schema: StructType = {
    val fields = hbaseTableFields.zip(registerTableFields).map { case (field, registered) =>
      val relatedType = field.fieldType match {
        case "String" => SchemaType(StringType, nullable = false)
        case "Int"    => SchemaType(IntegerType, nullable = false)
        case "Long"   => SchemaType(LongType, nullable = false)
        case "Double" => SchemaType(DoubleType, nullable = false)
        case other =>
          sys.error(s"unsupported type '$other' for column '${registered.fieldName}'; " +
            "supported types are String, Int, Long, Double")
      }
      StructField(registered.fieldName, relatedType.dataType, relatedType.nullable)
    }
    StructType(fields)
  }

  /**
   * Pairs HBase fields with registered fields by position.
   *
   * The returned Map does not preserve declaration order.
   *
   * @throws RuntimeException if the two arrays differ in length
   */
  def tableSchemaFieldMapping(
      externalHBaseTable: Array[HBaseSchemaField],
      registerTable: Array[RegisteredSchemaField]): Map[HBaseSchemaField, RegisteredSchemaField] = {
    if (externalHBaseTable.length != registerTable.length)
      sys.error("columns size not match in definition!")
    externalHBaseTable.zip(registerTable).toMap
  }

  /**
   * Parses the Spark SQL schema to register.
   *
   * The input looks like `'(rowkey String, value String, column_a String)'`. Extra tokens after
   * the type are ignored.
   *
   * @throws RuntimeException if an entry lacks a name or a type
   */
  def extractRegisterSchema(registerTableSchema: String): Array[RegisteredSchemaField] = {
    val fieldsStr = registerTableSchema.trim.drop(1).dropRight(1)
    fieldsStr.split(",").map(_.trim).map { fieldString =>
      fieldString.split("\\s+", -1) match {
        case Array(name, fieldType, _*) => RegisteredSchemaField(name, fieldType)
        case _ => sys.error(s"invalid sparksql_table_schema entry '$fieldString'; expected 'name Type'")
      }
    }
  }

  /**
   * Parses the HBase column list, e.g. `'(:key, cf:a, cf:b)'`.
   *
   * Field types are left empty for now.
   */
  def extractHBaseSchema(externalTableSchema: String): Array[HBaseSchemaField] = {
    val fieldsStr = externalTableSchema.trim.drop(1).dropRight(1)
    fieldsStr.split(",").map(_.trim).map(fieldString => HBaseSchemaField(fieldString, ""))
  }

  // By making this a lazy val we keep the RDD around, amortizing the cost of locating splits.
  lazy val buildScan = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", GlobalConfigUtils.hbaseQuorem)
    hbaseConf.set(TableInputFormat.INPUT_TABLE, hbaseTableName)
    hbaseConf.set(TableInputFormat.SCAN_COLUMNS, queryColumns)
    hbaseConf.set(TableInputFormat.SCAN_ROW_START, startRowKey)
    hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, endRowKey)

    val hbaseRdd = sqlContext.sparkContext.newAPIHadoopRDD(
      hbaseConf,
      classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    // Local copy so the task closure captures only the field list, not the whole relation.
    val fields = hbaseTableFields
    hbaseRdd.map { case (_, result) =>
      // One value per declared column, in the same order as `schema`.
      Row.fromSeq(fields.toSeq.map(field => Resolver.resolve(field, result)))
    }
  }

  private case class SchemaType(dataType: DataType, nullable: Boolean)
}