创建的数据库存储如下数据
表结构
java代码
1 2 public class HbaseTest { 3 4 /** 5 * 配置ss 6 */ 7 static Configuration config = null; 8 private Connection connection = null; 9 private Table table = null; 10 11 @Before 12 public void init() throws Exception { 13 config = HBaseConfiguration.create();// 配置 14 config.set("hbase.zookeeper.quorum", "192.168.33.61");// zookeeper地址 15 config.set("hbase.zookeeper.property.clientPort", "2181");// zookeeper端口 16 connection = ConnectionFactory.createConnection(config); 17 table = connection.getTable(TableName.valueOf("dept")); 18 } 19 20 /** 21 * 创建数据库表dept,并增加列族info和subdept 22 * 23 * @throws Exception 24 */ 25 @Test 26 public void createTable() throws Exception { 27 // 创建表管理类 28 HBaseAdmin admin = new HBaseAdmin(config); // hbase表管理 29 // 创建表描述类 30 TableName tableName = TableName.valueOf("dept"); // 表名称 31 HTableDescriptor desc = new HTableDescriptor(tableName); 32 // 创建列族的描述类 33 HColumnDescriptor family = new HColumnDescriptor("info"); // 列族 34 // 将列族添加到表中 35 desc.addFamily(family); 36 HColumnDescriptor family2 = new HColumnDescriptor("subdept"); // 列族 37 // 将列族添加到表中 38 desc.addFamily(family2); 39 // 创建表 40 admin.createTable(desc); // 创建表 41 System.out.println("创建表成功!"); 42 } 43 44 /** 45 * 向hbase中插入前三行网络部、开发部、测试部的相关数据, 46 * 即加入表中的前三条数据 47 * 48 * @throws Exception 49 */ 50 @SuppressWarnings({ "deprecation", "resource" }) 51 @Test 52 public void insertData() throws Exception { 53 table.setAutoFlushTo(false); 54 table.setWriteBufferSize(534534534); 55 ArrayList<Put> arrayList = new ArrayList<Put>(); 56 57 Put put = new Put(Bytes.toBytes("0_1")); 58 put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("网络部")); 59 put.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept1"), Bytes.toBytes("1_1")); 60 put.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept2"), Bytes.toBytes("1_2")); 61 arrayList.add(put); 62 63 Put put1 = new Put(Bytes.toBytes("1_1")); 64 put1.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("开发部")); 65 put1.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("0_1")); 66 67 Put put2 = new Put(Bytes.toBytes("1_2")); 68 put2.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("测试部")); 69 put2.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("0_1")); 70 71 for (int i = 1; i <= 100; i++) { 72 73 put1.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept"+i), Bytes.toBytes("2_"+i)); 74 put2.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept"+i), Bytes.toBytes("3_"+i)); 75 } 76 arrayList.add(put1); 77 arrayList.add(put2); 78 //插入数据 79 table.put(arrayList); 80 //提交 81 table.flushCommits(); 82 System.out.println("数据插入成功!"); 83 } 84 85 /** 86 * 向hbase中插入开发部、测试部下的所有子部门数据 87 * @throws Exception 88 */ 89 @Test 90 public void insertOtherData() throws Exception { 91 table.setAutoFlushTo(false); 92 table.setWriteBufferSize(534534534); 93 ArrayList<Put> arrayList = new ArrayList<Put>(); 94 for (int i = 1; i <= 100; i++) { 95 Put put_development = new Put(Bytes.toBytes("2_"+i)); 96 put_development.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("开发"+i+"组")); 97 put_development.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("1_1")); 98 arrayList.add(put_development); 99 100 Put put_test = new Put(Bytes.toBytes("3_"+i)); 101 put_test.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("测试"+i+"组")); 102 put_test.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("1_2")); 103 arrayList.add(put_test); 104 } 105 106 //插入数据 107 table.put(arrayList); 108 //提交 109 table.flushCommits(); 110 System.out.println("插入其他数据成功!"); 111 } 112 113 /** 114 * 查询所有一级部门(没有上级部门的部门) 115 * @throws Exception 116 */ 117 @Test 118 public void scanDataStep1() throws Exception { 119 120 // 创建全表扫描的scan 121 Scan scan = new Scan(); 122 System.out.println("查询到的所有一级部门如下:"); 123 // 打印结果集 124 ResultScanner scanner = table.getScanner(scan); 125 for (Result result : scanner) { 126 if (result.getValue(Bytes.toBytes("info"), Bytes.toBytes("f_pid")) == null) { 127 for (KeyValue kv : result.raw()) { 128 System.out.print(new String(kv.getRow()) + " "); 129 System.out.print(new String(kv.getFamily()) + ":"); 130 System.out.print(new String(kv.getQualifier()) + " = "); 131 System.out.print(new String(kv.getValue())); 132 System.out.print(" timestamp = " + kv.getTimestamp() + "\n"); 133 } 134 } 135 } 136 } 137 138 /** 139 * 已知rowkey,查询该部门的所有(直接)子部门信息 rowkey=1_1 140 * @throws Exception 141 */ 142 @Test 143 public void scanDataStep2() throws Exception { 144 Get g = new Get("1_1".getBytes()); 145 g.addFamily("subdept".getBytes()); 146 // 打印结果集 147 Result result = table.get(g); 148 for (KeyValue kv : result.raw()) { 149 Get g1 = new Get(kv.getValue()); 150 Result result1 = table.get(g1); 151 for (KeyValue kv1 : result1.raw()) { 152 System.out.print(new String(kv1.getRow()) + " "); 153 System.out.print(new String(kv1.getFamily()) + ":"); 154 System.out.print(new String(kv1.getQualifier()) + " = "); 155 System.out.print(new String(kv1.getValue())); 156 System.out.print(" timestamp = " + kv1.getTimestamp() + "\n"); 157 } 158 } 159 } 160 161 /** 162 * 已知rowkey,向该部门增加一个子部门 163 * rowkey:0_1 164 * 增加的部门名:我增加的部门 165 * @throws Exception 166 */ 167 @Test 168 public void scanDataStep3() throws Exception { 169 //新增一个部门 170 Put put = new Put(Bytes.toBytes("4_1")); 171 put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("我增加的部门")); 172 put.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("0_1")); 173 //插入数据 174 table.put(put); 175 //提交 176 table.flushCommits(); 177 178 //更新网络部 179 Put put1 = new Put(Bytes.toBytes("0_1")); 180 put1.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept3"), Bytes.toBytes("4_1")); 181 //插入数据 182 table.put(put1); 183 //提交 184 table.flushCommits(); 185 } 186 187 /** 188 * 已知rowkey(且该部门存在子部门),删除该部门信息,该部门所有(直接)子部门被调整到其他部门中 189 * @throws Exception 190 */ 191 @Test 192 public void scanDataStep4() throws Exception { 193 /** 194 * 向部门"我增加的部门"添加两个子部门" 195 */ 196 table.setAutoFlushTo(false); 197 table.setWriteBufferSize(534534534); 198 ArrayList<Put> arrayList = new ArrayList<Put>(); 199 Put put1 = new Put(Bytes.toBytes("5_1")); 200 put1.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("新增子部门1")); 201 put1.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("4_1")); 202 Put put2 = new Put(Bytes.toBytes("5_2")); 203 put2.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("新增子部门2")); 204 put2.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("4_1")); 205 206 arrayList.add(put1); 207 arrayList.add(put2); 208 //插入数据 209 table.put(arrayList); 210 //提交 211 table.flushCommits(); 212 213 /** 214 * 目的:删除"我增加的部门"的部门信息,该部门所有(直接)子部门被调整到其他部门中 215 * 使用策略:更新部门名就可以了,也就是说一个部门可能有多个rowkey 216 */ 217 Put put = new Put(Bytes.toBytes("4_1")); 218 put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("开发部")); 219 //插入数据 220 table.put(put); 221 //提交 222 table.flushCommits(); 223 } 224 225 @After 226 public void close() throws Exception { 227 table.close(); 228 connection.close(); 229 } 230 231 }