【发布时间】:2017-03-13 11:06:26
【问题描述】:
我是气流新手,对 Airflow 及其处理器有疑问。 当一个处理器产生一个输出时,这个输出如何在输入中移动到下一个处理器? 有一个名为 nifi 的软件将中间输出存储到流文件中,afaik 在气流中没有这样的东西。 那么这是怎么发生的呢?
提前致谢。
【问题讨论】:
标签: airflow
我是气流新手,对 Airflow 及其处理器有疑问。 当一个处理器产生一个输出时,这个输出如何在输入中移动到下一个处理器? 有一个名为 nifi 的软件将中间输出存储到流文件中,afaik 在气流中没有这样的东西。 那么这是怎么发生的呢?
提前致谢。
【问题讨论】:
标签: airflow
Airflow 使用Xcoms 在算子之间传递数据。
如果流程是操作员 A -> 操作员 B,那么操作员 A 必须将一个值“推”到 xcom,如果操作员 B 想要读取该值,则必须从 A 中“拉取”这个值。
A 下游的任何操作员都可以通过以下方式访问 A 推送到 Xcom 的任何值:
value = context['task_instance'].xcom_pull(task_ids='operator_a', key='key_name')
操作员 A 会像这样推送这个值:
context['task_instance'].xcom_push(key_name,value,context['execution_date'])
【讨论】:
task_ids。例如 [a,b] >> c。如果我在做 data1 = context['task_instance'].xcom_pull(task_ids='a', key='key_name') 和 data2 = context['task_instance'].xcom_pull(task_ids='b', key='key_name ') 在c中,那么我需要了解依赖关系。有什么方法可以了解 c 中的 a,b 吗?
也许您指的是 GenericTransfer 运算符,它有助于在数据源之间移动数据?
https://github.com/apache/incubator-airflow/blob/master/airflow/operators/generic_transfer.py
【讨论】: