这是基于以下假设的解决方案:
- 源 oracle 表本身不负责 SCD2
处理(因此 Eff/Exp TS 列不会出现在那个
表)。
- 有一个外部进程只提取/加载增量
(新的、更新的)记录到雪花中。
- 源 oracle 没有删除记录
首先创建表并添加第一组增量数据:
-- Staging table: receives each batch of delta rows extracted from the source.
-- It carries only the business key and attribute columns (no SCD2 metadata).
CREATE OR REPLACE TABLE stg.cdc2_oracle_d (
    -- three-part business key
    key1  VARCHAR(10),
    key2  VARCHAR(10),
    key3  VARCHAR(10),
    -- tracked attributes
    attr1 VARCHAR(8),
    attr2 VARCHAR(8)
);
-- Dimension table: same key/attribute columns as the staging table, plus the
-- SCD2 metadata columns (effective/expiry timestamps and an active flag).
CREATE OR REPLACE TABLE edw.cdc2_snowflake_d (
    -- three-part business key
    key1      VARCHAR(10),
    key2      VARCHAR(10),
    key3      VARCHAR(10),
    -- tracked attributes
    attr1     VARCHAR(8),
    attr2     VARCHAR(8),
    -- SCD2 metadata, timestamps kept at whole-second precision
    eff_ts    TIMESTAMP_LTZ(0),
    exp_ts    TIMESTAMP_LTZ(0),
    active_fl CHAR(1)
);
-- First delta batch: three keys, none of which exist in the dimension yet.
INSERT INTO stg.cdc2_oracle_d
    (key1, key2, key3, attr1, attr2)
VALUES
    ('PT_1', 'DL_1', 'RPT_1', 'Addr1a', 'APT_1.0'),
    ('PT_2', 'DL_2', 'RPT_2', 'Addr2a', 'APT_2.0'),
    ('PT_3', 'DL_3', 'RPT_3', 'Addr3a', 'APT_3.0');
然后运行以下转换脚本:
-- SCD Type 2 load: applies the delta rows in stg.cdc2_oracle_d to
-- edw.cdc2_snowflake_d. The four steps are wrapped in a single BEGIN/COMMIT so
-- the intermediate 'T' (temporary) flag is only used inside the transaction.
BEGIN;

-- 1: insert brand-new records, i.e. stg rows whose key has no active ('Y')
--    row in the edw table yet.
INSERT INTO edw.cdc2_snowflake_d
    (key1, key2, key3, attr1, attr2, eff_ts, exp_ts, active_fl)
SELECT
    stg.key1,
    stg.key2,
    stg.key3,
    stg.attr1,
    stg.attr2,
    CURRENT_TIMESTAMP(0) AS eff_ts,
    CAST('9999-12-31 23:59:59' AS TIMESTAMP) AS exp_ts,  -- open-ended expiry
    'Y' AS active_fl
FROM stg.cdc2_oracle_d stg
WHERE NOT EXISTS (
    SELECT 1
    FROM edw.cdc2_snowflake_d edw
    WHERE edw.key1 = stg.key1
      AND edw.key2 = stg.key2
      AND edw.key3 = stg.key3
      AND edw.active_fl = 'Y'
);

-- 2: insert a new version for keys that already have an active row in edw,
--    but only when at least one attr column differs; otherwise it is the same
--    record. IS DISTINCT FROM is used instead of <> so that a change to or
--    from NULL is also detected (<> yields NULL when either side is NULL).
INSERT INTO edw.cdc2_snowflake_d
    (key1, key2, key3, attr1, attr2, eff_ts, exp_ts, active_fl)
SELECT
    stg.key1,
    stg.key2,
    stg.key3,
    stg.attr1,
    stg.attr2,
    CURRENT_TIMESTAMP(0) AS eff_ts,
    CAST('9999-12-31 23:59:59' AS TIMESTAMP) AS exp_ts,  -- open-ended expiry
    'T' AS active_fl  -- temporary flag; promoted to 'Y' in step 4
FROM stg.cdc2_oracle_d stg
JOIN edw.cdc2_snowflake_d edw
    ON edw.key1 = stg.key1
   AND edw.key2 = stg.key2
   AND edw.key3 = stg.key3
   AND edw.active_fl = 'Y'
WHERE stg.attr1 IS DISTINCT FROM edw.attr1
   OR stg.attr2 IS DISTINCT FROM edw.attr2;

-- 3: deactivate the current record wherever step 2 added a new version, and
--    set its exp_ts to 1 second before the new record's eff_ts so the two
--    validity ranges do not overlap.
UPDATE edw.cdc2_snowflake_d old
SET old.active_fl = 'N',
    old.exp_ts = DATEADD(SECOND, -1, new.eff_ts)
FROM edw.cdc2_snowflake_d new
WHERE old.key1 = new.key1
  AND old.key2 = new.key2
  AND old.key3 = new.key3
  AND new.active_fl = 'T'
  AND old.active_fl = 'Y';

-- 4: finally promote all temporary records to active. The table is
--    schema-qualified like every other statement in this script so the update
--    does not depend on the session's current schema.
UPDATE edw.cdc2_snowflake_d tmp
SET tmp.active_fl = 'Y'
WHERE tmp.active_fl = 'T';

COMMIT;
查看结果,然后截断并添加新数据并再次运行脚本:
-- Show the delta rows currently staged.
SELECT
    key1,
    key2,
    key3,
    attr1,
    attr2
FROM stg.cdc2_oracle_d;

-- Show the dimension rows, sorted by business key and then by attr2.
SELECT
    key1,
    key2,
    key3,
    attr1,
    attr2,
    eff_ts,
    exp_ts,
    active_fl
FROM edw.cdc2_snowflake_d
ORDER BY
    key1,
    key2,
    key3,
    attr2;
-- Empty the staging table, then load the second delta batch.
TRUNCATE TABLE stg.cdc2_oracle_d;

INSERT INTO stg.cdc2_oracle_d
    (key1, key2, key3, attr1, attr2)
VALUES
    ('PT_1', 'DL_1', 'RPT_1', 'Addr1a', 'APT_1.1'),  -- existing key, attr2 updated
    ('PT_2', 'DL_2', 'RPT_2', 'Addr2a', 'APT_2.0'),  -- existing key, no changes
    ('PT_4', 'DL_4', 'RPT_4', 'Addr4a', 'APT_4.0');  -- new key
您会看到 PT_1 有 2 条带有非重叠时间戳的记录,只有 1 条处于活动状态。