package com.features.dl
import java.util
import java.util.stream.Collectors
import java.util.{Arrays, Comparator, List}

import com.features.model.{CommonLabelPointForDINOnline, Keyword}
import com.mysql.jdbc.Connection
import com.sun.rowset.internal.Row
import com.utils.{CommonUtils, FeatureUtils}
import com.utils.ConnUtils.HBaseClient.conf
import org.apache.commons.httpclient.HttpConnectionManager
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.{Base64, Bytes, MD5Hash}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable.ArrayBuffer
import collection.JavaConverters._
import scala.util.Try

/*
 * Author: supeihuang
 * Time: 2019/9/27 14:42
 */

object ExtractDataFromHbase {

  /**
   * Entry point. Reads a comma-separated user-id file from HDFS, extracts
   * per-user features from HBase in two ways — one Get per user
   * ([[addUserFeatures]]) and one batched multi-Get per partition
   * ([[getUserDataFromHbase]]) — and writes both results back to HDFS.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    // Input: one user per line, "userId,..." — only the first field is used.
    val content = spark.read.textFile("hdfs://hdfs2-nameservice/user/supeihuang/1.txt")

    // Parse the user id out of each line.
    val ds = content
      .map(line => line.split(",")(0).toLong)
      .persist(StorageLevel.MEMORY_AND_DISK)
      .toDF("user_id")

    // Method 1: individual Gets against "user_show_tag".
    val data1 = addUserFeatures(ds).toDF("user_id", "tagId", "values")
    data1.rdd.saveAsTextFile("hdfs://hdfs2-nameservice/user/supeihuang/values2.txt")
    data1.show(20)

    // Method 2: batched Gets against "user_features"; keep only the feature map.
    val data2 = getUserDataFromHbase(ds, 1).map { case (_, features) => features }
    data2.saveAsTextFile("hdfs://hdfs2-nameservice/user/supeihuang/values.txt")
  }

  /**
   * Fetches, for every user_id in `df`, all non-empty columns of family "t"
   * from HBase table "user_show_tag" using one Get per user.
   *
   * @param df      DataFrame with a Long "user_id" column
   * @param numPart number of partitions; one HBase connection is opened per partition
   * @return persisted dataset of (userId, tag qualifiers, tag values)
   */
  def addUserFeatures(df: DataFrame, numPart: Int = 64):
  Dataset[(Long, ArrayBuffer[String], ArrayBuffer[String])] = {
    val spark = df.sparkSession
    import spark.implicits._

    df.repartition(numPart, col("user_id")).mapPartitions(iter => {
      // Distinct user ids of this partition.
      val uids = iter.map(_.getAs[Long]("user_id")).toSet

      val conf = createHBaseConf()
      val hConn = ConnectionFactory.createConnection(conf)
      val hTable = hConn.getTable(TableName.valueOf("user_show_tag"))
      try {
        // Set#map is eager, so all Gets complete before the connection is closed.
        uids.map(uid => {
          val get = new Get(Bytes.toBytes(getUserRowKey(uid.toString)))
          val result = hTable.get(get)

          val allTagId = new ArrayBuffer[String]
          val allValues = new ArrayBuffer[String]
          // Result.listCells() returns null when the row does not exist.
          val cells = Option(result.listCells()).map(_.asScala).getOrElse(Nil)
          for (kv <- cells) {
            val family = Bytes.toString(CellUtil.cloneFamily(kv))
            val value = Bytes.toString(CellUtil.cloneValue(kv))
            // Keep only non-empty values from the "t" (tag) family.
            if ("t".equals(family) && StringUtils.isNotEmpty(value)) {
              allTagId.append(Bytes.toString(CellUtil.cloneQualifier(kv)))
              allValues.append(value)
            }
          }
          (uid, allTagId, allValues)
        }).toIterator
      } finally {
        // BUG FIX: table and connection were previously leaked on every partition.
        Try(hTable.close())
        Try(hConn.close())
      }
    }).persist(StorageLevel.MEMORY_AND_DISK)
  }

  /**
   * Builds the HBase client configuration.
   * Speculative execution is disabled: duplicate speculative tasks would issue
   * duplicate reads/writes against HBase.
   */
  def createHBaseConf(): Configuration = {
    val ret = HBaseConfiguration.create()
    ret.set("hbase.zookeeper.quorum", "*")
    ret.set("zookeeper.znode.parent", "/hbase")
    ret.setBoolean("mapred.map.tasks.speculative.execution", false)
    ret.setBoolean("mapred.reduce.tasks.speculative.execution", false)
    ret
  }

  /**
   * Row-key layout: first 8 hex chars of md5(id) + "_" + id.
   * The md5 prefix spreads keys evenly across regions.
   */
  def getUserRowKey(id: String): String = {
    MD5Hash.getMD5AsHex(Bytes.toBytes(id)).substring(0, 8) + "_" + id
  }

