【发布时间】:2012-07-07 16:10:08
【问题描述】:
现在我像这样在ResultScanner 上实现行计数
for (Result rs = scanner.next(); rs != null; rs = scanner.next()) {
number++;
}
如果数据达到数百万次计算量很大。我想实时计算我不想使用 Mapreduce
如何快速计算行数。
【问题讨论】:
现在我像这样在ResultScanner 上实现行计数
for (Result rs = scanner.next(); rs != null; rs = scanner.next()) {
number++;
}
如果数据达到数百万次计算量很大。我想实时计算我不想使用 Mapreduce
如何快速计算行数。
【问题讨论】:
使用 HBase 中包含的 HBase rowcount map/reduce job
【讨论】:
job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS).getValue();
您可以使用自 HBase 0.92 以来可用的协处理器。见Coprocessor和AggregateProtocol和example
【讨论】:
在 HBASE 中计算行数的简单、有效和高效的方法:
每当您插入一行时,都会触发此 API,该 API 会增加该特定单元格。
Htable.incrementColumnValue(Bytes.toBytes("count"), Bytes.toBytes("details"), Bytes.toBytes("count"), 1);
检查该表中存在的行数。只需对特定的“计数”行使用“获取”或“扫描”API。
通过使用此方法,您可以在不到一毫秒的时间内获得行数。
【讨论】:
如果您使用的是扫描仪,请在您的扫描仪中尝试让它返回尽可能少的限定符。实际上,您返回的限定符应该是可用的最小的(以字节为单位)。这将大大加快您的扫描速度。
不幸的是,这只会扩大到目前(数百万?)。更进一步,您可以实时执行此操作,但您首先需要运行 mapreduce 作业来计算所有行数。
将 Mapreduce 输出存储在 HBase 的一个单元中。每次添加一行,计数器加 1。每次删除一行,计数器减一。
当您需要实时访问行数时,您在 HBase 中读取该字段。
没有快速的方法来以可扩展的方式计算行数。你只能数这么快。
【讨论】:
FirstKeyOnlyFilter,而不是“尽可能返回最少数量的限定符”。作为扫描过滤器
FirstKeyOnlyFilter 到底是做什么的?从 [thrift docs](,我无法理解这个解释:[FirstKeyOnlyFilter] returns only the first key-value from each row -- 这是否意味着它只选择第一个单元格并返回那个?
FirstKeyOnlyFilter 选择的第一个键值具有非常大的值,那么这会减慢扫描速度。另一方面,如果您选择具有最小值的限定符,但该限定符并未出现在您要计数的所有行中,那么您将得到不准确的计数。
FirstKeyOnlyFilter() AND KeyOnlyFilter()。 KeyOnlyFilter 将阻止列值通过网络传输。
在 HBase 中使用 RowCounter RowCounter 是一个 mapreduce 作业,用于计算表的所有行。这是一个很好的实用程序,可用作完整性检查,以确保 HBase 在存在元数据不一致问题时可以读取表的所有块。它将在单个进程中运行所有 mapreduce,但如果您有一个 MapReduce 集群可供它利用,它将运行得更快。
$ hbase org.apache.hadoop.hbase.mapreduce.RowCounter <tablename>
Usage: RowCounter [options]
<tablename> [
--starttime=[start]
--endtime=[end]
[--range=[startKey],[endKey]]
[<column1> <column2>...]
]
【讨论】:
org.apache.hadoop.hbase.mapreduce.RowCounter$RowCounterMapper$Counters ROWS=55438
可以使用hbase中的count方法来统计行数。但是,是的,计算大表的行数可能会很慢。count 'tablename' [interval]
返回值为行数。
此操作可能需要很长时间(运行'$HADOOP_HOME/bin/hadoop jar hbase.jar rowcount' 来运行计数 mapreduce 作业)。显示当前计数 默认情况下每 1000 行。可以选择指定计数间隔。扫描 默认情况下,在计数扫描时启用缓存。默认缓存大小为 10 行。 如果您的行很小,您可能需要增加这个 参数。
例子:
hbase> count 't1'
hbase> count 't1', INTERVAL => 100000
hbase> count 't1', CACHE => 1000
hbase> count 't1', INTERVAL => 10, CACHE => 1000
同样的命令也可以在表引用上运行。假设您引用了表 't1',相应的命令将是:
hbase> t.count
hbase> t.count INTERVAL => 100000
hbase> t.count CACHE => 1000
hbase> t.count INTERVAL => 10, CACHE => 1000
【讨论】:
你可以试试 hbase api 方法!
org.apache.hadoop.hbase.client.coprocessor.AggregationClient
【讨论】:
你可以在这里找到示例:
/**
* Used to get the number of rows of the table
* @param tableName
* @param familyNames
* @return the number of rows
* @throws IOException
*/
public long countRows(String tableName, String... familyNames) throws IOException {
long rowCount = 0;
Configuration configuration = connection.getConfiguration();
// Increase RPC timeout, in case of a slow computation
configuration.setLong("hbase.rpc.timeout", 600000);
// Default is 1, set to a higher value for faster scanner.next(..)
configuration.setLong("hbase.client.scanner.caching", 1000);
AggregationClient aggregationClient = new AggregationClient(configuration);
try {
Scan scan = new Scan();
if (familyNames != null && familyNames.length > 0) {
for (String familyName : familyNames) {
scan.addFamily(Bytes.toBytes(familyName));
}
}
rowCount = aggregationClient.rowCount(TableName.valueOf(tableName), new LongColumnInterpreter(), scan);
} catch (Throwable e) {
throw new IOException(e);
}
return rowCount;
}
【讨论】:
configuration.setLong("hbase.client.scanner.caching", 1000); 有效?例如,如果我设置它,然后调用scanner.getCaching(),它将返回-1。
如果您出于某种原因不能使用RowCounter,那么这两个过滤器的组合应该是获得计数的最佳方式:
FirstKeyOnlyFilter() AND KeyOnlyFilter()
FirstKeyOnlyFilter 将导致扫描器仅返回它找到的第一个列限定符,而不是扫描器返回表中的所有列限定符,这将最小化网络带宽。简单地选择一个列限定符返回怎么样?如果您可以保证每一行都存在列限定符,这将起作用,但如果这不是真的,那么您将得到一个不准确的计数。
KeyOnlyFilter 将导致扫描器仅返回列族,并且不会为列限定符返回任何值。这进一步减少了网络带宽,在一般情况下不会造成太大的减少,但可能存在边缘情况,即前一个过滤器选择的第一列恰好是一个非常大的值。
我尝试使用scan.setCaching,但结果到处都是。也许它会有所帮助。
我在开始和停止之间有 1600 万行,我做了以下伪经验测试:
激活 FirstKeyOnlyFilter 和 KeyOnlyFilter : 未设置缓存(即默认值)时,需要 188 秒。 缓存设置为 1,耗时 188 秒 缓存设置为 10,需要 200 秒 缓存设置为 100,耗时 187 秒 缓存设置为 1000,耗时 183 秒。 缓存设置为 10000,耗时 199 秒。 缓存设置为 100000,耗时 199 秒。 禁用 FirstKeyOnlyFilter 和 KeyOnlyFilter: 未设置缓存(即默认值),耗时 309 秒我没有费心对此进行适当的测试,但很明显FirstKeyOnlyFilter 和KeyOnlyFilter 是好的。
此外,这个特定表格中的单元格非常小 - 所以我认为过滤器在不同的表格上会更好。
这是一个 Java 代码示例:
导入 java.io.IOException; 导入 org.apache.hadoop.conf.Configuration; 导入 org.apache.hadoop.hbase.HBaseConfiguration; 导入 org.apache.hadoop.hbase.client.HTable; 导入 org.apache.hadoop.hbase.client.Result; 导入 org.apache.hadoop.hbase.client.ResultScanner; 导入 org.apache.hadoop.hbase.client.Scan; 导入 org.apache.hadoop.hbase.util.Bytes; 导入 org.apache.hadoop.hbase.filter.RowFilter; 导入 org.apache.hadoop.hbase.filter.KeyOnlyFilter; 导入 org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; 导入 org.apache.hadoop.hbase.filter.FilterList; 导入 org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 导入 org.apache.hadoop.hbase.filter.RegexStringComparator; 公共类 HBaseCount { 公共静态 void main(String[] args) 抛出 IOException { 配置配置 = HBaseConfiguration.create(); HTable 表 = 新 HTable(config, "my_table"); 扫描扫描=新扫描( Bytes.toBytes("foo"), Bytes.toBytes("foo~") ); 如果(args.length == 1){ scan.setCaching(Integer.valueOf(args[0])); } System.out.println("scan的缓存是" + scan.getCaching()); FilterList allFilters = new FilterList(); allFilters.addFilter(new FirstKeyOnlyFilter()); allFilters.addFilter(new KeyOnlyFilter()); scan.setFilter(allFilters); ResultScanner 扫描仪 = table.getScanner(scan); 整数计数 = 0; 长开始 = System.currentTimeMillis(); 尝试 { for (结果 rr =scanner.next(); rr != null; rr =scanner.next()) { 计数 += 1; if (count % 100000 == 0) System.out.println(count); } } 最后 { 扫描仪.close(); } 长端 = System.currentTimeMillis(); long elapsedTime = 结束 - 开始; System.out.println("经过的时间是 " + (elapsedTime/1000F)); } }这是一个pychbase 代码示例:
从 pychbase 导入连接 c = 连接() t = c.table('my_table') # 在后台这应用了 FirstKeyOnlyFilter 和 KeyOnlyFilter # 类似于下面的happybase示例 打印 t.count(row_prefix="foo")这是一个 Happybase 代码示例:
从happybase导入连接 c = 连接(...) t = c.table('my_table') 计数 = 0 对于 _ in t.scan(filter='FirstKeyOnlyFilter() AND KeyOnlyFilter()'): 计数 += 1 打印计数感谢@Tuckr and @KennyCason 的提示。
【讨论】:
转到Hbase主目录并运行此命令,
./bin/hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'namespace:tablename'
这将启动一个 mapreduce 作业,输出将显示 hbase 表中存在的记录数。
【讨论】:
要计算正确 YARN 集群上的 Hbase 表记录数,您还必须设置 map reduce 作业队列名称:
hbase org.apache.hadoop.hbase.mapreduce.RowCounter -Dmapreduce.job.queuename= < Your Q Name which you have SUBMIT access>
< TABLE_NAME>
【讨论】:
两种方法帮助我快速从 hbase 表中获取行数
场景 #1
如果 hbase 表很小,则使用有效用户登录到 hbase shell 并执行
>count '<tablename>'
例子
>count 'employee'
6 row(s) in 0.1110 seconds
场景 #2
如果 hbase 表很大,则执行内置的 RowCounter map reduce 作业: 使用有效用户登录hadoop机器并执行:
/$HBASE_HOME/bin/hbase org.apache.hadoop.hbase.mapreduce.RowCounter '<tablename>'
例子:
/$HBASE_HOME/bin/hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'employee'
....
....
....
Virtual memory (bytes) snapshot=22594633728
Total committed heap usage (bytes)=5093457920
org.apache.hadoop.hbase.mapreduce.RowCounter$RowCounterMapper$Counters
ROWS=6
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=0
【讨论】: