【问题标题】:Dynamic model generation based on a metadata table in dbt基于 dbt 中元数据表的动态模型生成
【发布时间】:2021-10-28 23:57:44
【问题描述】:

我是 dbt 的新手,我希望探索它以构建从 "source""staging" 架构的增量负载。 "source" 模式是来自 batch_id 的源的增量数据,"staging" 是每个 source-primary-key 一行的合并/更新数据。问题是我有超过 40 个表,我不想手动创建 40 个 .sql 文件来生成模型。谁能指出我动态模型创建的方向?我确实编写了以下代码,但我不知道它应该去哪里,这样当我发出 dbt 运行时,它会从 metadata_tbl 获取信息并动态创建模型。我的模式驻留在雪花数据库中。提前感谢所有建议。

{# OBTAIN TABLES TO BE PROCESSED FROM THE METADATA TABLE #}
{%- call statement('tgt_tbl_info', fetch_result=True) -%}
    select target_schema,
            target_table_name,
            primary_key,
            stg_schema_name
    from {{ source('admin','smetadata’_tbl) }}
    where source_system = 'TEST_INCR'
    order by target_table_name
{%- endcall -%}

{%- set tgt_tbl_name = load_result('tgt_tbl_info') -%}

{%- set tgt_tbl_name_data = tgt_tbl_name['data']-%}


{# LOOP THROUGH THE TABLES TO INCREMENTALLY LOAD THEM #}
{% for tgt_tbl_name in tgt_tbl_name_data %}
    --select {{tgt_tbl_name[0]}}, {{tgt_tbl_name[1]}}
    {{
    config(
        materialized='incremental',
        unique_key='{{tgt_tbl_name[2]}}'
    )
    }}
    
    {# OBTAIN MAX BATCH ID IN THE SOURCE SCHEMA #}
    with src_max_batch as
    (
        select {{tgt_tbl_name[2]}} as src_id, 
                max(batch_id) as src_max_batch_id
        from {{tgt_tbl_name[0]}}.{{tgt_tbl_name[1]}}
        group by {{tgt_tbl_name[2]}}
    ),

    {# OBTAIN MAX BATCH ID IN THE STAGING SCHEMA #}
    stg_max_batch as
    (
        select {{tgt_tbl_name[2]}} as stg_id, 
                max(batch_id) as stg_max_batch_id
        from {{tgt_tbl_name[3]}}.{{tgt_tbl_name[1]}}
        group by {{tgt_tbl_name[2]}}
    ),

    {# OBTAIN ROWS FROM SOURCE SCHEMA THAT NEED TO BE PROCESSED INTO STAGING SCHEMA #}
    to_process as
    (
        select src_id as process_id,
                src_max_batch_id as process_batch_id
        from src_max_batch
            left outer join stg_max_batch 
                on src_id = stg_id
        where src_max_batch_id > IFNULL(stg_max_batch_id,0)
    ),

    final as
    (
        select *
        from {{tgt_tbl_name[0]}}.{{tgt_tbl_name[1]}}
            inner join to_process
                on {{tgt_tbl_name[2]}} = process_id
                and batch_id = process_batch_id
    )

    select *
    from final

    {% if is_incremental() %}

    --where batch_id > (select max(batch_id) from {{ this }}
        where {{tgt_tbl_name[2]}} in (select id from final)

    {% endif %}

{% endfor %}

【问题讨论】:

  • 您是否考虑过创建自定义实现here
  • 感谢@Kay 为您指明方向。我会看看我怎么可能使用它。

标签: snowflake-cloud-data-platform dbt


【解决方案1】:

您应该使用 dbt 提供的 Materializations,这是一个示例和 dbt 文档的链接:https://docs.getdbt.com/docs/guides/creating-new-materializations

{%- materialization my_view, default -%}

{%- set target_relation = api.Relation.create(
    identifier=identifier, schema=schema, database=database,
    type='view') -%}

-- ... setup database ...
-- ... run pre-hooks...

-- build model
{% call statement('main') -%}
{{ create_view_as(target_relation, sql) }}
{%- endcall %}

-- ... run post-hooks ...
-- ... clean up the database...

-- Return the relations created in this materialization
{{ return({'relations': [target_relation]}) }}

{%- endmaterialization -%}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-12-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多