【问题标题】:Perform SCD2 on snowflake table based upon oracle input data根据 oracle 输入数据对雪花表执行 SCD2
【发布时间】:2019-12-06 22:56:18
【问题描述】:

目前正在从 oracle 获取数据

作为初始负载的一部分,使用命名阶段将所有历史数据从 oracle 表 oracle_a 摄取到雪花表“snow_a”并复制到命令中。

我想基于 oracle_a 表对 snow_a 表执行 SCD2。

我的意思是,如果有任何新记录添加到 Oracle_a 表中,那么该记录将被插入以及对 oracle_a 表的现有记录的任何更改,

snow_a 表的现有记录要过期并插入记录。更多详情请参考下图。

oracle_a 表具有键列 key_col1、key_col2、key_col3,如下图所示。 attr1 和 attr2 是表的其他属性enter image description here

【问题讨论】:

  • @RaoSK 很抱歉社区已经关闭,否则我不会问,我昨天添加的答案,您是否能够共享 oracle_table 的架构,或者雪花_a 和 oracle_a_history 之间的合并可能是合并了吗?
  • 这是原始链接,网站已备份:community.snowflake.com/s/question/0D50Z00009ffO0QSAU/…

标签: snowflake-cloud-data-platform


【解决方案1】:

在 Snowflake 中的表上实现 SCD 类型 2 功能与在任何其他关系数据库中没有什么不同。但是,还有其他功能可以帮助完成此过程。请查看有关使用雪花流和任务执行 SCD 逻辑的博客文章系列。 https://www.snowflake.com/blog/building-a-type-2-slowly-changing-dimension-in-snowflake-using-streams-and-tasks-part-1/

干杯, 迈克尔·雷尼