  /**
   * Executes the batch of Gets and converts each returned row into
   * (userId, qualifier -> normalized string value).
   *
   * Decoding rules (as in the original):
   *  - families "a"/"sd"/"sh", excluding rate/score-like qualifiers, store
   *    Long values; qualifiers containing "totalTime" are converted ms -> s;
   *  - qualifiers containing "Inc" also store Long values;
   *  - everything else is read as a plain string;
   *  - "None"/"null" -> "-1", "true" -> "1";
   *  - "ttP"/"ktW" are kept only for families "c2"/"c3" respectively
   *    (二级标签和关键词按当前线上版本取).
   *
   * Rows whose key is unreadable are dropped.
   */
  def getFeatures(list: List[Get], tableName: String, conn: HConnection) = {
    getBatchData(tableName, list, conn).map(row => {
      val rowKey = Bytes.toString(row.getRow)
      val map: java.util.HashMap[String, String] = new util.HashMap()
      if (rowKey != null) {
        // Strip the 8-char md5 prefix plus "_" (see getUserRowKey).
        val key = rowKey.substring(9)
        // listCells() returns null for an empty Result.
        val cells = row.listCells()
        if (cells != null) {
          // BUG FIX: the original wrapped this loop in a second, identical
          // loop over the same cells, re-processing every cell |cells| times
          // (accidental O(n^2)); a single pass produces the same map.
          for (kv <- cells.asScala) {
            val family: String = Bytes.toString(CellUtil.cloneFamily(kv))
            val qua: String = Bytes.toString(CellUtil.cloneQualifier(kv))
            var strValue: String = null
            if ((family == "a" || family == "sd" || family == "sh") &&
              !(qua.contains("cliN") || qua.contains("shoN") || qua.contains("ctr") ||
                qua.contains("wilCtr") || qua.contains("ktctr") || qua.contains("cliUV") ||
                qua.contains("shoUV") || qua.contains("ctrUV") || qua.contains("bayCtrNew") ||
                qua.contains("bayCtrCold") || qua.contains("bayCtrAvg") || qua.contains("score") ||
                qua.contains("norScore"))) {
              if (qua.contains("totalTime")) {
                // Stored in milliseconds; expose seconds.
                strValue = Bytes.toLong(CellUtil.cloneValue(kv)) / 1000 + ""
              } else {
                strValue = Bytes.toLong(CellUtil.cloneValue(kv)) + ""
              }
            } else if (qua.contains("Inc")) {
              strValue = Bytes.toLong(CellUtil.cloneValue(kv)) + ""
            } else {
              strValue = Bytes.toString(CellUtil.cloneValue(kv))
            }
            // Normalize sentinel strings.
            if ("None" == strValue || "null" == strValue) strValue = "-1"
            if ("true" == strValue) strValue = "1"
            if (!"ttP".equals(qua) && !"ktW".equals(qua)) {
              map.put(qua, strValue)
            } else if ("c2ttP".equals(family + qua)) {
              map.put(qua, strValue)
            } else if ("c3ktW".equals(family + qua)) {
              map.put(qua, strValue)
            }
          }
        }
        (key, map)
      } else {
        ("", map)
      }
    }).filter(_._1 != "").toIterator
  }

  /**
   * Runs a batch of Gets against `tableName` and returns one Result per Get.
   * The table handle is closed once the (eagerly materialized) results are in.
   */
  def getBatchData(tableName: String, gets: util.List[Get], conn: HConnection) = {
    val table = conn.getTable(tableName)
    try {
      // BUG FIX: the debug println stringifying every Result was removed, and
      // the table handle is now closed instead of leaked.
      table.get(gets)
    } finally {
      Try(table.close())
    }
  }

  /**
   * Batched variant: builds one Get per user restricted to the column families
   * the feature decoder understands, issues them as a single multi-Get per
   * partition against "user_features", and returns (userId, featureMap) pairs.
   */
  def getUserDataFromHbase(df: DataFrame, num: Int) = {
    df.rdd.repartition(num).mapPartitions(iter => {
      val conf = createHBaseConf()
      val conn = HConnectionManager.createConnection(conf)
      val gets = iter.map(row => {
        val uid = row.getAs[Long]("user_id")
        val get = new Get(Bytes.toBytes(getUserRowKey(uid.toString)))
        // Only fetch the families getFeatures knows how to decode.
        get.addFamily(Bytes.toBytes("e"))
          .addFamily(Bytes.toBytes("h"))
          .addFamily(Bytes.toBytes("a"))
          .addFamily(Bytes.toBytes("c2"))
          .addFamily(Bytes.toBytes("c3"))
          .addFamily(Bytes.toBytes("i"))
        get
      }).toList.asJava
      try {
        // getFeatures materializes all rows before returning, so closing the
        // connection afterwards is safe.
        getFeatures(gets, "user_features", conn)
      } finally {
        // BUG FIX: connection was previously leaked on every partition.
        Try(conn.close())
      }
    })
  }

}


/*
 * NOTE(review): the text below is residue from the web page this code was
 * copied from (article navigation, dates, "related posts"). It is preserved
 * here inside a comment so the file compiles; it is not part of the program.
 *
 * 其中1.txt文件格式: (format of the input file 1.txt)
 *
 * scala多种方法实现Hbase数据抽取 (several Scala approaches to HBase data extraction)
 *
 * 抽取出来的数据: (the extracted data)
 *
 * Related articles: 2021-09-04, 2021-12-01, 2022-12-23, 2021-11-22,
 * 2022-12-23, 2021-08-27, 2021-09-24, 2021-11-03
 * Recommended: 2021-11-27, 2022-01-02, 2022-12-23, 2022-01-19
 */