【发布时间】:2021-09-27 05:36:17
【问题描述】:
希望这是一个简单的问题。
我是 Airflow 的菜鸟,我正在尝试用 Airflow DAG 作为 POC 替换我的 ETL,但我正在努力做一件基本的事情。
我希望将 DAG 运行的 execution_date 或 ds 宏注入到外部函数中的 SQL 字符串中,以便我可以根据 execution_date 动态移动/聚合数据,这对于作业重新运行和回填很有用。
到目前为止,这是 DAG 的基本结构:
def create_db_engine():
[redacted]
return engine
def run_query(sql):
engine = create_db_engine()
connection = engine.connect()
data = connection.execute(sql)
return data
def wait_for_data():
sensor_query = f'''
select blah
from table
limit 1
'''
if run_query(sensor_query).rowcount >= 1:
return True
else:
return False
def run_aggregation():
agg_query = f'''
delete from table
where datefield = '{{ prev_ds }}'::DATE;
insert into table(datefield, metric)
select date_trunc('day', timefield), sum(metric)
from sourcetable
where timefield >= '{{ prev_ds }}'::DATE
and timefield < '{{ ds }}'::DATE
group by 1;
'''
run_query(agg_query)
@task
def data_sensor():
PythonSensor(
task_id='check_data',
python_callable=wait_for_data(),
)
@task
def agg_operator(args):
PythonOperator(
task_id='agg_data',
python_callable=run_aggregation()
)
总结/注释:
- 基本前提是等待某事发生,然后运行能够利用执行日期的查询。
- 我正在尝试使用 {{}} 宏语法,但它似乎无法在操作员/传感器调用之外使用。
- 我正在使用 SQL alchemy,因为我使用 IAM 角色链来通过 AWS Redshift 进行身份验证,但我找不到让它与 SQLoperators/sensors 一起工作的方法。虽然如果有人对此有解决方案,那将是一个很好的奖励答案。
- Python 3.9,气流 2.1.2。数据库是 Amazon Redshift。
我尝试了几种不同的方法来让事情顺利进行:
kwargs #1 - 根据此处的答案 [https://stackoverflow.com/a/36754930/841233],添加 provide_context=True 应该使变量通过作为 **kwargs 传递给函数的 kwargs 可用。但这对我不起作用。
def data_sensor():
PythonSensor(
task_id='check_data',
python_callable=wait_for_data(),
poke_interval=30,
timeout=3600,
provide_context=True
)
...
ds = kwargs['ds']
prev_ds = kwargs['prev_ds']
...
Error
ds = kwargs['ds']
KeyError: 'ds'
kwargs #2 - 这里的答案 [https://stackoverflow.com/a/50708735/841233] 建议通过 templates_dict 变量将您想要的字段添加到模板中。但这也不起作用
def data_sensor():
PythonSensor(
task_id='check_data',
python_callable=wait_for_data(),
poke_interval=30,
timeout=3600,
provide_context=True,
templates_dict={'start_date': '{{ prev_ds }}',
'end_date': '{{ ds }}',
'next_date': '{{ next_ds }}'},
)
...
Error
end_date = kwargs.get(['templates_dict']).get(['ds'])
TypeError: unhashable type: 'list'
所以我的问题是:
- 这里发生了什么?这甚至可能吗?
- 这是实现我需要的正确范例吗?还是有不那么笨的方法?
【问题讨论】: