不确定我是否 100% 遵循了您的数据结构,但假设它与此类似:
{
"properties": {
"property1": {
"column1": "...",
"column2": "...",
"column3": "...",
"value": "my value 1.0"
},
"property2": {
"column1": "...",
"column2": "...",
"column3": "...",
"value": "my value 2.0"
},
"propertyX": {
"column1": "...",
"column2": "...",
"column3": "...",
"value": "my value 3.0"
}
}
}
正如您所提到的,您需要使用set 来创建变量并能够操作您的数据。就个人而言,我喜欢创建不同的变量来处理query statement、query result 和query values。所以按照这个策略,你会得到这样的结果:
{% set data_structure_query %}
select properties from src
{% endset %}
{% set results = run_query(data_structure_query) %}
{% set properties = results.columns[0].values() %}
请注意,results.columns[0].values() 将带来查询第一列的数据,在本例中为properties。
.values() 以元组的形式获取列的值,其中大部分时间项定义为string。因此,为了访问数据的属性,您必须将 json 字符串反序列化为 Python 对象,例如dict。为此,您需要使用fromjson 方法:
...
{% set properties = results.columns[0].values() %}
{% set properties_dict = fromjson(properties[0]) %}
...
假设您的查询仅返回 JSON 格式的一行,我指定 properties[0] 来访问结果查询的第一行。
在跳到下一步之前,重要的是要知道 dbt 有一个 jinja 变量,当 dbt 处于“执行模式”时会通知我们。这是我们需要担心的事情,因为它可能会引发构建模型的问题。简而言之,任何依赖于从数据库返回的结果的 jinja 都会抛出错误。
在您的情况下,results 变量取决于需要在数据库中执行的值,这意味着如果您只是尝试运行模型,您很可能会遇到Compilation Error 的问题。为避免这种情况,您需要添加 if condition 以检查 dbt 是否处于“执行模式”:
...
{% set results = run_query(data_structure_query) %}
{% if execute %}
{% set properties = results.columns[0].values() %}
{% set properties_dict = fromjson(properties[0]) %}
{% else %}
{% set properties = [] %}
{% set properties_dict = [] %}
{% endif %}
...
最后,您可以继续使用loop 来构建您的列:
select
{%- for property in properties_dict.properties %}
{{ property }}.value
{%- if not loop.last %},{% endif -%}
{%- endfor %}
from
...
这将被编译为:
select
property1.value,
property2.value,
propertyX.value
from
...
如果你想访问每一列的值,那么:
select
{%- for property in properties_dict.properties %}
'{{ properties_dict.properties[property].value }}'
{%- if not loop.last %},{% endif -%}
{%- endfor %}
from
...
将编译为:
select
'my value',
'my value 1.0',
'my value 2.0'
from
...
可能值得查看您的数据库/仓库并检查是否有任何内部函数可以处理半结构化数据。这也可以帮助您处理逻辑。例如,Snowflake 具有 lateral flatten,它执行类似的行为将属性拆分为多行。
出于调试目的,我建议 compile 您的模型并使用日志 ({{ log('my message', info=True) }}) 了解 dbt/jinja 如何处理数据。我提供的一些代码可能会根据您的查询输出而改变。
一些有用的链接:
https://docs.getdbt.com/reference/dbt-jinja-functions/run_query
https://docs.getdbt.com/reference/dbt-jinja-functions/execute
https://docs.getdbt.com/reference/dbt-jinja-functions/fromjson/
https://docs.getdbt.com/tutorial/using-jinja