【问题标题】:Dynamic Merge statement (upsert) in Amazon RedshiftAmazon Redshift 中的动态合并语句 (upsert)
【发布时间】:2019-12-11 08:03:48
【问题描述】:

我想与您分享我为 Amazon Redshift 制作的东西,它是 SCD(缓慢变化的维度)Type 2.987654321@的动态合并语句

此语句假设预先存在:

  • 数据库中有两个架构 - dbimportsrepo。 dbimports 模式用作暂存区,repo 将成为 SCD 类型 2 维度和事实的目标。
  • repo 和 dbimports 架构中的表具有相同的名称以及来自源数据的相同列名称。
  • repo 架构中的表将具有预设的主键。
  • repo 架构中的表将具有三个附加列 - scd_keyis_activeinserted_date。 scd_key 是标识列,如果未强制执行主键约束,则可以省略。

请注意,我故意没有添加 end_date 列,因为我发现它在我的特定项目的情况下不可用。

这个语句的作用:

  • repo 架构中使用一个参数 (table_name varchar(256)) 创建一个存储过程。
  • 从 table_name 参数中动态获取要合并的表名,并对 repo 模式中 dbimports 表与其镜像表之间的匹配行的 is_active 标志列(设置为 0)执行 UPDATE 语句。
  • 动态定义 UPDATE 语句的连接条件。
  • 在 is_active 标志设置为 1 和当前日期 (GETDATE()) 的情况下,从 dbimports 到 repo 架构进行 INSERT * INTO。

我知道如果您在普通 SQL 环境中构建它,这不是一项艰巨的任务,但您可能知道,Redshift 的 SQL 是“高度修改的”。这意味着大多数普通 SQL 的功能都像变量和触发器一样被删除,这使得这项任务很难弄清楚(至少对我来说是这样)。

希望这对任何人都有帮助。祝你好运。

【问题讨论】:

    标签: sql amazon-redshift


    【解决方案1】:
        CREATE OR REPLACE PROCEDURE repo.inserter(table_name IN varchar(256))
    AS $$
    DECLARE 
        temp_table_name varchar := table_name || '_temp';
        one_liner varchar := temp_table_name || '_one_liner';
        query varchar;
        query_builder varchar;
        rec record;
    BEGIN
    EXECUTE 'DROP TABLE IF EXISTS ' || temp_table_name;
    EXECUTE 'DROP TABLE IF EXISTS ' || one_liner;
    EXECUTE 'CREATE TEMP TABLE ' || temp_table_name || ' (column_name varchar(256),ordinal_position int,is_nullable varchar(3));';
    EXECUTE 'INSERT INTO ' || temp_table_name || '(column_name,ordinal_position,is_nullable) SELECT column_name,ordinal_position,is_nullable FROM svv_columns WHERE table_schema = ''repo'' AND table_name = ''' || table_name || ''';';
    EXECUTE 'CREATE TEMP TABLE ' || one_liner || ' (columns_string varchar(1024));';
    EXECUTE 'INSERT INTO ' || one_liner || '  SELECT LISTAGG(column_name,'','') WITHIN GROUP (order by ordinal_position) FROM ' || temp_table_name || ' where column_name not in ( ''scd_key'',''inserted_date'',''is_active'');';
    query := 'SELECT column_name FROM ' || temp_table_name || ' WHERE is_nullable = ''NO'' and column_name not in(''scd_key'',''inserted_date'') order by ordinal_position;';
    query_builder := 'UPDATE repo.' || table_name || ' SET is_active = 0 FROM dbimports.' || table_name || ' b WHERE ';
    FOR rec IN EXECUTE query LOOP
    query_builder := query_builder || 'repo.' || table_name || '.' || rec.column_name || ' = b.' || rec.column_name || ' AND ';
    END LOOP;
    query_builder := RTRIM(query_builder,'AND ') || ';';
    EXECUTE query_builder;
    query := 'SELECT columns_string FROM ' || one_liner || ';';
    FOR rec IN EXECUTE query LOOP
    query_builder := 'INSERT INTO repo.' || table_name || '(' ||  rec.columns_string || ',is_active,inserted_date) (SELECT ' || rec.columns_string || ',1 as is_active,GETDATE() as inserted_date FROM dbimports.' || table_name || ');';
    END LOOP;
    EXECUTE query_builder;
    
    END;
    $$  LANGUAGE plpgsql; 
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-12-19
      • 1970-01-01
      • 2010-10-16
      • 1970-01-01
      • 1970-01-01
      • 2021-03-13
      • 2018-09-08
      • 1970-01-01
      相关资源
      最近更新 更多