【问题标题】:Airflow and data transfer between operators运营商之间的气流和数据传输
【发布时间】:2017-03-13 11:06:26
【问题描述】:

我是气流新手,对 Airflow 及其处理器有疑问。 当一个处理器产生一个输出时,这个输出如何在输入中移动到下一个处理器? 有一个名为 nifi 的软件将中间输出存储到流文件中,afaik 在气流中没有这样的东西。 那么这是怎么发生的呢?

提前致谢。

【问题讨论】:

    标签: airflow


    【解决方案1】:

    Airflow 使用Xcoms 在算子之间传递数据。

    如果流程是操作员 A -> 操作员 B,那么操作员 A 必须将一个值“推”到 xcom,如果操作员 B 想要读取该值,则必须从 A 中“拉取”这个值。

    A 下游的任何操作员都可以通过以下方式访问 A 推送到 Xcom 的任何值:

    value = context['task_instance'].xcom_pull(task_ids='operator_a', key='key_name') 
    

    操作员 A 会像这样推送这个值:

    context['task_instance'].xcom_push(key_name,value,context['execution_date'])
    

    【讨论】:

    • 另一个(简单的?)问题:xcom 在哪里存储数据?
    • 在气流 DB xcom 表中
    • 是否可以将查询结果共享给其他任务,如果可以,我该怎么做?在哪里可以找到更多这样的例子,我觉得框架有点乱
    • @AlbertoBonsanto 考虑将此问题的更详细版本作为新问题发布。我认为答案是“是”,但需要更多信息才能提供一个好的答案。
    • 有没有办法知道当前任务所依赖的task_ids。例如 [a,b] >> c。如果我在做 data1 = context['task_instance'].xcom_pull(task_ids='a', key='key_name') 和 data2 = context['task_instance'].xcom_pull(task_ids='b', key='key_name ') 在c中,那么我需要了解依赖关系。有什么方法可以了解 c 中的 a,b 吗?
    【解决方案2】:

    也许您指的是 GenericTransfer 运算符,它有助于在数据源之间移动数据?

    https://github.com/apache/incubator-airflow/blob/master/airflow/operators/generic_transfer.py

    【讨论】:

    • 我认为他们在问如何将数据从一个操作员传递到另一个操作员,而不是如何通过操作员从一个位置获取数据到另一个位置。
    猜你喜欢
    • 2023-01-23
    • 2018-06-07
    • 2021-06-27
    • 2018-08-17
    • 1970-01-01
    • 1970-01-01
    • 2015-05-28
    • 2023-01-24
    • 1970-01-01
    相关资源
    最近更新 更多