【问题标题】:airflow dynamic task returns list instead of dictionary(Airflow 动态任务返回的是列表而不是字典)
【发布时间】:2022-11-09 22:30:17
【问题描述】:

请参考下面我的代码。

recon_rule_setup 任务每次运行时,都会从前面的 read_recon_config 任务得到一个字典(recon_conf)作为输入。

但是,recon_rule_exec 运行时,它从上一个任务得到的输入(recon_rule)却是一个列表(list)。

我的期望是,recon_rule_setup 应该运行 2 次,recon_rule_exec 应该运行 4 次,具体取决于返回值。

为什么 expand 在这两处的表现不一样?

from datetime import datetime
from airflow.models import DAG, XCom
from airflow.utils.dates import days_ago
from airflow.decorators import task
from airflow.utils.db import provide_session
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup


@provide_session
def clear_xcom_data(session=None, **kwargs):
    """Delete every XCom row recorded for the DAG that runs this callable.

    Intended to be used as a ``PythonOperator`` callable; the DAG object is
    read from the ``dag`` entry of the keyword arguments it receives.

    Args:
        session: Database session used for the delete. Expected to be
            supplied by ``provide_session`` when the caller passes none.
        **kwargs: Task context; must contain the ``dag`` key.

    Raises:
        KeyError: If ``dag`` is missing from ``kwargs``.
    """
    # Use the public ``dag_id`` attribute rather than the private ``_dag_id``
    # backing field, which is an implementation detail of the DAG class.
    dag_id = kwargs["dag"].dag_id
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()


@task(task_id="read_recon_config")
def read_recon_config(dag_run=None):
    """Return the two hard-coded reconciliation configs as a list of dicts.

    The run configuration is looked up on ``dag_run`` but is not used in the
    returned value.
    """
    run_conf = dag_run.conf  # fetched but not used below
    return [{"name": person} for person in ("Santanu", "Ghosh")]


@task(task_id="recon_rule_setup")
def recon_rule_setup(recon_conf):
    """Log the received config and pair it with a fixed extra rule.

    Returns a two-element list: ``recon_conf`` itself followed by the
    constant dict ``{"name": "Kolkata"}``.
    """
    # Debug output: show the type and the value this task instance was handed.
    print(f"type of recon_conf_dict: {type(recon_conf)}")
    print(f"recon_conf_dict: {recon_conf}")
    return [recon_conf, {"name": "Kolkata"}]


@task(task_id="recon_rule_exec")
def recon_rule_exec(recon_rule, master_key):
    """Print the type and value of both arguments; returns nothing.

    ``master_key`` and ``recon_rule`` are only inspected, never modified.
    """
    # Debug output for the key first, then for the rule.
    print(f"master_key type: {type(master_key)}")
    print(f"master_key: {master_key}")
    print(f"recon_rule type: {type(recon_rule)}")
    print(f"recon_rule: {recon_rule}")


# Defaults handed to the DAG constructor below.
default_args = dict(
    owner='Airflow',
    start_date=days_ago(1),
    depends_on_past=False,
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
)

# The key is "<dag name>_<UTC timestamp>"; slicing off the last three digits
# of %f (microseconds) leaves millisecond precision. It is computed when this
# module is executed, not when a task runs.
dag_name_id = "dynamic_demo"
cur_datetime = datetime.utcnow().strftime("%Y%m%d%H%M%S%f")[:-3]
master_dag_key = f"{dag_name_id}_{cur_datetime}"

# DAG with no schedule interval and catch-up disabled.
with DAG(
        dag_id=dag_name_id,
        default_args=default_args,
        schedule_interval=None,
        catchup=False
) as dag:

    # Bookend tasks that only echo a message; XCom push is disabled for both.
    start = BashOperator(task_id="start", bash_command='echo "starting reconciliation"', do_xcom_push=False)
    stop = BashOperator(task_id="stop", bash_command='echo "stopping reconciliation"', do_xcom_push=False)

    # Clean-up task whose callable is clear_xcom_data.
    delete_xcom = PythonOperator(
        task_id="delete_xcom",
        python_callable=clear_xcom_data
    )

    with TaskGroup(group_id="reconciliation_process") as tg1:
        # read_recon_config is called once; recon_rule_setup is expanded over
        # its result; recon_rule_exec is expanded over recon_rule_setup's
        # output with master_key fixed through partial().
        # NOTE(review): the accompanying question reports that each recon_rule
        # arrives here as a list rather than a single dict -- presumably because
        # the expansion is over the output of an already-mapped task; confirm
        # against the Airflow dynamic task mapping documentation.
        recon_config_list = read_recon_config()
        recon_rule_list = recon_rule_setup.expand(recon_conf=recon_config_list)
        recon_rule_exec.partial(master_key=master_dag_key).expand(recon_rule=recon_rule_list)

    # Execution order: start -> task group -> XCom clean-up -> stop.
    start >> tg1 >> delete_xcom >> stop

此致,Santanu

【问题讨论】:

    标签: python google-cloud-platform airflow google-cloud-composer airflow-2.x


    【解决方案1】:

    由于您的任务把字典放在列表里返回,所以下游得到的是包含字典的列表。根据您的需求,可以尝试下面这段直接返回字典的代码。

    代码:

    from datetime import datetime
    from airflow.models import DAG, XCom
    from airflow.utils.dates import days_ago
    from airflow.decorators import task
    from airflow.utils.db import provide_session
    from airflow.operators.bash import BashOperator
    from airflow.operators.python import PythonOperator
    from airflow.utils.task_group import TaskGroup
    
    
    @provide_session
    def clear_xcom_data(session=None, **kwargs):
        """Delete every XCom row recorded for the DAG that runs this callable.

        Intended to be used as a ``PythonOperator`` callable; the DAG object is
        read from the ``dag`` entry of the keyword arguments it receives.

        Args:
            session: Database session used for the delete. Expected to be
                supplied by ``provide_session`` when the caller passes none.
            **kwargs: Task context; must contain the ``dag`` key.

        Raises:
            KeyError: If ``dag`` is missing from ``kwargs``.
        """
        # Use the public ``dag_id`` attribute rather than the private
        # ``_dag_id`` backing field, which is an implementation detail of the
        # DAG class.
        dag_id = kwargs["dag"].dag_id
        session.query(XCom).filter(XCom.dag_id == dag_id).delete()
    
    
    @task(task_id="read_recon_config")
    def read_recon_config(dag_run=None):
        """Return the hard-coded configs as a dict keyed by 1 and 2.

        The run configuration is looked up on ``dag_run`` but is not used in
        the returned value.
        """
        run_conf = dag_run.conf  # fetched but not used below
        return {
            position: {"name": initial}
            for position, initial in enumerate(("S", "G"), start=1)
        }
    
    
    @task(task_id="recon_rule_setup")
    def recon_rule_setup(recon_conf):
        """Log the received config and pair it with a fixed extra rule.

        Returns a two-element list: ``recon_conf`` itself followed by the
        constant dict ``{"name": "Kolkata"}``.
        """
        # Debug output: show the type and the value this task instance was handed.
        print(f"type of recon_conf_dict: {type(recon_conf)}")
        print(f"recon_conf_dict: {recon_conf}")
        return [recon_conf, {"name": "Kolkata"}]
    
    
    @task(task_id="recon_rule_exec")
    def recon_rule_exec(recon_rule, master_key):
        """Print the type and value of both arguments; returns nothing.

        ``master_key`` and ``recon_rule`` are only inspected, never modified.
        """
        # Debug output for the key first, then for the rule.
        print(f"master_key type: {type(master_key)}")
        print(f"master_key: {master_key}")
        print(f"recon_rule type: {type(recon_rule)}")
        print(f"recon_rule: {recon_rule}")
    
    
    # Defaults handed to the DAG constructor below.
    default_args = dict(
        owner='Airflow',
        start_date=days_ago(1),
        depends_on_past=False,
        email_on_failure=False,
        email_on_retry=False,
        retries=1,
    )

    # The key is "<dag name>_<UTC timestamp>"; slicing off the last three
    # digits of %f (microseconds) leaves millisecond precision. It is computed
    # when this module is executed, not when a task runs.
    dag_name_id = "dynamic_demo"
    cur_datetime = datetime.utcnow().strftime("%Y%m%d%H%M%S%f")[:-3]
    master_dag_key = f"{dag_name_id}_{cur_datetime}"
    
    @task(task_id="flatten_recon_rules")
    def flatten_recon_rules(rule_lists):
        """Merge the per-instance outputs of a mapped task into one flat list.

        Args:
            rule_lists: Iterable with one entry per upstream mapped task
                instance, each entry being the list that instance returned.

        Returns:
            A single list holding every rule from every entry, in order.
        """
        return [rule for rules in rule_lists for rule in rules]


    # DAG with no schedule interval and catch-up disabled.
    with DAG(
            dag_id=dag_name_id,
            default_args=default_args,
            schedule_interval=None,
            catchup=False
    ) as dag:

        # Bookend tasks that only echo a message; XCom push is disabled for both.
        start = BashOperator(task_id="start", bash_command='echo "starting reconciliation"', do_xcom_push=False)
        stop = BashOperator(task_id="stop", bash_command='echo "stopping reconciliation"', do_xcom_push=False)

        # Clean-up task whose callable is clear_xcom_data.
        delete_xcom = PythonOperator(
            task_id="delete_xcom",
            python_callable=clear_xcom_data
        )

        with TaskGroup(group_id="reconciliation_process") as tg1:
            recon_config_list = read_recon_config()
            recon_rule_list = recon_rule_setup.expand(recon_conf=recon_config_list)
            # Expanding directly over recon_rule_list would create one
            # recon_rule_exec instance per recon_rule_setup instance, each
            # receiving that instance's whole list. Flatten first so that
            # recon_rule_exec is mapped once per individual rule.
            flat_recon_rules = flatten_recon_rules(recon_rule_list)
            recon_rule_exec.partial(master_key=master_dag_key).expand(recon_rule=flat_recon_rules)

        # Execution order: start -> task group -> XCom clean-up -> stop.
        start >> tg1 >> delete_xcom >> stop
    

    输出:

    【讨论】:

      猜你喜欢
      • 2019-08-01
      • 1970-01-01
      • 2019-08-26
      • 1970-01-01
      • 2021-05-06
      • 2017-01-01
      • 2022-01-13
      • 2015-03-25
      • 1970-01-01
      相关资源
      最近更新 更多