【问题标题】:Bulk Insert Data in HBase using Structured Spark Streaming使用结构化 Spark 流在 HBase 中批量插入数据
【发布时间】:2023-03-21 19:44:01
【问题描述】:

我正在使用结构化 Spark Streaming 读取来自 Kafka(每秒 100.000 行)的数据,并且我正在尝试将所有数据插入 HBase。

我在 Cloudera Hadoop 2.6 中,我正在使用 Spark 2.3

我尝试了类似here 的方法。

eventhubs.writeStream
 .foreach(new MyHBaseWriter[Row])
 .option("checkpointLocation", checkpointDir)
 .start()
 .awaitTermination()

MyHBaseWriter 看起来像这样:

class AtomeHBaseWriter[RECORD] extends HBaseForeachWriter[Row] {
  override def toPut(record: Row): Put = {
    override val tableName: String = "hbase-table-name"

    override def toPut(record: Row): Put = {
        // Get Json
        val data = JSON.parseFull(record.getString(0)).asInstanceOf[Some[Map[String, Object]]]
        val key = data.getOrElse(Map())("key")+ ""
        val val = data.getOrElse(Map())("val")+ ""

        val p = new Put(Bytes.toBytes(key))
        //Add columns ... 
        p.addColumn(Bytes.toBytes(columnFamaliyName),Bytes.toBytes(columnName), Bytes.toBytes(val))

        p
     }
    }

HBaseForeachWriter 类看起来像这样:

trait HBaseForeachWriter[RECORD] extends ForeachWriter[RECORD] {
  val tableName: String

  def pool: Option[ExecutorService] = None

  def user: Option[User] = None

  private var hTable: Table = _
  private var connection: Connection = _


  override def open(partitionId: Long, version: Long): Boolean = {
    connection = createConnection()
    hTable = getHTable(connection)
    true
  }

  def createConnection(): Connection = {
    // I create HBase Connection Here
  }

  def getHTable(connection: Connection): Table = {
    connection.getTable(TableName.valueOf(Variables.getTableName()))
  }

  override def process(record: RECORD): Unit = {
    val put = toPut(record)
    hTable.put(put)
  }

  override def close(errorOrNull: Throwable): Unit = {
    hTable.close()
    connection.close()
  }

  def toPut(record: RECORD): Put
}

所以我在这里逐行进行放置,即使我允许 20 个执行程序和每个执行程序 4 个核心,我也没有立即将数据插入 HBase。所以我需要做的是批量加载,但我很挣扎,因为我在互联网上找到的所有东西都是用 RDD 和 Map/Reduce 来实现它。

【问题讨论】:

    标签: scala apache-spark hbase spark-streaming bulkinsert


    【解决方案1】:

    据我所知,hbase 的记录摄取速度很慢。我有一些建议给你。

    1) hbase.client.write.buffer .
    以下属性可能会对您有所帮助。

    hbase.client.write.buffer
    

    说明 BufferedMutator 写入缓冲区的默认大小(以字节为单位)。更大的缓冲区占用更多的内存 — 在客户端和 服务器端,因为服务器将传递的写入缓冲区实例化到 处理它 — 但是更大的缓冲区大小会减少生成的 RPC 的数量。 对于服务器端内存使用的估计,评估 hbase.client.write.buffer * hbase.regionserver.handler.count

    默认 2097152(大约 2 mb)

    我更喜欢 foreachBatch see spark docs(它在 spark 核心中的一种 foreachPartition)而不是 foreach

    还在你的 hbase 编写器中扩展 ForeachWriter

    open方法初始化put的数组列表 在process 中将 put 添加到 puts 的数组列表中 在 close table.put(listofputs); 中,然后在更新表后重置数组列表...

    它的作用基本上是你上面提到的缓冲区大小被填充了 2 mb 然后它将刷新到 hbase 表中。到那时,记录不会进入 hbase 表。

    您可以将其增加到 10mb 等等.... 这样可以减少 RPC 的数量。并且大量数据将被刷新并存储在 hbase 表中。

    当写入缓冲区被填满并触发 hbase 表中的 flushCommits 时。

    示例代码:在我的answer

    2) 关闭 WAL 您可以关闭 WAL(预写日志 - 危险是无法恢复)但它会加快写入速度...如果不想恢复数据。

    注意:如果您在 hbase 表上使用 solr 或 cloudera 搜索,您 不应将其关闭,因为 Solr 将在 WAL 上工作。如果你切换它 然后关闭,Solr 索引将不起作用..这是许多人的常见错误 我们有。

    如何关机: https://hbase.apache.org/1.1/apidocs/org/apache/hadoop/hbase/client/Put.html#setWriteToWAL(boolean)

    正如我提到的 puts 列表是好方法...这是在结构化流示例如下之前执行的旧方法(foreachPartition 和 puts 列表)......其中foreachPartition 对每个分区而不是每一行都进行操作。

    def writeHbase(mydataframe: DataFrame) = {
          val columnFamilyName: String = "c"
          mydataframe.foreachPartition(rows => {
            val puts = new util.ArrayList[ Put ]
            rows.foreach(row => {
              val key = row.getAs[ String ]("rowKey")
              val p = new Put(Bytes.toBytes(key))
              val columnV = row.getAs[ Double ]("x")
              val columnT = row.getAs[ Long ]("y")
              p.addColumn(
                Bytes.toBytes(columnFamilyName),
                Bytes.toBytes("x"),
                Bytes.toBytes(columnX)
              )
              p.addColumn(
                Bytes.toBytes(columnFamilyName),
                Bytes.toBytes("y"),
                Bytes.toBytes(columnY)
              )
              puts.add(p)
            })
            HBaseUtil.putRows(hbaseZookeeperQuorum, hbaseTableName, puts)
          })
        }
    

    总结一下:

    我的感觉是我们需要了解 spark 和 hbase 的心理学 使然后有效的对。

    【讨论】:

    • 但是 foreachBatch 仅从 Spark 2.4 开始可用,我在 2.3 上:(
    • 好的,然后尝试应该工作的 foreach 中的 put 列表。
    • 嘿 Ram,我已将我的版本更新到 2.4。我在哪里可以找到 HBaseUtil 类?谢谢
    猜你喜欢
    • 2020-10-22
    • 2021-02-06
    • 2018-04-19
    • 1970-01-01
    • 1970-01-01
    • 2021-12-03
    • 2019-01-14
    • 2018-10-06
    • 1970-01-01
    相关资源
    最近更新 更多