package com.features.dl
import java.util
import java.util.stream.Collectors
import java.util.{Arrays, Comparator, List}
import com.features.model.{CommonLabelPointForDINOnline, Keyword}
import com.mysql.jdbc.Connection
import com.sun.rowset.internal.Row
import com.utils.{CommonUtils, FeatureUtils}
import com.utils.ConnUtils.HBaseClient.conf
import org.apache.commons.httpclient.HttpConnectionManager
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.{Base64, Bytes, MD5Hash}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import collection.JavaConverters._
import scala.util.Try
/*
 * Author: supeihuang
 * Time: 2019/9/27 14:42
 */
/**
 * Spark job that extracts per-user feature data from HBase.
 *
 * Input : hdfs .../1.txt, comma-separated lines whose first field is a user id.
 * Output: hdfs .../values2.txt (one Get per user, table "user_show_tag")
 *         hdfs .../values.txt  (batched multi-Get, table "user_features")
 */
object ExtractDataFromHbase {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._
    // Read the input data; each line is comma-separated, first field is the user id.
    val content = spark.read.textFile("hdfs://hdfs2-nameservice/user/supeihuang/1.txt")
    // Extract the user ids.
    val ds = content.map(arr => {
      val data = arr.split(",")
      data(0).toLong
    }).persist(StorageLevel.MEMORY_AND_DISK).toDF("user_id")
    // Extract from HBase, one Get per user.
    val data1 = addUserFeatures(ds).toDF("user_id", "tagId", "values")
    data1.rdd.saveAsTextFile("hdfs://hdfs2-nameservice/user/supeihuang/values2.txt")
    data1.show(20)
    // Extract from HBase with a batched multi-Get; keep only the feature maps.
    val data2 = getUserDataFromHbase(ds, 1).map(y => {
      val features = y._2
      features
    })
    data2.saveAsTextFile("hdfs://hdfs2-nameservice/user/supeihuang/values.txt")
  }

  /**
   * Fetches the "t" column family of table "user_show_tag" for every user id
   * in `df`, issuing one HBase Get per (distinct) user.
   *
   * @param df      DataFrame with a Long "user_id" column
   * @param numPart number of partitions; one HBase connection is opened per partition
   * @return persisted Dataset of (userId, tagIds, values) where tagIds(i) pairs with values(i)
   */
  def addUserFeatures(df: DataFrame, numPart: Int = 64):
  Dataset[(Long, ArrayBuffer[String], ArrayBuffer[String])] = {
    val spark = df.sparkSession
    import spark.implicits._
    df.repartition(numPart, col("user_id")).mapPartitions(iter => {
      // Distinct user ids of this partition.
      val uids = iter.map(_.getAs[Long]("user_id")).toSet
      // One HBase connection per partition; closed below once all Gets have run.
      val conf = createHBaseConf()
      val hConn = ConnectionFactory.createConnection(conf)
      try {
        val hTable = hConn.getTable(TableName.valueOf("user_show_tag"))
        try {
          // Set#map is strict, so every HBase call completes before the
          // finally blocks close the table/connection.
          uids.map(uid => {
            val rowkey = getUserRowKey(uid.toString)
            val result = hTable.get(new Get(Bytes.toBytes(rowkey)))
            val allTagId: ArrayBuffer[String] = new ArrayBuffer[String]
            val allValues: ArrayBuffer[String] = new ArrayBuffer[String]
            // listCells() returns null for an empty Result; check the raw Java
            // list BEFORE .asScala — the converted wrapper is never null and
            // would NPE on iteration.
            val rawCells = result.listCells()
            if (rawCells != null) {
              for (kv <- rawCells.asScala) {
                // Each cell carries family, qualifier and value.
                val family = Bytes.toString(CellUtil.cloneFamily(kv))
                if ("t".equals(family)) {
                  val value = Bytes.toString(CellUtil.cloneValue(kv))
                  if (StringUtils.isNotEmpty(value)) {
                    allTagId.append(Bytes.toString(CellUtil.cloneQualifier(kv)))
                    allValues.append(value)
                  }
                }
              }
            }
            (uid, allTagId, allValues)
          }).toIterator
        } finally hTable.close()
      } finally hConn.close()
    }).persist(StorageLevel.MEMORY_AND_DISK)
  }

  /** Builds the HBase client configuration used by both extraction paths. */
  def createHBaseConf(): Configuration = {
    val ret = HBaseConfiguration.create()
    ret.set("hbase.zookeeper.quorum", "*")
    ret.set("zookeeper.znode.parent", "/hbase")
    // When reading from / writing to HBase it is recommended to disable
    // Hadoop's speculative execution.
    ret.setBoolean("mapred.map.tasks.speculative.execution", false)
    ret.setBoolean("mapred.reduce.tasks.speculative.execution", false)
    ret
  }

  /** Row key layout: first 8 hex chars of md5(id) + "_" + id (salting for distribution). */
  def getUserRowKey(id: String): String = {
    MD5Hash.getMD5AsHex(Bytes.toBytes(id)).substring(0, 8) + "_" + id
  }

  /**
   * Runs the batched `list` of Gets against `tableName` and converts each Result
   * into (userId, qualifier -> value map). Rows without a row key are dropped.
   *
   * Note: `List` here is java.util.List (see the imports at the top of the file).
   */
  def getFeatures(list: List[Get], tableName: String, conn: HConnection) = {
    getBatchData(tableName, list, conn).map(row => {
      val rowKey = Bytes.toString(row.getRow)
      val map: java.util.HashMap[String, String] = new util.HashMap()
      if (rowKey != null) {
        // Row keys look like "<8-char md5 prefix>_<id>"; drop the prefix and "_".
        val key = rowKey.substring(9)
        // listCells() returns null for an empty Result; check before .asScala
        // (the converted wrapper is never null and would NPE on iteration).
        val rawCells = row.listCells()
        if (rawCells != null) {
          // NOTE(fix): the original nested a second identical loop over the
          // cells inside this one, processing every cell N times; the
          // duplicate outer loop has been removed.
          for (kv <- rawCells.asScala) {
            val family: String = Bytes.toString(CellUtil.cloneFamily(kv))
            val qua: String = Bytes.toString(CellUtil.cloneQualifier(kv))
            var strValue: String = null
            // Counter-like qualifiers in families a/sd/sh are stored as longs.
            if ((family == "a" || family == "sd" || family == "sh") &&
              !(qua.contains("cliN") || qua.contains("shoN") || qua.contains("ctr") ||
                qua.contains("wilCtr") || qua.contains("ktctr") || qua.contains("cliUV") ||
                qua.contains("shoUV") || qua.contains("ctrUV") || qua.contains("bayCtrNew") ||
                qua.contains("bayCtrCold") || qua.contains("bayCtrAvg") || qua.contains("score") ||
                qua.contains("norScore"))) {
              if (qua.contains("totalTime")) {
                // totalTime is stored in milliseconds; convert to seconds.
                strValue = Bytes.toLong(CellUtil.cloneValue(kv)) / 1000 + ""
              }
              else {
                strValue = Bytes.toLong(CellUtil.cloneValue(kv)) + ""
              }
            }
            else if (qua.contains("Inc")) {
              strValue = Bytes.toLong(CellUtil.cloneValue(kv)) + ""
            }
            else {
              strValue = Bytes.toString(CellUtil.cloneValue(kv))
            }
            // Normalize sentinel strings.
            if ("None" == strValue || "null" == strValue) strValue = "-1"
            if ("true" == strValue) strValue = "1"
            if (!"ttP".equals(qua) && !"ktW".equals(qua)) {
              map.put(qua, strValue)
            }
            // Second-level tags and keywords are taken per the current online version.
            else if ("c2ttP".equals(family + qua)) {
              map.put(qua, strValue)
            }
            else if ("c3ktW".equals(family + qua)) {
              map.put(qua, strValue)
            }
          }
        }
        (key, map)
      } else {
        ("", map)
      }
    }).filter(_._1 != "").toIterator
  }

  /** Executes a multi-Get and returns one Result per Get; closes the table handle. */
  def getBatchData(tableName: String, gets: util.List[Get], conn: HConnection) = {
    val table = conn.getTable(tableName)
    try {
      val results = table.get(gets)
      // NOTE(review): debug leftover — this prints the entire result set to stdout.
      println("results:" + results.toList)
      results
    } finally table.close()
  }

  /**
   * Batched extraction path: builds one Get per user row, restricted to the
   * column families that are actually consumed, and resolves them via
   * getFeatures against table "user_features".
   */
  def getUserDataFromHbase(df: DataFrame, num: Int) = {
    df.rdd.repartition(num).mapPartitions(iter => {
      // One HBase connection per partition; closed below once all Gets have run.
      val conf = createHBaseConf()
      val conn = HConnectionManager.createConnection(conf)
      try {
        val list = iter.map(user => {
          val uids = user.getAs[Long]("user_id")
          val rowKey = getUserRowKey(uids.toString)
          val get = new Get(Bytes.toBytes(rowKey))
          // Restrict the Get to the column families we consume.
          get.addFamily(Bytes.toBytes("e"))
            .addFamily(Bytes.toBytes("h"))
            .addFamily(Bytes.toBytes("a"))
            .addFamily(Bytes.toBytes("c2"))
            .addFamily(Bytes.toBytes("c3"))
            .addFamily(Bytes.toBytes("i"))
          get
        }).toList.asJava
        // getFeatures materializes its results eagerly (Array#map is strict),
        // so closing the connection afterwards is safe.
        getFeatures(list, "user_features", conn)
      } finally conn.close()
    })
  }
}
// Format of the input file 1.txt: (see main — comma-separated, first field is the user id)
// Extracted output data: (written to values.txt / values2.txt on HDFS)