【问题标题】:Why is the HBase API returning empty set after scanning a table with a prefix filter?为什么 HBase API 在扫描带有前缀过滤器的表后返回空集?
【发布时间】:2018-08-22 05:07:09
【问题描述】:

我正在做一个实时管道,将 Spark Streaming 与 HBase 连接起来。为了这个过程,我必须在 HBase 表中执行一个过滤器,特别是一个前缀过滤器,因为我想匹配键以某个字符串开头的记录。

我正在过滤的表名为“hm_notificaciones”。我可以成功连接到 Hbase shell 并从命令行扫描表。运行以下命令:

scan "hm_notificaciones"

我得到以下记录:

行列+单元格

 46948854-20180307              column=info_oferta:id_oferta, timestamp=1520459448795, value=123456

 46948854-20180312170423        column=info_oferta:id_establecimiento, timestamp=1520892403770, value=9999

 46948854-20180312170423        column=info_oferta:id_oferta, timestamp=1520892390858, value=123445

 46948854-20180312170536        column=info_oferta:id_establecimiento, timestamp=1520892422044, value=9239

 46948854-20180312170536        column=info_oferta:id_oferta, timestamp=1520892435173, value=4432

 46948854-20180313110824        column=info_oferta:id_establecimiento, timestamp=1520957374921, value=9990

 46948854-20180313110824        column=info_oferta:id_oferta, timestamp=1520957362458, value=12313

我一直在尝试使用 Hbase API 运行前缀过滤器。我正在编写一些 Scala 代码来连接到 API 并制作过滤器。以下代码编译并执行,但返回空结果:

def scanTable( table_name:String, family: String, search_key: String )= {
  val conf: Configuration = HBaseConfiguration.create()
  val connection: Connection = ConnectionFactory.createConnection(conf)

  // This is a test to verify if I can connect to HBase API. 
  //This statements work and print all the table names in HBase
  val admin = connection.getAdmin

  println("Listing all tablenames")
  val list_table_names = admin.listTableNames()
  list_table_names.foreach(println)

  val table: Table = connection.getTable( TableName.valueOf(table_name) )
  //val htable = new HTable(conf, tableName)

  var colValueMap: Map[String, String] = Map()
  var keyColValueMap: Map[String, Map[String, String]] = Map()
  val prefix = Bytes.toBytes(search_key)
  val scan = new Scan(prefix)
  scan.addFamily(Bytes.toBytes(family))
  val prefix_filter = new PrefixFilter(prefix)
  scan.setFilter(prefix_filter)
  val scanner = table.getScanner(scan)

  for( row <- scanner){
    val content = row.getNoVersionMap
    for( entry <- content.entrySet ){
      for( sub_entry <- entry.getValue.entrySet){
        colValueMap += (Bytes.toString( sub_entry.getKey) -> Bytes.toString(sub_entry.getValue) )
      }
      keyColValueMap += (Bytes.toString(row.getRow) -> colValueMap )
    }
  }

  //this doesn't execute 
  for( ( k, v) <- colValueMap) {
    printf( "key: %s", "value: %s\n", k, v )
  }

  //this never executes since scanner is null (or empty)
  for (result <- scanner) {
    for (cell <- result.rawCells) {
      println("Cell: " + cell + ", Value: " + Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength))
    }
  }

  scanner.close
  table.close
  connection.close

}

我尝试了两种打印/获取数据的方法:编写地图和遍历 ResultScanner。但是,我的过滤器似乎无法正常工作,因为它返回的是 null/空集。

您知道是否有其他方法可以在 Hbase 上执行前缀过滤器?

我用来测试上述代码的代码如下:

user_key = "46948854-20181303144609"
scanTable("hm_notificaciones", "info_oferta", user_key)

【问题讨论】:

  • 也许我理解错了,但是46948854-20181303144609 的密钥不在scan "hm_notificaciones" 返回的数据中。这只是返回数据的一个子集吗?
  • 是的,我就是这么想的。试试 - user_key = "46948854"

标签: java scala apache-spark hbase bigdata


【解决方案1】:

第二个循环,不会进入,因为你已经在上一步迭代了扫描器。

  for (result <- scanner) {
    for (cell <- result.rawCells) {
      println("Cell: " + cell + ", Value: " + Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength))
    }
  }

并使用 keyColValueMap 进行打印。它对我有用,再次检查您的前缀过滤器。

  for( ( k, v) <- colValueMap) {
    printf( "key: %s", "value: %s\n", k, v )
  }

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-07-29
    • 2013-09-12
    • 1970-01-01
    • 2017-04-25
    • 2012-06-12
    • 1970-01-01
    • 2019-07-26
    • 1970-01-01
    相关资源
    最近更新 更多