【讨论】:

    【解决方案2】:

    好的,这就是我发现的——尽管您可能需要调整更新和插入的来源——因为 oracle_a 不在 Snowflake 中。

    CREATE TABLE snowflake_a(key_col1 varchar(10), key_col2 varchar(10), key_col3 varchar(10), attr1 varchar(8), attr2 varchar(10), eff_ts TIMESTAMP, exp_ts TIMESTAMP, valid varchar(10)); 
    
    DROP table oracle_a;
    INSERT INTO snowflake_a VALUES('PT_1', 'DL_1', 'RPT_1', 'Address1', 'APT_1', current_date, current_date, 'Active');
    
    
    CREATE TABLE oracle_a(key_col1 varchar(10), key_col2 varchar(10), key_col3 varchar(10), attr1 varchar(8), attr2 varchar(8), eff_ts TIMESTAMP, exp_ts TIMESTAMP); 
    
    INSERT INTO oracle_a
    VALUES( 'PT_1', 'DL_1', 'RPT_1', 'Address1', 'APT_1', '10/24/2019', '12/31/1999');
    
    
    UPDATE snowflake_a
       SET valid = 'Expired'
    WHERE valid LIKE '%Active%';
    
    SELECT * FROM snowflake_a;
    
    INSERT INTO snowflake_a VALUES( 'PT_1', 'DL_1', 'RPT_1', 'Address1', 'APT_1', '10/24/2019', '12/31/1999', 'Active');
    
    SELECT * FROM snowflake_a;
    

    或者更好的是,我们使用什么来从您的 Oracle 生态系统连接到 Snowflake 生态系统?

    【讨论】:

    • 其实这行不通,因为它没有使用scd2的概念。
    • 谢谢西蒙!
    【解决方案3】:

    从问题来看,传入的 Oracle 行似乎不包含任何 SCD2 类型的列,并且当插入雪花的每一行都将使用 SCD2 类型功能插入时。

    SCD2 列可以对业务具有特定含义,例如“exp_ts”可以是实际日期或业务日期。 Snowflake 'Stage' 不包括 SCD2 功能。这通常是 ETL 框架的角色,而不是“快速/批量”加载实用程序的角色。

    大多数 ETL 供应商都将 SCD2 功能作为其产品的一部分。

    【讨论】:

      【解决方案4】:

      我按照以下步骤执行 SCD2。

      1. 已将 Oracle_a 表数据加载到 TEMPORARY scd2_temp 表中
      2. 对 snow_a 执行更新以通过加入使“更改的记录”到期 键列并检查其余属性
      3. 从 TEMPORARY scd2_temp 插入到 snow_a 表到 snow_a 表

      【讨论】:

        【解决方案5】:

        这是基于以下假设的解决方案:

        1. 源 oracle 表本身不负责 SCD2 处理(因此 Eff/Exp TS 列不会出现在那个 表)。
        2. 有一个外部进程只提取/加载增量 (新的、更新的)记录到雪花中。
        3. 源 oracle 没有删除记录

        首先创建表并添加第一组增量数据:

        CREATE OR REPLACE TABLE stg.cdc2_oracle_d (
          key1 varchar(10),
          key2 varchar(10),
          key3 varchar(10),
          attr1 varchar(8),
          attr2 varchar(8));
        
        CREATE OR REPLACE TABLE edw.cdc2_snowflake_d (
          key1 varchar(10),
          key2 varchar(10),
          key3 varchar(10),
          attr1 varchar(8),
          attr2 varchar(8),
          eff_ts TIMESTAMP_LTZ(0),
          exp_ts TIMESTAMP_LTZ(0),
          active_fl char(1));
        
        INSERT INTO stg.cdc2_oracle_d VALUES
         ( 'PT_1', 'DL_1', 'RPT_1', 'Addr1a', 'APT_1.0'),
         ( 'PT_2', 'DL_2', 'RPT_2', 'Addr2a', 'APT_2.0'),
         ( 'PT_3', 'DL_3', 'RPT_3', 'Addr3a', 'APT_3.0');
        

        然后运行以下转换脚本:

        BEGIN;
        -- 1: insert new-new records from stg table that don't current exist in the edw table
        INSERT INTO edw.cdc2_snowflake_d
        SELECT
          key1,
          key2,
          key3,
          attr1,
          attr2,
          CURRENT_TIMESTAMP(0) AS eff_ts,
          CAST('9999-12-31 23:59:59' AS TIMESTAMP) AS end_ts,
          'Y' AS active_fl
        FROM stg.cdc2_oracle_d stg
        WHERE NOT EXISTS (
          SELECT 1
            FROM edw.cdc2_snowflake_d edw
           WHERE edw.key1 = stg.key1
             AND edw.key2 = stg.key2
             AND edw.key3 = stg.key3
             AND edw.active_fl = 'Y');
        
        -- 2: insert new version of record from stg table where key current does exist in edw table 
              -- but only add if the attr columns are different, otherwise it's the same record
        INSERT INTO edw.cdc2_snowflake_d
        SELECT
          stg.key1,
          stg.key2,
          stg.key3,
          stg.attr1,
          stg.attr2,
          CURRENT_TIMESTAMP(0) AS eff_ts,
          CAST('9999-12-31 23:59:59' AS TIMESTAMP) AS end_ts,
          'T' AS active_fl  -- set flat to Temporary setting
        FROM stg.cdc2_oracle_d stg
        JOIN edw.cdc2_snowflake_d edw ON edw.key1 = stg.key1 AND edw.key2 = stg.key2 
                                     AND edw.key3 = stg.key3 AND edw.active_fl = 'Y'
        WHERE (stg.attr1 <> edw.attr1
           OR  stg.attr2 <> edw.attr2);
        
        -- 3: deactive the current record where there is a new record from above step
              -- and set the end_ts to 1 second prior to new record so there is no overlap in data
        UPDATE edw.cdc2_snowflake_d old
           SET old.active_fl = 'N',
               old.exp_ts = DATEADD(SECOND, -1, new.eff_ts)
          FROM edw.cdc2_snowflake_d new
         WHERE old.key1 = new.key1
           AND old.key2 = new.key2
           AND old.key3 = new.key3
           AND new.active_fl = 'T'
           AND old.active_fl = 'Y';
        
        -- 4: finally set all the temporary records to active
        UPDATE cdc2_snowflake_d tmp
           SET tmp.active_fl = 'Y'
         WHERE tmp.active_fl = 'T';
        
        COMMIT;
        

        查看结果,然后截断并添加新数据并再次运行脚本:

        SELECT * FROM stg.cdc2_oracle_d;
        SELECT * FROM edw.cdc2_snowflake_d ORDER BY 1,2,3,5;
        
        TRUNCATE TABLE stg.cdc2_oracle_d; 
        
        INSERT INTO stg.cdc2_oracle_d VALUES
         ( 'PT_1', 'DL_1', 'RPT_1', 'Addr1a', 'APT_1.1'),  -- record has updated attr2
         ( 'PT_2', 'DL_2', 'RPT_2', 'Addr2a', 'APT_2.0'),  -- record has no changes
         ( 'PT_4', 'DL_4', 'RPT_4', 'Addr4a', 'APT_4.0');  -- new record
        

        您会看到 PT_1 有 2 条带有非重叠时间戳的记录,只有 1 条处于活动状态。

        【讨论】:

          猜你喜欢
          • 2020-09-29
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2019-03-27
          • 1970-01-01
          • 1970-01-01
          • 2023-02-11
          • 1970-01-01
          相关资源
          最近更新 更多