【发布时间】:2023-03-21 19:44:01
【问题描述】:
我正在使用结构化 Spark Streaming 读取来自 Kafka(每秒 100.000 行)的数据,并且我正在尝试将所有数据插入 HBase。
我在 Cloudera Hadoop 2.6 中,我正在使用 Spark 2.3
我尝试了类似here 的方法。
eventhubs.writeStream
.foreach(new MyHBaseWriter[Row])
.option("checkpointLocation", checkpointDir)
.start()
.awaitTermination()
MyHBaseWriter 看起来像这样:
class AtomeHBaseWriter[RECORD] extends HBaseForeachWriter[Row] {
override def toPut(record: Row): Put = {
override val tableName: String = "hbase-table-name"
override def toPut(record: Row): Put = {
// Get Json
val data = JSON.parseFull(record.getString(0)).asInstanceOf[Some[Map[String, Object]]]
val key = data.getOrElse(Map())("key")+ ""
val val = data.getOrElse(Map())("val")+ ""
val p = new Put(Bytes.toBytes(key))
//Add columns ...
p.addColumn(Bytes.toBytes(columnFamaliyName),Bytes.toBytes(columnName), Bytes.toBytes(val))
p
}
}
HBaseForeachWriter 类看起来像这样:
trait HBaseForeachWriter[RECORD] extends ForeachWriter[RECORD] {
val tableName: String
def pool: Option[ExecutorService] = None
def user: Option[User] = None
private var hTable: Table = _
private var connection: Connection = _
override def open(partitionId: Long, version: Long): Boolean = {
connection = createConnection()
hTable = getHTable(connection)
true
}
def createConnection(): Connection = {
// I create HBase Connection Here
}
def getHTable(connection: Connection): Table = {
connection.getTable(TableName.valueOf(Variables.getTableName()))
}
override def process(record: RECORD): Unit = {
val put = toPut(record)
hTable.put(put)
}
override def close(errorOrNull: Throwable): Unit = {
hTable.close()
connection.close()
}
def toPut(record: RECORD): Put
}
所以我在这里逐行进行放置,即使我允许 20 个执行程序和每个执行程序 4 个核心,我也没有立即将数据插入 HBase。所以我需要做的是批量加载,但我很挣扎,因为我在互联网上找到的所有东西都是用 RDD 和 Map/Reduce 来实现它。
【问题讨论】:
标签: scala apache-spark hbase spark-streaming bulkinsert