【问题标题】:Create Unique file name and access that file in all airflow task创建唯一文件名并在所有气流任务中访问该文件
【发布时间】:2017-05-19 00:02:20
【问题描述】:

我们能否在每次运行气流 dag 时创建唯一的文件名并从所有任务中访问该文件? 我尝试创建全局变量(output_filename)并为其附加时间戳。 但是,当我在任务中访问该文件名时,每个任务都会生成不同的文件名,因为它正在计算每个任务中的时间戳。 下面是示例代码:

table_name = 'Test_ABC'
start_date = datetime.now()
cur_tmpstp = start_date.strftime('%Y_%m_%d')

output_filename = table_name + "_" + cur_tmpstp + ".csv"
S3_landing_path = "s3://abc/"

def clean_up():
    if os.path.exists(output_filename):
        os.remove(output_filename)


task_1 = BashOperator(
    task_id='task_1',
    bash_command="aws s3 cp %s %s/ " %(output_filename, S3_landing_path, ),
    dag=dag)

task_2_cleanup = PythonOperator(
    task_id='task_2_cleanup',
    python_callable=clean_up,
    dag=dag)

我们有更多任务需要访问 output_filename。 我们如何在所有任务中访问 output_filename 全局变量?

【问题讨论】:

    标签: airflow apache-airflow


    【解决方案1】:

    如果您只需要日期粒度的时间戳,那么您可以使用带有模板的默认变量。此类变量的一些示例(取自http://airflow.readthedocs.io/en/latest/code.html#default-variables)是

    {{ ds }}    the execution date as YYYY-MM-DD
    {{ ds_nodash }}     the execution date as YYYYMMDD
    {{ execution_date }}    the execution_date, (datetime.datetime)
    

    【讨论】:

    【解决方案2】:

    如果您需要时间粒度的时间戳,可以使用全局变量和带有python运算符的任务:

    DAG_NAME = 'Some DAG name'
    
    ts = Variable.get(f"{DAG_NAME}_ts", default_var=None)
    
    def generate_ts(*args, **kwargs):
        ts = datetime.now().isoformat()
        Variable.set(f"{DAG_NAME}_ts", ts)
    
    generate_ts_task = PythonOperator(
        task_id='generate_ts',
        python_callable=generate_ts,
        dag=dag,
    )
    

    【讨论】:

      猜你喜欢
      • 2020-04-28
      • 2017-09-28
      • 1970-01-01
      • 2011-10-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多