【发布时间】:2021-03-06 18:43:33
【问题描述】:
我正在从外部文件中读取元素列表并循环遍历元素以创建一系列任务。
例如,如果文件中有 2 个元素 - [A, B]。将有 2 个系列的任务:
A1 -> A2 ..
B1 -> B2 ...
这个读取元素逻辑不是任何任务的一部分,而是在 DAG 本身中。因此,Scheduler 每天在读取 DAG 文件时调用它多次。我只想在 DAG 运行时调用它。
想知道此类用例是否已有模式?
【问题讨论】:
我正在从外部文件中读取元素列表并循环遍历元素以创建一系列任务。
例如,如果文件中有 2 个元素 - [A, B]。将有 2 个系列的任务:
A1 -> A2 ..
B1 -> B2 ...
这个读取元素逻辑不是任何任务的一部分,而是在 DAG 本身中。因此,Scheduler 每天在读取 DAG 文件时调用它多次。我只想在 DAG 运行时调用它。
想知道此类用例是否已有模式?
【问题讨论】:
根据您的要求,如果您要避免多次读取文件,但不介意多次从元数据数据库中读取,那么您可以更改您的方法以使用Variables作为迭代源动态创建任务。
一个基本示例可能是在PythonOperator 中执行文件读取并设置您稍后将用于迭代的Variables(相同的可调用对象):
sample_file.json:
{
"cities": [ "London", "Paris", "BA", "NY" ]
}
任务定义:
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.utils.task_group import TaskGroup
import json
def _read_file():
with open('dags/sample_file.json') as f:
data = json.load(f)
Variable.set(key='list_of_cities',
value=data['cities'], serialize_json=True)
print('Loading Variable from file...')
def _say_hello(city_name):
print('hello from ' + city_name)
with DAG('dynamic_tasks_from_var', schedule_interval='@once',
start_date=days_ago(2),
catchup=False) as dag:
read_file = PythonOperator(
task_id='read_file',
python_callable=_read_file
)
然后您可以从该变量中读取并创建动态任务。 (设置default_var 很重要)。 TaskGroup 是可选的。
# Top-level code
updated_list = Variable.get('list_of_cities',
default_var=['default_city'],
deserialize_json=True)
print(f'Updated LIST: {updated_list}')
with TaskGroup('dynamic_tasks_group',
prefix_group_id=False,
) as dynamic_tasks_group:
for index, city in enumerate(updated_list):
say_hello = PythonOperator(
task_id=f'say_hello_from_{city}',
python_callable=_say_hello,
op_kwargs={'city_name': city}
)
# DAG level dependencies
read_file >> dynamic_tasks_group
在 Scheduler 日志中,您只会发现:
INFO - Updated LIST: ['London', 'Paris', 'BA', 'NY']
Dag 图表视图:
使用这种方法,top-level code,因此被 Scheduler 连续读取,是对 Variable.get() 方法的调用。如果您需要读取多个变量,请务必记住,建议将它们存储在一个 JSON 值中,以避免不断创建与元数据数据库的连接(例如 article)。
【讨论】: