lookup join mysql demo:  flink lookup join mysql demo

## join rowkey

-- Lookup Source
-- kafka source
CREATE TABLE user_log (
user_id STRING
,item_id STRING
,category_id STRING
,behavior STRING
,ts TIMESTAMP(3)
,process_time as proctime()
, WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka'
,'topic' = 'user_behavior'
,'properties.bootstrap.servers' = 'localhost:9092'
,'properties.group.id' = 'user_log'
,'scan.startup.mode' = 'group-offsets'
,'format' = 'json'
);

drop table if exists hbase_behavior_conf ;
CREATE TEMPORARY TABLE hbase_behavior_conf (
rowkey STRING
,cf ROW(item_id STRING
,category_id STRING
,behavior STRING
,ts TIMESTAMP(3))
) WITH (
'connector' = 'hbase-2.2'
,'zookeeper.quorum' = 'thinkpad:12181'
,'table-name' = 'user_log'
,'lookup.cache.max-rows' = '10000'
,'lookup.cache.ttl' = '10 second'
,'lookup.async' = 'true'
);

---sinkTable
CREATE TABLE kakfa_join_mysql_demo (
user_id STRING
,item_id STRING
,category_id STRING
,behavior STRING
,behavior_map STRING
,ts TIMESTAMP(3)
-- ,primary key (user_id) not enforced
) WITH (
'connector' = 'kafka'
,'topic' = 'user_behavior_1'
,'properties.bootstrap.servers' = 'localhost:9092'
,'properties.group.id' = 'user_log'
,'scan.startup.mode' = 'group-offsets'
,'format' = 'json'
);

INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts)
SELECT a.user_id, a.item_id, a.category_id, a.behavior, concat('map_', c.cf.item_id), a.ts
FROM user_log a
left join hbase_behavior_conf FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.user_id = rowkey
where a.behavior is not null;

 

测试 hbase 维表Lookup 功能正常,可以正常缓存数据,缓存也会定时失效,透查Hbase

 

flink sql join hbase demo
    




flink lookup join mysql demo


* 注: 随便测试了一下性能,Hbase 维表有2 万多条数据,输入数据的关联字段都是Hbase 表主键,lookup.cache.ttl 为 1分钟,关联的 TPS 轻松到达: 2W

flink sql join hbase demo
    




flink lookup join mysql demo

## join 非 rowkey

 

join 非主键时,hbase 维表启动时一次性读取 hbase 表全部数据,缓存到内存中,hbase source 状态 finish

INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts)
SELECT a.user_id, a.item_id, a.category_id, a.behavior, concat('map_', c.cf.item_id), a.ts
FROM user_log a
left join hbase_behavior_conf c ON a.item_id = cf.item_id
where a.behavior is not null;

 

 

flink sql join hbase demo
    




flink lookup join mysql demo


* 注: 这种情况在 flink 1.13 版本,不能完成 checkpoint

2021-08-20 09:12:39,313 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: TableSourceScan(table=[[default_catalog, default_database, hbase_behavior_conf, project=[cf]]], fields=[cf]) -> Calc(select=[cf, cf.item_id AS $f2]) (1/1) (55be6048b81e2eef95f25d78b3705a34) switched from RUNNING to FINISHED.
2021-08-20 09:13:30,356 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Failed to trigger checkpoint for job 971bbb75c7334b0c2b9d218005efbf00 since some tasks of job 971bbb75c7334b0c2b9d218005efbf00 has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.

 

Hbase 关联只有在关联键是Hbase 表的主键的时候,才能应用 Lookup 功能,非主键一次性加载,维表数据没办法更新,而且不能做 Checkpoint 影响 Flink job 的一致性。

在之前版本,SQL 功能不完善的时候,我们使用 UDF 的方式关联 Hbase,可以在 UDF 里面自己关联缓存、透查Hbase,也比较灵活。(没找到之前的代码,空了自己写一个,可以再水一篇博客)

 

欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

flink sql join hbase demo
    




flink lookup join mysql demo

 

相关文章:

  • 2022-01-28
  • 2022-12-23
  • 2021-10-31
  • 2021-05-22
  • 2021-05-21
  • 2022-12-23
  • 2022-12-23
  • 2021-07-30
猜你喜欢
  • 2022-02-19
  • 2021-11-23
  • 2021-07-31
  • 2021-06-28
  • 2021-07-23
  • 2022-12-23
  • 2021-08-31
相关资源
相似解决方案