[Posted]: 2020-05-11 21:35:01
[Question]:
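I am trying to create a dynamic list of tasks that checks whether the earlier batch runs of the same day have completed. To do this, I store the run times (HHMM) in an Airflow Variable and use datetime.now() to get the current HHMM and build the list of previous runs. However, because Airflow re-parses the DAG file continuously, datetime.now() returns a new date and time on every parse, and a new list of previous tasks is generated from it each time.
Instead of comparing against datetime.now(), I tried using the default Airflow template variables {{ ds }} and {{ ts }} to avoid this problem. However, they are either treated as plain strings or not recognized as variables at all, and an "undefined variable ts/ds" error is thrown.
Is there a way or workaround to access these variables outside of an operator? The logic above needs them to build the dynamic task list that checks whether the previous batch runs have completed.
Thanks in advance.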
from datetime import datetime, date
from pytz import timezone
# The list below would come from an Airflow Variable.
dag_times = ["0700", "0715", "0730", "0730", "0930", "1130", "1330", "1630", "2000"]
# Current time as HHMM. This value keeps changing because Airflow
# re-evaluates it every time it parses the DAG file.
current_dag_time = datetime.now().astimezone(timezone('US/Pacific')).strftime('%H%M')
# Pick the scheduled time that is numerically closest to the current HHMM.
schedule_run_time = min(dag_times, key=lambda x: abs(int(x) - int(current_dag_time)))
current_run = dag_times.index(schedule_run_time)
print("current_run", current_run)
# Slot just before the selected one, or the selected one itself if it is the first.
intra_day_time = dag_times[current_run - 1] if current_run > 0 else schedule_run_time
previous_runs = []
if current_run > 0:
    # All slots scheduled before the selected one.
    previous_runs = dag_times[0:current_run]
else:
    # First slot of the day: check the last slot in the list instead.
    previous_runs.append(dag_times[-1])
previous_run_tasks = []
for dag_name in previous_runs:
    item = {}
    if int(dag_name) == 0:
        # Midnight slot: Monday (weekday() == 0) uses a different window.
        if date.today().weekday() == 0:
            start_time = -52
            end_time = 4
        else:
            start_time = -24
            end_time = 24
        item = {"poke_task_name": "SAMPLE_BOX_%s" % dag_name, "start_time": start_time, "end_time": end_time}
    elif int(dag_name) > 0:
        start_time = 0
        end_time = 24
        item = {"poke_task_name": "SAMPLE_BOX_%s" % dag_name, "start_time": start_time, "end_time": end_time}
    else:
        print("error")
    previous_run_tasks.append(item)
print(previous_run_tasks)
if int(schedule_run_time) == 0:
    # Midnight slot: Monday (weekday() == 0) uses a different window.
    if date.today().weekday() == 0:
        start_time = -52
        end_time = 4
    else:
        start_time = -24
        end_time = 24
    poke_task_name = "SAMPLE_BOX_%s" % dag_times[-1]
    generate_task_name = "SAMPLE_BOX_%s" % schedule_run_time
elif int(schedule_run_time) > 0:
    start_time = 0
    end_time = 24
    poke_task_name = "SAMPLE_BOX_%s" % intra_day_time
    generate_task_name = "SAMPLE_BOX_%s" % schedule_run_time
else:
    # Fail here; otherwise the prints below would hit undefined names.
    raise ValueError("unexpected schedule_run_time: %s" % schedule_run_time)
print("start_time::::", start_time)
print("end_time::::", end_time)
print("generate_task_name::::", generate_task_name)
print("poke_task_name::::", poke_task_name)
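The slot selection above depends on the wall clock, so it changes on every parse. As a rough sketch of one way to make it repeatable, the same logic can be wrapped in a function that receives the run time as an argument. The name `plan_runs` and the shortened list of times are assumptions made for illustration; they are not part of the original code. In Airflow, such a function could be called from inside a task callable with a value derived from the run's execution date, but that wiring is not shown here.

```python
from typing import Dict, List


def plan_runs(dag_times: List[str], run_hhmm: str) -> Dict[str, object]:
    """Pick the slot closest to run_hhmm and list the slots to check before it.

    Hypothetical helper: it mirrors the selection logic of the snippet above
    but takes the time as input instead of calling datetime.now().
    """
    # Closest slot by numeric HHMM distance, as in the original snippet.
    schedule_run_time = min(dag_times, key=lambda t: abs(int(t) - int(run_hhmm)))
    current_run = dag_times.index(schedule_run_time)
    if current_run > 0:
        # All slots scheduled before the selected one.
        previous_runs = dag_times[:current_run]
    else:
        # First slot of the day: fall back to the last slot in the list.
        previous_runs = [dag_times[-1]]
    return {
        "schedule_run_time": schedule_run_time,
        "previous_runs": previous_runs,
        "poke_task_names": ["SAMPLE_BOX_%s" % t for t in previous_runs],
    }


if __name__ == "__main__":
    # Shortened, de-duplicated list used only for this illustration.
    times = ["0700", "0715", "0730", "0930", "1130"]
    print(plan_runs(times, "0935"))
```

Because the result depends only on its inputs, the same `run_hhmm` always yields the same list, no matter how often the DAG file is parsed.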
[Comments]:
-
Please provide an example with {{ ds }} and {{ ts }} showing how you intend to use them. That would let me reproduce the problem.
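For context on the question: `{{ ts }}` is only rendered inside templated operator fields when a task instance runs, so in top-level DAG code it stays a literal string. The value is an ISO 8601 timestamp, and inside a PythonOperator callable it is also available from the task context. Below is a minimal sketch, under those assumptions, of turning such a string into the HHMM form the question compares against. The helper name `ts_to_hhmm` and the fixed UTC-7 offset are illustrative only; a real DAG would more likely pass `pytz.timezone('US/Pacific')`.

```python
from datetime import datetime, timedelta, timezone, tzinfo


def ts_to_hhmm(ts: str, tz: tzinfo) -> str:
    """Convert an ISO 8601 timestamp string to HHMM in the given time zone.

    Hypothetical helper, meant to be called at task run time with the
    rendered value of ts, not at DAG-parse time.
    """
    moment = datetime.fromisoformat(ts)
    if moment.tzinfo is None:
        # Assumption: a timestamp without an offset is in UTC.
        moment = moment.replace(tzinfo=timezone.utc)
    return moment.astimezone(tz).strftime("%H%M")


if __name__ == "__main__":
    # Fixed UTC-7 offset as a stand-in for US/Pacific during daylight saving time.
    pacific_dst = timezone(timedelta(hours=-7))
    print(ts_to_hhmm("2020-05-11T14:35:00+00:00", pacific_dst))
```

The resulting HHMM string is tied to the run itself rather than to the moment the file is parsed, so it can feed the slot selection without changing between parses.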
Tags: python airflow google-cloud-composer