在KUDU之前,大数据主要以两种方式存储;
(1)静态数据:
以 HDFS 引擎作为存储引擎,适用于高吞吐量的离线大数据分析场景。这类存储的局限性是数据无法进行随机的读写。
(2)动态数据:
以 HBase、Cassandra 作为存储引擎,适用于大数据随机读写场景。这类存储的局限性是批量读取吞吐量远不如 HDFS,不适用于批量数据分析的场景。
从上面分析可知,这两种数据在存储方式上完全不同,进而导致使用场景完全不同,但在真实的场景中,边界可能没有那么清晰,面对既需要随机读写,又需要批量分析的大数据场景,该如何选择呢?这个场景中,单种存储引擎无法满足业务需求,我们需要通过多种大数据工具组合来满足这一需求。
数据实时写入 HBase,实时的数据更新也在 HBase 完成,为了应对 OLAP 需求,我们定时(通常是 T+1 或者 T+H)将 HBase 数据写成静态的文件(如:Parquet)导入到 OLAP 引擎(如:HDFS)。这一架构能满足既需要随机读写,又可以支持 OLAP 分析的场景。但是缺点也比较明显:
(1)架构复杂。从架构上看,数据在HBase、消息队列、HDFS 间流转,涉及环节太多,运维成本很高。并且每个环节需要保证高可用,都需要维护多个副本,存储空间也有一定的浪费。最后数据在多个系统上,对数据安全策略、监控等都提出了挑战。
(2)时效性低。数据从HBase导出成静态文件是周期性的,一般这个周期是一天(或一小时),在时效性上不是很高。
(3)难以应对后续的更新。真实场景中,总会有数据是延迟到达的。如果这些数据之前已经从HBase导出到HDFS,新到的变更数据就难以处理了,一个方案是把原有数据应用上新的变更后重写一遍,但这代价又很高。
为了解决上述架构的这些问题,KUDU应运而生。KUDU的定位是Fast Analytics on Fast Data,是一个既支持随机读写、又支持 OLAP 分析的大数据存储引擎。
二、kudu基础
2.1 使用场景
适用于那些既有随机访问,也有批量数据扫描的复合场景、高计算量的场景、使用了高性能的存储设备,包括使用更多的内存、支持数据更新,避免数据反复迁移、支持跨地域的实时数据备份和查询。
2.2 kudu架构
与HDFS和HBase相似,Kudu使用单个的Master节点,用来管理集群的元数据,并且使用任意数量的Tablet Server(可对比理解HBase中的RegionServer角色)节点用来存储实际数据。可以部署多个Master节点来提高容错性。一个table表的数据,被分割成1个或多个Tablet,Tablet被部署在Tablet Server来提供数据读写服务。
一些基本概念:
Master:集群中的老大,负责集群管理、元数据管理等功能
Tablet Server: 集群中的小弟,负责数据存储,并提供数据读写服务。一个 tablet server 存储了table表的tablet 和为 tablet 向 client 提供服务。对于给定的 tablet,一个tablet server 充当 leader,其他 tablet server 充当该 tablet 的 follower 副本。只有 leader服务写请求,然而 leader 或 followers 为每个服务提供读请求 。一个 tablet server 可以服务多个 tablets ,并且一个 tablet 可以被多个 tablet servers 服务着。
Table(表):一张table是数据存储在Kudu的tablet server中。表具有 schema 和全局有序的primary key(主键)。table 被分成称为 tablets 的 segments。
Tablet:一个 tablet 是一张 table连续的segment,tablet是kudu表的水平分区,类似于google Bigtable的tablet,或者HBase的region。每个tablet存储着一定连续range的数据(key),且tablet两两间的range不会重叠。一张表的所有tablet包含了这张表的所有key空间。与其它数据存储引擎或关系型数据库中的 partition(分区)相似。给定的tablet 冗余到多个 tablet 服务器上,并且在任何给定的时间点,其中一个副本被认为是leader tablet。任何副本都可以对读取进行服务,并且写入时需要在为 tablet 服务的一组 tablet server之间达成一致性。
三、kudu分区
为了提供可扩展性,Kudu 表被划分为称为 tablets 的单元,并分布在许多 tablet servers 上。行总是属于单个tablet 。将行分配给 tablet 的方法由在表创建期间设置的表的分区决定。 kudu提供了3种分区方式。
3.1 Range Partitioning ( 范围分区 )
范围分区可以根据存入数据的数据量,均衡的存储到各个机器上,防止机器出现负载不均衡现象.
1 /** 2 3 * 测试分区: 4 5 * RangePartition 6 7 */ 8 9 @Test 10 11 public void testRangePartition() throws KuduException { 12 13 //设置表的schema 14 15 LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>(); 16 17 columnSchemas.add(newColumn("CompanyId", Type.INT32,true)); 18 19 columnSchemas.add(newColumn("WorkId", Type.INT32,false)); 20 21 columnSchemas.add(newColumn("Name", Type.STRING,false)); 22 23 columnSchemas.add(newColumn("Gender", Type.STRING,false)); 24 25 columnSchemas.add(newColumn("Photo", Type.STRING,false)); 26 27 28 29 //创建schema 30 31 Schema schema = new Schema(columnSchemas); 32 33 34 35 //创建表时提供的所有选项 36 37 CreateTableOptions tableOptions = new CreateTableOptions(); 38 39 //设置副本数 40 41 tableOptions.setNumReplicas(1); 42 43 //设置范围分区的规则 44 45 LinkedList<String> parcols = new LinkedList<String>(); 46 47 parcols.add("CompanyId"); 48 49 //设置按照那个字段进行range分区 50 51 tableOptions.setRangePartitionColumns(parcols); 52 53 54 55 /** 56 57 * range 58 59 * 0 < value < 10 60 61 * 10 <= value < 20 62 63 * 20 <= value < 30 64 65 * ........ 66 67 * 80 <= value < 90 68 69 * */ 70 71 int count=0; 72 73 for(int i =0;i<10;i++){ 74 75 //范围开始 76 77 PartialRow lower = schema.newPartialRow(); 78 79 lower.addInt("CompanyId",count); 80 81 82 83 //范围结束 84 85 PartialRow upper = schema.newPartialRow(); 86 87 count +=10; 88 89 upper.addInt("CompanyId",count); 90 91 92 93 //设置每一个分区的范围 94 95 tableOptions.addRangePartition(lower,upper); 96 97 } 98 99 100 101 try { 102 103 kuduClient.createTable("student",schema,tableOptions); 104 105 } catch (KuduException e) { 106 107 e.printStackTrace(); 108 109 } 110 111 kuduClient.close(); 112 113 }