【问题标题】:Accessing macros outside of an operator/sensor in Airflow 2.0在 Airflow 2.0 中访问操作员/传感器之外的宏
【发布时间】:2021-09-27 05:36:17
【问题描述】:

希望这是一个简单的问题。

我是 Airflow 的菜鸟,我正在尝试用 Airflow DAG 作为 POC 替换我的 ETL,但我正在努力做一件基本的事情。

我希望将 DAG 运行的 execution_date 或 ds 宏注入到外部函数中的 SQL 字符串中,以便我可以根据 execution_date 动态移动/聚合数据,这对于作业重新运行和回填很有用。

到目前为止,这是 DAG 的基本结构:

def create_db_engine():
    [redacted]
    return engine

def run_query(sql):
    engine = create_db_engine()
    connection = engine.connect()
    data = connection.execute(sql)
    return data

def wait_for_data():
    sensor_query = f'''
    select blah
    from table
    limit 1
    '''
    if run_query(sensor_query).rowcount >= 1:
        return True
    else:
        return False

def run_aggregation():
    agg_query = f'''
    delete from table
    where datefield = '{{ prev_ds }}'::DATE;
    
    insert into table(datefield, metric)
    select date_trunc('day', timefield), sum(metric)
    from sourcetable
    where timefield >= '{{ prev_ds }}'::DATE
    and timefield < '{{ ds }}'::DATE
    group by 1;
    '''
    run_query(agg_query)

@task
def data_sensor():
    PythonSensor(
        task_id='check_data',
        python_callable=wait_for_data(),
    )

@task
def agg_operator(args):
    PythonOperator(
        task_id='agg_data',
        python_callable=run_aggregation()
    )

总结/注释:

  1. 基本前提是等待某事发生,然后运行能够利用执行日期的查询。
  2. 我正在尝试使用 {{}} 宏语法,但它似乎无法在操作员/传感器调用之外使用。
  3. 我正在使用 SQL alchemy,因为我使用 IAM 角色链来通过 AWS Redshift 进行身份验证,但我找不到让它与 SQLoperators/sensors 一起工作的方法。虽然如果有人对此有解决方案,那将是一个很好的奖励答案。
  4. Python 3.9,气流 2.1.2。数据库是 Amazon Redshift。

我尝试了几种不同的方法来让事情顺利进行:

kwargs #1 - 根据此处的答案 [https://stackoverflow.com/a/36754930/841233],添加 provide_context=True 应该使变量通过作为 **kwargs 传递给函数的 kwargs 可用。但这对我不起作用。

def data_sensor():
    PythonSensor(
        task_id='check_data',
        python_callable=wait_for_data(),
        poke_interval=30,
        timeout=3600,
        provide_context=True
    )
...
ds = kwargs['ds']
prev_ds = kwargs['prev_ds']
...
Error
    ds = kwargs['ds']
KeyError: 'ds'

kwargs #2 - 这里的答案 [https://stackoverflow.com/a/50708735/841233] 建议通过 templates_dict 变量将您想要的字段添加到模板中。但这也不起作用

def data_sensor():
    PythonSensor(
        task_id='check_data',
        python_callable=wait_for_data(),
        poke_interval=30,
        timeout=3600,
        provide_context=True,
        templates_dict={'start_date': '{{ prev_ds }}',
                        'end_date': '{{ ds }}',
                        'next_date': '{{ next_ds }}'},
    )
...
Error
    end_date = kwargs.get(['templates_dict']).get(['ds'])
TypeError: unhashable type: 'list'

所以我的问题是:

  1. 这里发生了什么?这甚至可能吗?
  2. 这是实现我需要的正确范例吗?还是有不那么笨的方法?

【问题讨论】:

    标签: python airflow


    【解决方案1】:

    这实际上很有可能而且不是一个坏主意,但是您必须通过 JINJA 模板引擎手动运行您的字符串(这是 Airflow 在处理传递给操作员的模板参数时所做的)。

    Airflow 会自动为添加到 templated_fields 列表中的所有字段执行此操作 - 但由于您的运算符是 Python 代码,因此没有什么可以阻止您手动执行类似的处理。

    您不应该像尝试那样使用 PythonOperator。使用 TaskFlow API,@task 会自动使用 PythonOperator 包装可调用的 Python 方法,因此您的任务只是编写正确的 Python 代码 - 甚至无需考虑 PythonOperator。

    唯一的困难是您需要获取上下文,但这可以通过 get_current_context() 方法轻松实现:Passing arguements using Taskflow API in Airflow 2.0

    一旦你有了上下文(这正是包含所有 {{ next_ds }} 和其他上下文变量的内容),你可以简单地获取你的字符串并使用 Jinja 模板处理它,将上下文传递给 JINJA。你可以看到 Airflow 在内部是如何做的:https://github.com/apache/airflow/blob/932c3b5df5444970484f0ec23589be1820a3270d/airflow/models/baseoperator.py#L1070 - 它有点复杂,因为它处理了 xcom 等几种不同的情况,但你可以把它当作灵感。

    【讨论】:

    • 我明白了。对于这样一个常见的操作,我认为有一种更简单的方法。我将研究模板引擎。感谢您提供有关 PythonOperator 的提示。这超级有用。我想知道这是否意味着我可以在任务 def 块中生成 SQL?虽然我猜对传感器没有帮助。
    • 这根本不常见。我会说这很简单。您需要添加大约三行代码:)。
    【解决方案2】:

    除了 Jarek 的回答(确实有效)之外,我发现了一种更直接的方法来做我需要的事情,不需要 Jinja 模板。事实证明,您可以导入 get_current_context 函数并使用它在函数之间传递作业上下文。

    from airflow.operators.python import PythonOperator, get_current_context
    ...
    @task
    def data_sensor():
        context = get_current_context()
        PythonSensor(
            task_id='check_data',
            python_callable=wait_for_data(context),
            poke_interval=30,
            timeout=3600
        )
    ....
    def wait_for_data(context):
        end_date = context['ds']
        next_date = context['next_ds']
    

    【讨论】:

    • 好吧。您这样做的方式有点奇怪(我从未见过有人将运算符和装饰器混合在一个函数中)-这有点奇怪且不必要。如果您真的想使用 Python 传感器,则不应以这种方式使用它,而应仅将传感器用作“常规/经典”任务而无需任务流 API。您可以轻松混合搭配“经典”任务和“常规”任务:airflow.apache.org/docs/apache-airflow/stable/…
    • 这实际上很高兴知道。我是一个完整的菜鸟,我只知道任务装饰器,所以“常规/经典”对我来说毫无意义。我很感激您的建议,并将利用您的建议来改进我的 DAG。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2022-10-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-03-22
    • 1970-01-01
    • 2019-12-08
    相关资源
    最近更新 更多