[Question title]: Is it possible in Snowflake to automate a merge?
[Posted]: 2020-10-08 15:38:19
[Question]:

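Currently I have a script that merges my source table into my target table, handling both updates and inserts. Both tables are updated daily by tasks created in Snowflake. I would like to run this merge daily as well. Is it possible to automate the merge with a task, or with something else in Snowflake?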

Thanks

[Discussion]:

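  • I'm not sure of your exact use case, but you may want to look at combining streams and tasks. A stream effectively acts as a bookmark in time on the source table, so your task can process new records incrementally with a MERGE (or with a stored procedure that contains the MERGE).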
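As a rough sketch of the stream-plus-task idea, the JavaScript helper below only assembles the Snowflake statements as text, so they can be inspected before running them. All object names (SRC_TABLE, SRC_TABLE_STREAM, TGT_TABLE, MERGE_SRC_TASK, MY_WH) are hypothetical placeholders, and the 60-minute interval is an arbitrary assumption. The MERGE keeps only stream rows whose METADATA$ACTION is 'INSERT', because a standard stream records an update as a DELETE/INSERT pair and both rows would otherwise match the same target row.

```javascript
// Hypothetical helper: returns the DDL for a stream on the source table and a
// task that runs a MERGE only when the stream actually contains changes.
function streamTaskDdl({ sourceTable, streamName, taskName, warehouse, mergeSql }) {
  return [
    `CREATE OR REPLACE STREAM ${streamName} ON TABLE ${sourceTable};`,
    `CREATE OR REPLACE TASK ${taskName}`,
    `  WAREHOUSE = ${warehouse}`,
    `  SCHEDULE = '60 MINUTE'`,
    // The WHEN condition is checked without a warehouse; the run is skipped if the stream is empty.
    `  WHEN SYSTEM$STREAM_HAS_DATA('${streamName}')`,
    `AS`,
    `  ${mergeSql};`,
    // Tasks are created suspended and must be resumed before they run.
    `ALTER TASK ${taskName} RESUME;`,
  ].join("\n");
}

// Example MERGE that consumes the stream (a committed DML that reads a stream advances its offset).
const mergeSql =
  "MERGE INTO TGT_TABLE tgt " +
  "USING (SELECT * FROM SRC_TABLE_STREAM WHERE METADATA$ACTION = 'INSERT') src " +
  "ON tgt.ID = src.ID " +
  "WHEN MATCHED THEN UPDATE SET tgt.VAL = src.VAL " +
  "WHEN NOT MATCHED THEN INSERT (ID, VAL) VALUES (src.ID, src.VAL)";

console.log(streamTaskDdl({
  sourceTable: "SRC_TABLE",
  streamName: "SRC_TABLE_STREAM",
  taskName: "MERGE_SRC_TASK",
  warehouse: "MY_WH",
  mergeSql,
}));
```

Gating the task on SYSTEM$STREAM_HAS_DATA means the frequent schedule costs little when nothing has changed.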

Tags: merge snowflake-cloud-data-platform snowflake-task


[Answer 1]:

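If your script contains only SQL commands (or commands that can be written in JavaScript), you can create a stored procedure that runs them, and then create a task that calls this procedure every day.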

https://docs.snowflake.com/en/sql-reference/stored-procedures-usage.html

https://docs.snowflake.com/en/user-guide/tasks-intro.html
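A minimal sketch of the task step, written as a JavaScript helper that only assembles the SQL text. The task name, warehouse, procedure name and the 02:00 UTC cron schedule are hypothetical. Since both tables in the question are already refreshed by their own tasks, the helper can alternatively make the merge task a child of a (hypothetically named) load task via AFTER, so it runs when the load finishes rather than at a fixed time. A child task takes no SCHEDULE of its own, and the Snowflake documentation asks for the root task to be suspended while child tasks are created or changed.

```javascript
// Hypothetical helper: DDL for a task that calls a stored procedure either on a
// cron schedule or right after another task (afterTask) completes.
function procTaskDdl({ taskName, warehouse, procCall, cron = "0 2 * * *", tz = "UTC", afterTask = null }) {
  // A child task is triggered by its predecessor, so it gets AFTER instead of SCHEDULE.
  const trigger = afterTask
    ? `  AFTER ${afterTask}`
    : `  SCHEDULE = 'USING CRON ${cron} ${tz}'`;
  return [
    `CREATE OR REPLACE TASK ${taskName}`,
    `  WAREHOUSE = ${warehouse}`,
    trigger,
    `AS`,
    `  ${procCall};`,
    // New tasks start suspended.
    `ALTER TASK ${taskName} RESUME;`,
  ].join("\n");
}

// Daily at 02:00 UTC:
console.log(procTaskDdl({ taskName: "DAILY_MERGE_TASK", warehouse: "MY_WH", procCall: "CALL MY_MERGE_PROC()" }));
// Or chained after the task that loads the source table:
console.log(procTaskDdl({
  taskName: "DAILY_MERGE_TASK", warehouse: "MY_WH",
  procCall: "CALL MY_MERGE_PROC()", afterTask: "LOAD_SOURCE_TASK",
}));
```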

[Discussion]:

[Answer 2]:
    -- Prerequisites for running the auto-merge procedure given further below ---
        -- 1. Create the log table:
        -- EDWH_DEV.WS_EA_DNATA_DEV.GEN_LOG definition
        create or replace TABLE GEN_LOG (
            LOG_ID NUMBER(38,0) autoincrement,
            "number of rows inserted" NUMBER(38,0),
            "number of rows updated" NUMBER(38,0),
            PROC_NAME VARCHAR(100),
            FINISHED TIMESTAMP_NTZ(9),
            USER_NAME VARCHAR(100),
            USER_ROLE VARCHAR(100),
            STATUS VARCHAR(50),
            MESSAGE VARCHAR(2000)
        );

        -- 2. Data is loaded into an existing table whose column count must match the source file's column count.
        -- Example:
        -- EDWH_DEV.WS_EA_DNATA_DEV.AIRLINES definition
        -- Note: Snowflake does not enforce UNIQUE constraints; here they only serve as
        -- metadata that tells the procedure which columns to use in the MERGE ON clause.
        create or replace TABLE AIRLINES (
            CONSOLIDATED_AIRLINE_CODE VARCHAR(80),
            POSSIBLE_CUSTOMER_NAME VARCHAR(100),
            CUSTOMER_TYPE VARCHAR(70),
            CONSOLIDATED_AIRLINE_NAME VARCHAR(90),
            constraint CONSOLIDATED_AIRLINE_CODE unique (CONSOLIDATED_AIRLINE_CODE),
            constraint CUSTOMER_TYPE unique (CUSTOMER_TYPE)
        );

        -- 3. The staged file AIRLINES.csv has the same number of columns in the same order.
        -- Its headers need not match, because the columns are aliased by position to the
        -- table's column names above. The procedure appends a lowercase ".csv" to the
        -- table name and stage paths are case-sensitive, so name the file accordingly.

        -- 4. Make sure the required file format is set up, or use the default ones (refer to the
        -- Snowflake documentation). Note that the procedure references a named file format called CSV.
        --ALTER FILE FORMAT "EDWH_DEV"."WS_EA_DNATA_DEV".CSV SET COMPRESSION = 'AUTO' FIELD_DELIMITER = ',' RECORD_DELIMITER = '\n' SKIP_HEADER = 1 FIELD_OPTIONALLY_ENCLOSED_BY = '\042' TRIM_SPACE = FALSE ERROR_ON_COLUMN_COUNT_MISMATCH = TRUE ESCAPE = 'NONE' ESCAPE_UNENCLOSED_FIELD = '\134' DATE_FORMAT = 'AUTO' TIMESTAMP_FORMAT = 'AUTO' NULL_IF = ('\\N');

        -- 5. Tables must have constraints, which the procedure uses for the ON clause of the
        -- MERGE statement. Each constraint name must match its column name.
        -- (Only needed for an existing table: AIRLINES above already declares both constraints inline.)
        ALTER TABLE AIRLINES ADD CONSTRAINT CONSOLIDATED_AIRLINE_CODE UNIQUE (CONSOLIDATED_AIRLINE_CODE);
        ALTER TABLE AIRLINES ADD CONSTRAINT CUSTOMER_TYPE UNIQUE (CUSTOMER_TYPE);

        -- 6. A stage is set up and you can list the files in it:
        list @my_stage;
        
        -- 7. This view pulls the unique/primary key columns used in the MERGE ON clause.
        CREATE OR REPLACE VIEW CONSTRAINS_VW AS
        SELECT
            tbl.table_schema,
            tbl.table_name,
            con.constraint_name,
            col.data_type
        FROM EDWH_DEV.information_schema.table_constraints con
        INNER JOIN EDWH_DEV.information_schema.tables tbl
            ON con.table_name = tbl.table_name
            AND con.constraint_schema = tbl.table_schema
        INNER JOIN EDWH_DEV.information_schema.columns col
            ON tbl.table_name = col.table_name
            AND con.constraint_name = col.column_name
            AND con.constraint_schema = col.table_schema
        WHERE con.constraint_type IN ('PRIMARY KEY', 'UNIQUE');
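To make step 7 concrete, here is a JavaScript sketch (an illustration, not the author's code) of how the rows of CONSTRAINS_VW turn into the MERGE ON clause inside MERGER_BUILDER_GEN: each key column is wrapped in NVL with a type-dependent default, so NULL keys compare as equal. The input objects are hypothetical stand-ins for view rows. Note that all key columns are ANDed together, so the two separate UNIQUE constraints on AIRLINES effectively behave like one composite key on (CONSOLIDATED_AIRLINE_CODE, CUSTOMER_TYPE).

```javascript
// NULL-safe defaults, mirroring the CASE expressions in MERGER_BUILDER_GEN.
// The procedure emits -999999 for other types and then REPLACEs it with ''.
function nvlDefault(dataType) {
  switch (dataType) {
    case "FLOAT":
    case "NUMBER":
      return "0";
    case "DATE":
      return "'1900-12-01'";
    case "TIMESTAMP_NTZ":
      return "'1900-12-01 00:00:00'";
    default:
      return "''";
  }
}

// keyRows: hypothetical objects shaped like CONSTRAINS_VW rows for one table.
function buildOnClause(keyRows) {
  const seen = new Set(); // like LISTAGG(DISTINCT ...): repeated key columns are dropped
  const parts = [];
  for (const { constraint_name: col, data_type: type } of keyRows) {
    if (seen.has(col)) continue;
    seen.add(col);
    const d = nvlDefault(type);
    parts.push(`nvl(tgt."${col}", ${d}) = nvl(src."${col}", ${d})`);
  }
  // Every key column must match: the comparisons are ANDed together.
  return parts.join(" and\n");
}

// VARCHAR columns report DATA_TYPE 'TEXT' in INFORMATION_SCHEMA.
console.log(buildOnClause([
  { constraint_name: "CONSOLIDATED_AIRLINE_CODE", data_type: "TEXT" },
  { constraint_name: "CUSTOMER_TYPE", data_type: "TEXT" },
]));
```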
    

    ------ Generic procedure code: compile once, use many times :) ---

    CREATE OR REPLACE PROCEDURE "MERGER_BUILDER_GEN"("TABLE_NAME" VARCHAR(200), "SCHEMA_NAME" VARCHAR(200), "STAGE_NAME" VARCHAR(200))
        RETURNS VARCHAR(32000)
        LANGUAGE JAVASCRIPT
        EXECUTE AS CALLER
        AS $$
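        var result;
        var my_sql_command2 = ''; // declared up front so the catch block can report it even if the failure happens early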
        snowflake.execute( {sqlText: "begin transaction;"});
        // Default log row with status 'Failed'; it is written to GEN_LOG only by the catch block.
        var my_sql_command = `SELECT 
            0 AS "number of rows inserted"
            , 0 as "number of rows updated"
            ,'` + TABLE_NAME + `' AS proc_name
            ,CURRENT_TIMESTAMP() AS FINISHED
            ,CURRENT_USER() AS USER_NAME 
            ,CURRENT_ROLE() USER_ROLE
            ,'Failed' as status`;
            var statement1 = snowflake.createStatement( {sqlText: my_sql_command} );
            var result_set1 = statement1.execute();
          result_set1.next();
            var column1 = result_set1.getColumnValue(1);
            var column2 = result_set1.getColumnValue(2);
            var column3 = result_set1.getColumnValue(3);
            var column4 = result_set1.getColumnValue(4);
            var column5 = result_set1.getColumnValue(5);
            var column6 = result_set1.getColumnValue(6);
            var column7 = result_set1.getColumnValue(7);
    
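    try {
        // Derive every MERGE fragment (select list, ON clause, change test, SET list,
        // insert column and value lists) from INFORMATION_SCHEMA.COLUMNS and CONSTRAINS_VW.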
        var v_sql_stmt = `CREATE OR REPLACE temporary TABLE vars_of_merger_dyn00 AS 
                        SELECT  
                        COL_NAMES_SELECT    
                        ,REPLACE(listagg (distinct' nvl(tgt."'||cons.constraint_name||'",'
                        ||CASE  WHEN cons.data_type ='FLOAT' THEN '0' 
                                WHEN cons.data_type ='NUMBER' THEN '0'
                                WHEN cons.data_type ='DATE' THEN '''1900-12-01'''
                                WHEN cons.data_type ='TIMESTAMP_NTZ' THEN '''1900-12-01 00:00:00'''
                                ELSE '-999999' END||') = nvl(src."' 
                                ||cons.constraint_name ||'",'
                        ||CASE  WHEN cons.data_type ='FLOAT' THEN '0' 
                                WHEN cons.data_type ='NUMBER' THEN '0'
                                WHEN cons.data_type ='DATE' THEN '''1900-12-01'''
                                WHEN cons.data_type ='TIMESTAMP_NTZ' THEN '''1900-12-01 00:00:00'''
                                ELSE '-999999' END  ,') and \n') ||')','-999999','''''') AS dd
                        ,REPLACE(COL_NAMES_WHEN,'-999999','''''') AS COL_NAMES_WHEN
                        ,COL_NAMES_SET
                        ,COL_NAMES_INS
                        ,COL_NAMES_INS1
                        FROM (
                        SELECT 
                         InTab.TABLE_NAME              
                        ,listagg (' cast($'   ||InTab.ORDINAL_POSITION || ' as ' || intab.DATA_TYPE || ') as "' ||InTab.COLUMN_NAME,'", \n') WITHIN GROUP ( ORDER BY ORDINAL_POSITION asc ) ||'"'  AS Col_Names_select
                        ,listagg (' nvl(tgt."'  || CASE WHEN intab.CM IS NULL THEN InTab.COLUMN_NAME ELSE NULL end  || '", '
                        ||CASE  WHEN intab.data_type ='FLOAT' THEN '0' 
                                WHEN intab.data_type ='NUMBER' THEN '0'
                                WHEN intab.data_type ='DATE' THEN '''1900-12-01'''
                                WHEN intab.data_type ='TIMESTAMP_NTZ' THEN '''1900-12-01 00:00:00''' ELSE '-999999' END
                        ||') != nvl(src."' ||InTab.COLUMN_NAME||'",'||
                          CASE  WHEN intab.data_type ='FLOAT' THEN '0' 
                                WHEN intab.data_type ='NUMBER' THEN '0'
                                WHEN intab.data_type ='DATE' THEN '''1900-12-01'''
                                WHEN intab.data_type ='TIMESTAMP_NTZ' THEN '''1900-12-01 00:00:00''' ELSE '-999999' END 
                        ,') OR\n') WITHIN GROUP ( ORDER BY ORDINAL_POSITION asc ) ||')' AS Col_Names_when
                        ,listagg (' tgt."'  ||CASE WHEN intab.CM IS NULL THEN InTab.COLUMN_NAME ELSE NULL end || '"= src."' ||InTab.COLUMN_NAME , '",\n') WITHIN GROUP ( ORDER BY ORDINAL_POSITION asc ) ||'"' AS Col_Names_set
                        ,listagg ( '"'||InTab.COLUMN_NAME,'",\n') WITHIN GROUP ( ORDER BY ORDINAL_POSITION asc ) ||'"' AS Col_Names_ins
                        ,listagg ( ' src."'  ||InTab.COLUMN_NAME,'",\n') WITHIN GROUP ( ORDER BY InTab.ORDINAL_POSITION asc ) ||'"' AS Col_Names_ins1 
                        ,listagg (ORDINAL_POSITION,',') WITHIN GROUP ( ORDER BY ORDINAL_POSITION asc ) ORDINAL_POSITION
                        FROM (
                        SELECT 
                         InTab.TABLE_NAME              
                        ,InTab.COLUMN_NAME
                        ,InTab.ORDINAL_POSITION
                        ,intab.DATA_TYPE
                        ,cons.CONSTRAINT_NAME AS CM
                        FROM INFORMATION_SCHEMA.COLUMNS InTab 
                        LEFT JOIN constrains_vw cons ON cons.table_name = intab.table_name AND InTab.COLUMN_NAME = cons.CONSTRAINT_NAME
                        where intab.TABLE_SCHEMA = '`+ SCHEMA_NAME +`'
                        AND intab.TABLE_NAME = '`+ TABLE_NAME +`'
                        GROUP BY 
                        InTab.TABLE_NAME
                        ,InTab.COLUMN_NAME 
                        ,InTab.COLUMN_NAME
                        ,InTab.ORDINAL_POSITION
                        ,intab.DATA_TYPE
                        ,CONSTRAINT_NAME
                        ORDER BY InTab.TABLE_NAME,InTab.ORDINAL_POSITION ) InTab
                        GROUP BY TABLE_NAME
                        ORDER BY TABLE_NAME,ORDINAL_POSITION
                        ) tt
                        LEFT JOIN constrains_vw cons ON cons.table_name = tt.table_name
                        GROUP BY
                        COL_NAMES_SELECT    
                        ,COL_NAMES_WHEN
                        ,COL_NAMES_SET
                        ,COL_NAMES_INS
                        ,COL_NAMES_INS1;` ; 
        
        var rs_clip_name = snowflake.execute ({sqlText: v_sql_stmt});
       
        var my_sql_command1 = `SELECT Col_Names_select,dd,Col_Names_when,Col_Names_set,Col_Names_ins,Col_Names_ins1 FROM vars_of_merger_dyn00;`; 
        
        var statement2 = snowflake.createStatement( {sqlText: my_sql_command1} );
        var result_set = statement2.execute();
        result_set.next();
        var Col_Names_select = result_set.getColumnValue(1);
        var dd = result_set.getColumnValue(2);
        var Col_Names_when = result_set.getColumnValue(3);
        var Col_Names_set = result_set.getColumnValue(4);
        var Col_Names_ins = result_set.getColumnValue(5);
        var Col_Names_ins1 = result_set.getColumnValue(6);
    
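    // Col_Names_set is only '"' when every column is a key column: there is nothing
    // to update, so build an insert-only MERGE.
    if (Col_Names_set == '"') 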
    { 
    var my_sql_command2 = `MERGE INTO EDWH_DEV.`+ SCHEMA_NAME +`.`+ TABLE_NAME +` AS tgt
    USING 
    ( select
    `+ Col_Names_select +`
    from 
    @` + STAGE_NAME + `/` + TABLE_NAME + `.csv  (file_format => 'CSV') )
    AS src
    
    ON ( `+ dd +`
         )
    
    WHEN NOT MATCHED
    THEN INSERT ( `+ Col_Names_ins +`)
    VALUES 
    (`+ Col_Names_ins1 +`); `; 
        var rs_clip_name2 = snowflake.execute ({sqlText: my_sql_command2});
    
    // Log the row counts, read back from the MERGE result via RESULT_SCAN(LAST_QUERY_ID()).
    snowflake.createStatement( { sqlText: `INSERT INTO GEN_LOG
    ("number of rows inserted", "number of rows updated", proc_name , FINISHED, USER_NAME, USER_ROLE, STATUS, MESSAGE)
     SELECT "number of rows inserted", 0 as "number of rows updated", '` + TABLE_NAME + `' AS proc_name  , sysdate(), CURRENT_USER() ,CURRENT_ROLE(),'done' as status ,'' AS message
            FROM TABLE (RESULT_SCAN(LAST_QUERY_ID()));`} ).execute();
    
    } 
    else 
    {
    // Otherwise update matched rows whose non-key values changed, and insert new rows.
    var my_sql_command2 = `MERGE INTO EDWH_DEV.`+ SCHEMA_NAME +`.`+ TABLE_NAME +` AS tgt
    USING 
    ( select
    `+ Col_Names_select +`
    from 
    @` + STAGE_NAME + `/` + TABLE_NAME + `.csv  (file_format => 'CSV') )
    AS src
    ON ( `+ dd +`
         )
    WHEN MATCHED
    AND `+ Col_Names_when +`
    THEN UPDATE SET
    `+ Col_Names_set +`
    WHEN NOT MATCHED
    THEN INSERT ( `+ Col_Names_ins +`)
    VALUES 
    (`+ Col_Names_ins1 +`); `; 
        var rs_clip_name2 = snowflake.execute ({sqlText: my_sql_command2});
    
    snowflake.createStatement( { sqlText: `INSERT INTO GEN_LOG
    ("number of rows inserted", "number of rows updated", proc_name , FINISHED, USER_NAME, USER_ROLE, STATUS, MESSAGE)
     SELECT "number of rows inserted","number of rows updated", '` + TABLE_NAME + `' AS proc_name  , sysdate(), CURRENT_USER() ,CURRENT_ROLE(),'done' as status ,'' AS message
            FROM TABLE (RESULT_SCAN(LAST_QUERY_ID()));`} ).execute();   
    
    }
         snowflake.execute( {sqlText: "commit;"} );
        result = "Succeeded" + my_sql_command2 ;
    } catch (err) {
      snowflake.execute({
          sqlText: `insert into GEN_LOG VALUES (DEFAULT,?,?,?,?,?,?,?,?)`
          ,binds: [column1, column2, column3 ,column4 , column5 , column6 ,column7 , err.code + " | State: " + err.state + "\n  Message: " + err.message + "\nStack Trace:\n" + err.stackTraceTxt ]
          });
         snowflake.execute( {sqlText: "commit;"} );
         return 'Failed.' + my_sql_command2 ;
    }
    return result;
    
    $$;
    

You can stop here and use the procedure, for example: CALL MERGER_BUILDER_GEN('MY_TABLE','MY_SCHEMA','MY_STAGE'); Note that all arguments are case-sensitive.
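Because the procedure compares TABLE_NAME and TABLE_SCHEMA with INFORMATION_SCHEMA as plain strings, and unquoted identifiers are stored there in upper case, passing 'airlines' instead of 'AIRLINES' finds no columns and the procedure ends up returning 'Failed.'. A small hypothetical JavaScript helper that builds the CALL text could normalise the names and escape quotes. It assumes the tables and schemas were created with unquoted identifiers, and it leaves the stage argument untouched because a path appended to a stage name is case-sensitive.

```javascript
// Hypothetical helper that builds the CALL text for MERGER_BUILDER_GEN.
// Assumes tables and schemas were created with unquoted (upper-cased) identifiers.
function buildMergeCall(table, schema, stage) {
  // Double any single quote so each argument stays one SQL string literal.
  const lit = (s) => `'${String(s).replace(/'/g, "''")}'`;
  // INFORMATION_SCHEMA stores unquoted identifiers in upper case.
  const upper = (s) => String(s).toUpperCase();
  // The stage argument is left as-is: a path after the stage name is case-sensitive.
  return `CALL MERGER_BUILDER_GEN(${lit(upper(table))}, ${lit(upper(schema))}, ${lit(stage)});`;
}

console.log(buildMergeCall("airlines", "my_schema", "MY_STAGE"));
```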

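In a nutshell, for any table whose DDL you have created in the schema and pass to the procedure, it writes an appropriate MERGE statement. It looks up the matching file in the stage and dynamically builds the SELECT used as the merge source, followed by the smaller pieces: the ON clause, WHEN MATCHED AND nvl(...) (every non-key column compared through NVL), and WHEN NOT MATCHED THEN INSERT. It also casts each column dynamically to the target data type, a bit like COPY INTO does. In my view, though, MERGE handles imperfect deltas better. So if you don't want a data lake with date-partitioned files stitched together through external tables, or, God forbid, in a UNION view, give it a try.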
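The statement assembly described above can be sketched in plain JavaScript. This is a simplified re-implementation for illustration, not the procedure itself: the column objects are a hypothetical stand-in for INFORMATION_SCHEMA.COLUMNS joined with CONSTRAINS_VW, while the database name, stage path pattern and the CSV file format mirror the procedure. One thing the sketch makes visible: the select list casts to the bare DATA_TYPE, and a bare NUMBER in Snowflake means NUMBER(38,0), so columns declared with a scale (for example NUMBER(10,2)) would appear to lose their decimals before the MERGE. Passing precision and scale through may be worth considering.

```javascript
// NULL-safe defaults, as in the procedure's CASE expressions ('' for text types).
function nvlDefault(type) {
  if (type === "FLOAT" || type === "NUMBER") return "0";
  if (type === "DATE") return "'1900-12-01'";
  if (type === "TIMESTAMP_NTZ") return "'1900-12-01 00:00:00'";
  return "''";
}

// columns: hypothetical [{ name, position, type, isKey }] rows describing the target table.
function buildMerge({ database, schema, table, stage, columns }) {
  const cols = [...columns].sort((a, b) => a.position - b.position);
  const keys = cols.filter((c) => c.isKey);
  const nonKeys = cols.filter((c) => !c.isKey);
  const q = (c) => `"${c.name}"`;
  const nvl = (alias, c) => `nvl(${alias}.${q(c)}, ${nvlDefault(c.type)})`;

  // Positional file columns ($1, $2, ...) cast to the target type and aliased to the column name.
  const select = cols.map((c) => `cast($${c.position} as ${c.type}) as ${q(c)}`).join(",\n  ");
  const on = keys.map((c) => `${nvl("tgt", c)} = ${nvl("src", c)}`).join(" and\n  ");

  let sql =
    `MERGE INTO ${database}.${schema}.${table} AS tgt\n` +
    `USING (select\n  ${select}\nfrom @${stage}/${table}.csv (file_format => 'CSV')) AS src\n` +
    `ON (${on})\n`;

  // Only emit the update branch when there are non-key columns; this is the
  // counterpart of the Col_Names_set == '"' check in the procedure.
  if (nonKeys.length > 0) {
    const changed = nonKeys.map((c) => `${nvl("tgt", c)} != ${nvl("src", c)}`).join(" OR\n  ");
    const set = nonKeys.map((c) => `tgt.${q(c)} = src.${q(c)}`).join(",\n  ");
    sql += `WHEN MATCHED AND (${changed})\nTHEN UPDATE SET\n  ${set}\n`;
  }
  sql += `WHEN NOT MATCHED\nTHEN INSERT (${cols.map(q).join(", ")})\n`;
  sql += `VALUES (${cols.map((c) => `src.${q(c)}`).join(", ")});`;
  return sql;
}

console.log(buildMerge({
  database: "EDWH_DEV", schema: "MY_SCHEMA", table: "AIRLINES", stage: "MY_STAGE",
  columns: [
    { name: "CONSOLIDATED_AIRLINE_CODE", position: 1, type: "TEXT", isKey: true },
    { name: "POSSIBLE_CUSTOMER_NAME", position: 2, type: "TEXT", isKey: false },
  ],
}));
```

Comparing through NVL in the WHEN MATCHED condition means unchanged rows are skipped rather than rewritten, which keeps the update count in GEN_LOG meaningful.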

You can also run the auto-merge over any number of tables, one by one, with very little setup:

    create or replace TABLE PROC_LIST (
        PROC_PRIORIT_ID NUMBER(38,0) autoincrement,
        PROC_NAME VARCHAR(150)
    );
    INSERT INTO PROC_LIST (PROC_NAME) VALUES ('TABLE1'); -- e.g. a table with 50 columns
    INSERT INTO PROC_LIST (PROC_NAME) VALUES ('TABLE2');
    INSERT INTO PROC_LIST (PROC_NAME) VALUES ('TABLE3');
    INSERT INTO PROC_LIST (PROC_NAME) VALUES ('TABLE4');
    INSERT INTO PROC_LIST (PROC_NAME) VALUES ('TABLE5'); -- e.g. a table with 500 columns
    INSERT INTO PROC_LIST (PROC_NAME) VALUES ('TABLE6');
    INSERT INTO PROC_LIST (PROC_NAME) VALUES ('TABLE7');
    INSERT INTO PROC_LIST (PROC_NAME) VALUES ('TABLE8'); -- the dynamic SQL limit is 32,000 characters, so go crazy
    INSERT INTO PROC_LIST (PROC_NAME) VALUES ('TABLE9');
    
    -- Created a list of tables to be loaded one by one using the auto-merge.
    
    
    CREATE OR REPLACE VIEW PROC_LOAD_CONTROL AS
    select 
    metadata$filename
    ,REPLACE(REPLACE(metadata$filename,'.csv',''),'path/to/your_table_ifnot_inmain_stage_location/','') AS file_name
    ,pl.PROC_NAME AS table_name
    ,'MY_SCHEMA' as schema_name
    ,'MY_STAGE' AS stage_name
    from @MY_STAGE
    inner JOIN PROC_LIST pl ON pl.PROC_NAME = REPLACE(REPLACE(metadata$filename,'.csv',''),'path/to/your_table_ifnot_inmain_stage_location/','')
    GROUP BY metadata$filename,pl.proc_name
    ORDER BY REPLACE(REPLACE(metadata$filename,'.csv',''),'path/to/your_table_ifnot_inmain_stage_location/','') asc;
    
    -- This makes sure your table names match the actual files in your stage. Please check the prerequisites above to make this work smoothly.
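What PROC_LOAD_CONTROL computes can be mirrored in JavaScript to check the file-to-table matching before relying on it: strip the path prefix and the .csv extension from each staged file name, keep only names registered in PROC_LIST (the INNER JOIN), and return one row per file (the GROUP BY), sorted by name. The prefix and the sample file names are hypothetical. Keep in mind that the view itself selects from @MY_STAGE, which reads every row of every staged file just to obtain the file names.

```javascript
// Hypothetical sub-folder inside the stage, as in the REPLACE(...) calls of the view.
const STAGE_PREFIX = "path/to/your_table_ifnot_inmain_stage_location/";

// Mirrors REPLACE(REPLACE(metadata$filename, '.csv', ''), prefix, '').
function fileToTableName(filename, prefix = STAGE_PREFIX) {
  const noExt = filename.split(".csv").join("");
  return prefix ? noExt.split(prefix).join("") : noExt;
}

// stagedFiles: file names as metadata$filename would return them (one entry per row read).
// procList: the PROC_NAME values stored in PROC_LIST.
function loadControlRows(stagedFiles, procList, schema, stage, prefix = STAGE_PREFIX) {
  const registered = new Set(procList);
  const byFile = new Map(); // GROUP BY metadata$filename: one row per file
  for (const file of stagedFiles) {
    const name = fileToTableName(file, prefix);
    // INNER JOIN PROC_LIST: files without a registered table are ignored.
    if (registered.has(name) && !byFile.has(file)) {
      byFile.set(file, { file, table_name: name, schema_name: schema, stage_name: stage });
    }
  }
  // ORDER BY the derived name, using plain binary string comparison.
  return [...byFile.values()].sort((a, b) =>
    a.table_name < b.table_name ? -1 : a.table_name > b.table_name ? 1 : 0);
}

console.log(loadControlRows(
  [STAGE_PREFIX + "TABLE2.csv", STAGE_PREFIX + "TABLE1.csv", STAGE_PREFIX + "TABLE1.csv", STAGE_PREFIX + "NOTES.csv"],
  ["TABLE1", "TABLE2"], "MY_SCHEMA", "MY_STAGE"));
```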
    
    CREATE OR REPLACE PROCEDURE "PROJECT_REFRESH_MRG"()
    RETURNS VARCHAR(1000)
    LANGUAGE JAVASCRIPT
    EXECUTE AS OWNER
    AS $$
    
    try {
            
        var v_sql_stmt = `SELECT 
        table_name
        ,schema_name
        ,stage_name
        
        FROM PROC_LOAD_CONTROL;`;
        var rs_proc_name = snowflake.execute ({sqlText: v_sql_stmt});
        var v_table_name = '';
        var v_schema_name = '';
        var v_stage_name = '';
        
        // Loop through every table listed in PROC_LOAD_CONTROL
        while (rs_proc_name.next())  {
            v_table_name = rs_proc_name.getColumnValue(1);
            v_schema_name = rs_proc_name.getColumnValue(2);
            v_stage_name = rs_proc_name.getColumnValue(3);
           
            // Call the generic merge builder for this table
            v_sql_stmt = `call MERGER_BUILDER_GEN('`+v_table_name+`','`+v_schema_name+`','`+v_stage_name+`')`;
            snowflake.execute ({sqlText: v_sql_stmt});
    
        }
        return "Success: " + v_sql_stmt;
    }
    catch (err)  
        {
            //error log here                  
            return "Failed: " + err;   // Return an error indicator
        }
    $$;
    

So this creates a list of tables, together with their stage and schema variables, and passes each one in a while loop to the generic merge builder.
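The loop in PROJECT_REFRESH_MRG can be sketched in plain JavaScript as below. execCall is a hypothetical stand-in for running the CALL inside Snowflake (snowflake.execute, then next() and getColumnValue(1) on the returned result set) and returning the procedure's string. Unlike the original, which returns only the last CALL statement, this version collects a per-table status so one summary shows which tables failed; the arguments are also quote-escaped.

```javascript
// Plain-JavaScript sketch of the PROJECT_REFRESH_MRG loop.
// controlRows: rows shaped like PROC_LOAD_CONTROL (table_name, schema_name, stage_name).
// execCall: hypothetical function that runs one CALL and returns the procedure's string result.
function refreshAll(controlRows, execCall) {
  const lit = (s) => `'${String(s).replace(/'/g, "''")}'`;
  const summary = [];
  for (const { table_name, schema_name, stage_name } of controlRows) {
    const sqlText = `call MERGER_BUILDER_GEN(${lit(table_name)},${lit(schema_name)},${lit(stage_name)})`;
    try {
      // MERGER_BUILDER_GEN returns text starting with "Succeeded" or "Failed.".
      const ret = String(execCall(sqlText));
      summary.push(`${table_name}: ${ret.startsWith("Succeeded") ? "ok" : "failed"}`);
    } catch (err) {
      // Record the error and carry on with the remaining tables.
      summary.push(`${table_name}: error ${err.message}`);
    }
  }
  return summary.join("\n");
}

// Usage with a fake executor standing in for Snowflake:
const rows = [
  { table_name: "TABLE1", schema_name: "MY_SCHEMA", stage_name: "MY_STAGE" },
  { table_name: "TABLE2", schema_name: "MY_SCHEMA", stage_name: "MY_STAGE" },
];
console.log(refreshAll(rows, (sql) => (sql.includes("TABLE2") ? "Failed.MERGE ..." : "Succeeded MERGE ...")));
```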

[Discussion]:

  • Hi Nicolas, I have a similar requirement for Avro files that carry incremental data. I can't use streams at the moment; do you have a similar example?