【问题标题】:Run Airflow tasks in a loop until completion循环运行 Airflow 任务直到完成
【发布时间】:2021-04-03 05:12:02
【问题描述】:

我正在构建一个基本的 ETL 管道,该管道到达一个主端点,该端点包含一个用于处理的 ID 列表(每次调用的可变数量)。我目前的想法是使用 RabbitMQ 作为队列系统,并从 RabbitMQ 消耗三个任务(提取、转换、加载)。我在网上看到的大多数教程都展示了任务在退出之前的简单顺序执行。我尝试构建一个 DAG,对我们收到的每个 ID 执行此顺序操作。但是当我不知道存在多少个 ID 时,我在试图弄清楚如何通过气流安排所有这些任务时遇到了问题。

这是相关 DAG 的一般树视图:

我已经开始使用 RabbitMQ 将这些 ID 推送到队列中,并让 celery 将可变数量的工作人员用于处理负载。我遇到的问题是我不知道如何打破“消费”循环。例如(我使用伪代码对 RabbitMQ 进行一些抽象):

def extract():
    # callback function when messages are sent to this worker
    def _extract(channel, url, rest):
        resp = request.get(url)
        channel.publish('transform_queue', resp)

    # attach the callback to the queue
    channel.basic_consume('extract_queue', callback=_extract)
    channel.start_consuming() # runs a pseudo loop waiting for messages here

请注意,一些变量(例如_extract 下的通道是隐式的,但很可能会包装在自定义运算符中。

Load 和 Transform 函数的工作方式类似。我遇到的问题是当函数开始使用它时它不会停止,直到它关闭。我已经能够发送哨兵消息以允许该功能“退出”,但这将导致任务被标记为失败,并被发送重试。例如这里是哨兵关闭的代码。

def extract():
    # callback function when messages are sent to this worker
    def _extract(channel, message, rest):
        if message == SHUTDOWN:
            exit()
        
        resp = requests.get(message.url)
        channel.publish('transform_queue', resp)

    # attach the callback to the queue
    channel.basic_consume('extract_queue', callback=_extract)
    channel.start_consuming() # runs a pseudo loop waiting for messages here

还有选择性地取消消费者的选项,但这只会增加更多复杂性,因为仍然存在轮询取消的问题,然后该任务最终会遇到上述相同的问题。

我的主要问题是:

有没有办法在这个设置中成功退出?

这是解决此问题的最佳方法吗?我想这是气流的常见用例,因此必须有一些最佳实践或常见设置。但是,我一直没能找到它。

【问题讨论】:

    标签: python rabbitmq celery airflow etl


    【解决方案1】:

    我可以从您的问题中理解以下内容,因此建议您探索以下内容。

    您不确定输入的数量,因此不确定要运行流程的次数

    您可以创建一个自定义运算符(例如 FindIDs),它首先找出您需要执行多少个 ID,然后将值推送到 XComs。然后这些消息可以用于您的其他功能(例如提取器、转换器和加载器),并且可以按如下顺序设置它们

    start >> findIds >> extractor >> transformer >> loader >> end
    

    查看https://airflow.apache.org/docs/apache-airflow/stable/concepts.html?#xcoms

    如果没有输入(或您的情况下的 ID),您需要跳过某些执行

    在这种情况下,我会使用 ShortCircuitOperator 并有条件地跳过 DAG 的执行。 检查:https://github.com/apache/airflow/blob/master/airflow/example_dags/example_short_circuit_operator.py

    【讨论】:

    • 我知道 XCom 的,但我认为这不是我在这种情况下要寻找的。​​span>
    猜你喜欢
    • 1970-01-01
    • 2018-04-10
    • 2021-08-01
    • 1970-01-01
    • 1970-01-01
    • 2021-07-30
    • 2019-11-20
    • 2019-06-19
    • 1970-01-01
    相关资源
    最近更新 更多