【发布时间】:2017-05-19 00:02:20
【问题描述】:
我们能否在每次运行气流 dag 时创建唯一的文件名并从所有任务中访问该文件? 我尝试创建全局变量(output_filename)并为其附加时间戳。 但是,当我在任务中访问该文件名时,每个任务都会生成不同的文件名,因为它正在计算每个任务中的时间戳。 下面是示例代码:
table_name = 'Test_ABC'
start_date = datetime.now()
cur_tmpstp = start_date.strftime('%Y_%m_%d')
output_filename = table_name + "_" + cur_tmpstp + ".csv"
S3_landing_path = "s3://abc/"
def clean_up():
if os.path.exists(output_filename):
os.remove(output_filename)
task_1 = BashOperator(
task_id='task_1',
bash_command="aws s3 cp %s %s/ " %(output_filename, S3_landing_path, ),
dag=dag)
task_2_cleanup = PythonOperator(
task_id='task_2_cleanup',
python_callable=clean_up,
dag=dag)
我们有更多任务需要访问 output_filename。 我们如何在所有任务中访问 output_filename 全局变量?
【问题讨论】: