【问题标题】:Delete hbase cell using spark使用 spark 删除 hbase 单元格
【发布时间】:2016-03-23 16:32:07
【问题描述】:

是否有任何 api 可用于使用 Spark Scala 删除特定的 HBase 单元。我们能够使用 Spark-HBase 连接器进行读写。任何关于删除单元格的建议都是非常值得赞赏的。

【问题讨论】:

    标签: apache-spark hbase


    【解决方案1】:

    这是一个使用 Spark 删除 HBase Cell 对象的实现(我使用 parallelize 进行了演示,您可以将其调整为您的 Cells RDD)。

    总体思路:分块删除 - 遍历每个 RDD 分区,将分区拆分为 10,000 个 Cell 的块,将每个 Cell 转换为 HBase Delete 对象,然后调用 table.delete() 从 HBase 执行删除。

    public void deleteCells(List<Cell> cellsToDelete) {
    
        JavaSparkContext sc = new JavaSparkContext();
    
        sc.parallelize(cellsToDelete)
            .foreachPartition(cellsIterator -> {
                int chunkSize = 100000; // Will contact HBase only once per 100,000 records
    
                org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration();
                config.set("hbase.zookeeper.quorum", "YOUR-ZOOKEEPER-HOSTNAME");
    
                Table table;
    
                try {
                    Connection connection = ConnectionFactory.createConnection(config);
                    table = connection.getTable(TableName.valueOf(config.get("YOUR-HBASE-TABLE")));
                }
                catch (IOException e)
                {
                    logger.error("Failed to connect to HBase due to inner exception: " + e);
    
                    return;
                }
    
                // Split the given cells iterator to chunks
                Iterators.partition(cellsIterator, chunkSize)
                    .forEachRemaining(cellsChunk -> {
                        List<Delete> deletions = Lists.newArrayList(cellsChunk
                                .stream()
                                .map(cell -> new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
                                        .addColumn(cell.getFamily(), cell.getQualifier(), System.currentTimeMillis()))
                                .iterator());
    
                        try {
                            table.delete(deletions);
                        } catch (IOException e) {
                            logger.error("Failed to delete a chunk due to inner exception: " + e);
                        }
                    });
    
            });
    }
    

    免责声明:这个确切的 sn-p 未经测试,但我使用相同的方法使用 Spark 删除了数十亿个 HBase 单元。

    【讨论】:

    • 谢谢!!我会在 scala 中尝试同样的方法。
    • 抱歉花了这么长时间来尝试它。我在 scala 中尝试了以下代码,它执行时没有任何错误,但是没有删除任何数据。我想知道我错过了什么。 import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.util.Bytes; val conf = HBaseConfiguration.create(); val table = new HTable(conf, "mytable"); val delete = new Delete(Bytes.toBytes(1)); delete.deleteColumn(Bytes.toBytes("mycf"), Bytes.toBytes("name")); table.delete(delete);
    • 尝试执行 Get 操作,看看是否可以从 HBase 检索单元格。
    • Get 操作正常,我尝试了下面的代码。 val g = new Get(Bytes.toBytes("1")); val result = table.get(g); val value = result.getValue(Bytes.toBytes("mycf"),Bytes.toBytes("name")); val name = Bytes.toString(value); 我表中的数据如下 hbase(main):001:0> scan 'mytable' ROW COLUMN+CELL 1 column=mycf:name, timestamp=1460540729352, value=Name1 1 column=mycf:prg, timestamp =1460540729352,值=1
    • 好。您在执行删除时是否提供了时间戳?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多