【问题标题】:Spark not be able to retrieve all of Hbase data in specific columnSpark 无法检索特定列中的所有 Hbase 数据
【发布时间】:2016-01-26 19:30:07
【问题描述】:

我的Hbase表有3000万条记录,每条记录都有raw:sample列,raw是columnfamily sample是column。这个栏目很大,大小从几KB到50MB。当我运行下面的 Spark 代码时,它只能得到 40000 条记录,但我应该得到 3000 万条记录:

val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "10.1.1.15:2181")
conf.set(TableInputFormat.INPUT_TABLE, "sampleData")
conf.set(TableInputFormat.SCAN_COLUMNS, "raw:sample")
conf.set("hbase.client.keyvalue.maxsize","0")
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
var arrRdd:RDD[Map[String,Object]] = hBaseRDD.map(tuple => tuple._2).map(...)

现在我通过先获取 id 列表然后迭代 id 列表以通过 Spark foreach 中的纯 Hbase java 客户端获取列 raw:sample 来解决此问题。 请有任何想法,为什么我无法通过 Spark 获得所有列 raw:sample,是因为该列太大了吗?

前几天我的一个zookeeper节点和datanodes宕机了,但是我很快就修好了,因为replica是3,是这个原因吗?想如果我运行 hbck -repair 会有所帮助,非常感谢!

【问题讨论】:

    标签: hadoop apache-spark mapreduce hbase


    【解决方案1】:

    在内部,TableInputFormat 创建一个 Scan 对象,以便从 HBase 检索数据。

    尝试create a Scan object(不使用Spark),配置为从HBase检索相同的列,看看是否重复错误:

    // Instantiating Configuration class
      Configuration config = HBaseConfiguration.create();
    
      // Instantiating HTable class
      HTable table = new HTable(config, "emp");
    
      // Instantiating the Scan class
      Scan scan = new Scan();
    
      // Scanning the required columns
      scan.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("name"));
      scan.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("city"));
    
      // Getting the scan result
      ResultScanner scanner = table.getScanner(scan);
    
      // Reading values from scan result
      for (Result result = scanner.next(); result != null; result = scanner.next())
    
      System.out.println("Found row : " + result);
      //closing the scanner
      scanner.close();
    

    此外,默认情况下,TableInputFormat 被配置为从 HBase 服务器请求非常小的数据块(这很糟糕并且会导致很大的开销)。将以下内容设置为increase the chunk size

    scan.setBlockCache(false);
    scan.setCaching(2000);
    

    【讨论】:

    • 我尝试了所有方法,但仍然无法通过扫描检索所有行。
    • 哪些行没有返回?最后一个?还是它错过了中间的行?
    【解决方案2】:

    对于像您这样的高吞吐量,Apache Kafka 是集成数据流和保持数据管道活跃的最佳解决方案。 kafka的一些用例请参考http://kafka.apache.org/08/uses.html

    还有一个 http://sites.computer.org/debull/A12june/pipeline.pdf

    【讨论】:

    • 数据已经在Hbase,你的意思是我最好从Hbase读取到Kafka,然后从Kafka消费?我认为这是多余的,没有必要使用 Kafka 作为 Hbase 和 Spark/MR 的桥梁。这只是一个使用 Spark 从 Hbase 读取数据的批处理作业。
    • Kafka 不是必须的,但它会确保大数据管道和数据流不会中断。对于 3000 万条记录,Kafka 让您的系统更加稳定。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-05-20
    • 1970-01-01
    • 2017-08-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多