【问题标题】:Create RDD based on a part of HBase rows基于部分 HBase 行创建 RDD
【发布时间】:2016-10-29 19:36:33
【问题描述】:

我正在尝试根据HBase 表中的数据创建RDD

val targetRDD = sparkContext.newAPIHadoopRDD(hBaseConfig,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])
  .map {
    case (key, row) => parse(key, row)
  }

parse 为每个表行调用,而不考虑对数据的进一步操作。

是否可以只检索具有匹配某些条件的特定键的行(即键在某个特定范围内)以便仅对它们进行操作?

【问题讨论】:

标签: hadoop apache-spark hbase


【解决方案1】:

HBase 是一个键/值存储,行按键排序,这意味着:

  • 在按键检索单行或按键范围检索行序列时非常有效
  • 在某些条件下检索随机行效率不高

所有检索操作都归结为两类:GetScan。这不难猜出它们的作用,扫描将遍历所有行,除非您指定 stopRow/startRow。您也可以在 Scan 上设置过滤器,但它仍然需要迭代所有行,过滤器可以降低网络压力,因为 HBase 可能需要返回更少的行。

TableInputFormat 在您的示例中使用 Scan 内部来访问 Hbase:

  public void setConf(Configuration configuration) {
    this.conf = configuration;

    Scan scan = null;

    if (conf.get(SCAN) != null) {
      try {
        scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
      } catch (IOException e) {
        LOG.error("An error occurred.", e);
      }
    } else {
      try {
        scan = createScanFromConfiguration(conf);
      } catch (Exception e) {
          LOG.error(StringUtils.stringifyException(e));
      }
    }

    setScan(scan);
  }

TableInputFormat 中的 createScanFromConfiguration 方法也可以提示您如何设置过滤器和键范围:

  public static Scan createScanFromConfiguration(Configuration conf) throws IOException {
    Scan scan = new Scan();

    if (conf.get(SCAN_ROW_START) != null) {
      scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START)));
    }

    if (conf.get(SCAN_ROW_STOP) != null) {
      scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP)));
    }

    if (conf.get(SCAN_COLUMNS) != null) {
      addColumns(scan, conf.get(SCAN_COLUMNS));
    }

    if (conf.get(SCAN_COLUMN_FAMILY) != null) {
      scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
    }

    if (conf.get(SCAN_TIMESTAMP) != null) {
      scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
    }

    if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
      scan.setTimeRange(
          Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
          Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
    }

    if (conf.get(SCAN_MAXVERSIONS) != null) {
      scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
    }

    if (conf.get(SCAN_CACHEDROWS) != null) {
      scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
    }

    if (conf.get(SCAN_BATCHSIZE) != null) {
      scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
    }

    // false by default, full table scans generate too much BC churn
    scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));

    return scan;
  }

这个 stackoverflow answer 提供了一个如何在 hbaseConfig 上设置 Scan 的示例,请注意,虽然您不必设置 Scan,但您可以只设置特定属性,如 SCAN_ROW_START 和其他来自 createScanFromConfiguration 我上面提到的.

【讨论】:

  • 谢谢,我试试看。
  • 设置SCAN_ROW_START 的方式适合我。请让答案更简洁。
猜你喜欢
  • 2012-08-02
  • 2017-07-09
  • 1970-01-01
  • 2015-01-30
  • 1970-01-01
  • 1970-01-01
  • 2019-08-11
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多