【发布时间】:2021-10-28 23:57:44
【问题描述】:
我是 dbt 的新手,我希望探索它以构建从 "source" 到 "staging" 架构的增量负载。 "source" 模式是来自 batch_id 的源的增量数据,"staging" 是每个 source-primary-key 一行的合并/更新数据。问题是我有超过 40 个表,我不想手动创建 40 个 .sql 文件来生成模型。谁能指出我动态模型创建的方向?我确实编写了以下代码,但我不知道它应该去哪里,这样当我发出 dbt 运行时,它会从 metadata_tbl 获取信息并动态创建模型。我的模式驻留在雪花数据库中。提前感谢所有建议。
{# OBTAIN TABLES TO BE PROCESSED FROM THE METADATA TABLE #}
{%- call statement('tgt_tbl_info', fetch_result=True) -%}
select target_schema,
target_table_name,
primary_key,
stg_schema_name
from {{ source('admin','smetadata’_tbl) }}
where source_system = 'TEST_INCR'
order by target_table_name
{%- endcall -%}
{%- set tgt_tbl_name = load_result('tgt_tbl_info') -%}
{%- set tgt_tbl_name_data = tgt_tbl_name['data']-%}
{# LOOP THROUGH THE TABLES TO INCREMENTALLY LOAD THEM #}
{% for tgt_tbl_name in tgt_tbl_name_data %}
--select {{tgt_tbl_name[0]}}, {{tgt_tbl_name[1]}}
{{
config(
materialized='incremental',
unique_key='{{tgt_tbl_name[2]}}'
)
}}
{# OBTAIN MAX BATCH ID IN THE SOURCE SCHEMA #}
with src_max_batch as
(
select {{tgt_tbl_name[2]}} as src_id,
max(batch_id) as src_max_batch_id
from {{tgt_tbl_name[0]}}.{{tgt_tbl_name[1]}}
group by {{tgt_tbl_name[2]}}
),
{# OBTAIN MAX BATCH ID IN THE STAGING SCHEMA #}
stg_max_batch as
(
select {{tgt_tbl_name[2]}} as stg_id,
max(batch_id) as stg_max_batch_id
from {{tgt_tbl_name[3]}}.{{tgt_tbl_name[1]}}
group by {{tgt_tbl_name[2]}}
),
{# OBTAIN ROWS FROM SOURCE SCHEMA THAT NEED TO BE PROCESSED INTO STAGING SCHEMA #}
to_process as
(
select src_id as process_id,
src_max_batch_id as process_batch_id
from src_max_batch
left outer join stg_max_batch
on src_id = stg_id
where src_max_batch_id > IFNULL(stg_max_batch_id,0)
),
final as
(
select *
from {{tgt_tbl_name[0]}}.{{tgt_tbl_name[1]}}
inner join to_process
on {{tgt_tbl_name[2]}} = process_id
and batch_id = process_batch_id
)
select *
from final
{% if is_incremental() %}
--where batch_id > (select max(batch_id) from {{ this }}
where {{tgt_tbl_name[2]}} in (select id from final)
{% endif %}
{% endfor %}
【问题讨论】:
-
您是否考虑过创建自定义实现here?
-
感谢@Kay 为您指明方向。我会看看我怎么可能使用它。
标签: snowflake-cloud-data-platform dbt