Hbase 也是我们很常用的数据存储组件,所以提前尝试下用SQL 写Hbase,中间也遇到一些坑,跟大家分享一下。
官网地址:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#hbase-connector
--------------------------20200508--------------
新增: Flink 1.10 SQL 写 Hbase 数据无法写入hbase问题
---------------------------------------------------
HBase Connector 支持这些操作: Source: Batch Sink: Batch Sink: Streaming Append Mode Sink: Streaming Upsert Mode Temporal Join: Sync Mode
(选择性忽略Batch 的操作了,上次跟一个朋友说,HBase connector 只支持 sink 操作。)
HBase connector 可以在upsert模式下运行,以使用查询定义的密钥与外部系统交换UPSERT / DELETE消息。
对于 append-only 查询,connector 还可以在 append 模式下操作,仅与外部系统交换INSERT消息。
官网示例如下:
CREATE TABLE MyUserTable ( hbase_rowkey_name rowkey_type, hbase_column_family_name1 ROW<...>, hbase_column_family_name2 ROW<...> ) WITH ( \'connector.type\' = \'hbase\', -- required: specify this table type is hbase \'connector.version\' = \'1.4.3\', -- required: valid connector versions are "1.4.3" \'connector.table-name\' = \'hbase_table_name\', -- required: hbase table name \'connector.zookeeper.quorum\' = \'localhost:2181\', -- required: HBase Zookeeper quorum configuration \'connector.zookeeper.znode.parent\' = \'/test\', -- optional: the root dir in Zookeeper for HBase cluster. -- The default value is "/hbase". \'connector.write.buffer-flush.max-size\' = \'10mb\', -- optional: writing option, determines how many size in memory of buffered -- rows to insert per round trip. This can help performance on writing to JDBC -- database. The default value is "2mb". \'connector.write.buffer-flush.max-rows\' = \'1000\', -- optional: writing option, determines how many rows to insert per round trip. -- This can help performance on writing to JDBC database. No default value, -- i.e. the default flushing is not depends on the number of buffered rows. \'connector.write.buffer-flush.interval\' = \'2s\', -- optional: writing option, sets a flush interval flushing buffered requesting -- if the interval passes, in milliseconds. Default value is "0s", which means -- no asynchronous flush thread will be scheduled. )
Columns: HBase表中的 column families 必须声明为ROW类型,字段名称映射到column families 名称,而嵌套的字段名称映射到 column qualifier 名称。 无需在架构中声明所有 families 和 qualifiers ,用户可以声明必要的内容。 除ROW type字段外,原子类型的唯一 一个字段(例如STRING,BIGINT)将被识别为表的 rowkey。 row key 字段的名称没有任何限制。
Temporary join: 针对HBase的 Lookup join 不使用任何缓存; 始终总是通过HBase客户端直接查询数据。
之前一直看英文,上面的描述看得似是而非的,没能理解到,Flink 中 建HBase 表的 DDL 的规则,简单列下:
1、Flink HBase 表只能有一个原子类型的字段,就是 rowkey(习惯是放在第一个字段,名字随意)
2、Flink HBase 表的其他字段都是ROW 类型的,并且字段名与 HBase 表中 的 column family 名一样(如果只有一个列族,除了rowkey 就只有一个字段)
3、ROW 类型的字段嵌套的字段名称就是该列族下的列名
下面看个实例:
首先需要添加flink-hbase connector 对应的依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hbase_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
选择对应版本,Flink 版本是 1.10
必须要说下,现在 HBase 版本只支持 1.4.3,我的HBase 是2.1.4 的,懒得换了,直接修改Flink 代码,绕过版本验证(可以正常写数,没有经过版本匹配和严格的测试,可能会有未知的问题)
sql 文件如下:
-- 读 kafka write to json --sourceTable CREATE TABLE user_log( user_id VARCHAR, item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3) ) WITH ( \'connector.type\' = \'kafka\', \'connector.version\' = \'universal\', \'connector.topic\' = \'user_behavior\', \'connector.properties.zookeeper.connect\' = \'venn:2181\', \'connector.properties.bootstrap.servers\' = \'venn:9092\', \'connector.startup-mode\' = \'earliest-offset\', \'format.type\' = \'json\' ); --sinkTable CREATE TABLE user_log_sink ( rowkey string, cf ROW(item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3)) ) WITH ( \'connector.type\' = \'hbase\', -- 目前只支持 1.4.3 , HBaseValidator.CONNECTOR_VERSION_VALUE_143 写死了 1.4.3, 改成 2.1.4 可以正常写数到 hbase -- 生产慎用 \'connector.version\' = \'2.1.4\', -- hbase vesion \'connector.table-name\' = \'venn\', -- hbase table name \'connector.zookeeper.quorum\' = \'venn:2181\', -- zookeeper quorum \'connector.zookeeper.znode.parent\' = \'/hbase\', -- hbase znode in zookeeper \'connector.write.buffer-flush.max-size\' = \'10mb\', -- max flush size \'connector.write.buffer-flush.max-rows\' = \'1000\', -- max flush rows \'connector.write.buffer-flush.interval\' = \'2s\' -- max flush interval ); --insert INSERT INTO user_log_sink SELECT user_id, ROW(item_id, category_id, behavior, ts ) as cf FROM user_log;
简单描述下 sink 表:
有个string类型的rowkey, 还有一个 列 cf (HBase 表的列族),cf 下面有 item_id/category_id/behavior/ts 4个列
执行的sql 就很简单了,从Kafka 的 source 表读数据,写到 sink 表。
查看写入到 HBase 中的数据:
hbase(main):001:0> count \'venn\' Current count: 1000, row: 561558 1893 row(s) Took 1.8662 seconds => 1893 hbase(main):002:0> scan \'venn\',{LIMIT=>1} ROW COLUMN+CELL 1000034 column=cf:behavior, timestamp=1584615437261, value=pv 1000034 column=cf:category_id, timestamp=1584615437261, value=982926 1000034 column=cf:item_id, timestamp=1584615437261, value=800784 1000034 column=cf:ts, timestamp=1584615437261, value=\x00\x00\x01_\xF4\x1F`\xD0 1 row(s) Took 0.0834 seconds
------------20200427 改-----------
根本不用改源码,直接将 sql properties 写成 \'1.4.3\' 就可以了,执行的时候,不会去校验hbase 的版本
----------------------------------------
搞定
欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文