【问题标题】:Is there any way to specify a block of code that should run at the start of every Airflow operator?有没有办法指定应该在每个 Airflow 操作符开始时运行的代码块?
【发布时间】:2020-09-24 17:23:16
【问题描述】:

我们有一些设置代码要确保在每个操作符开始时运行。

我们现在能做的最好的事情是创建一个所有其他运算符都继承自的基本运算符,并从该运算符的预执行函数中调用设置代码。但是,我们无法强制开发人员编写的所有未来运算符都必须从该运算符继承,因此人们仍然可以编写不运行重要设置代码的运算符。

有没有办法制作一个对所有操作员都通用的预执行函数?

【问题讨论】:

    标签: python airflow initializer


    【解决方案1】:

    我总结一下,您有 2 个选择。选项 1 稍微实用一些,技术性更强,需要补丁,但有很多好处。选项 2 是实际扩展您的第一个建议。


    选项 1

    找到安装airflow BaseOperator的文件源。复制它并修改文件,使pre_execute 方法的定义具有您的功能。使用diff 生成一个diff 文件,您可以使用patch 文件应用该文件。拨打OVERRIDE-apply-library-preexecute.diff

    然后,您需要在安装或配置步骤中加载此差异文件。正常安装气流。然后您可以使用patch 工具。类似patch -d <PATH> -p1 < OVERRIDE-apply-library-preexecute.diff 的命令

    这具有最少的代码量,会很明显地失败(随着事情的变化),并且您不必进行单元测试。并且保证每个 Operator 都会执行您的 pre_execute 方法。


    选项 2

    我建议您正在寻找正确的方向。

    我建议将您的代码简单地实现为 Mixin,提供在您自己的代码中使用的所有运算符,并使用单元测试来确保 DAG 使用直接继承该 Mixin 的任务。

    您可以按照以下方式定义您的 Mixin 类:

    import logging
    
    from airflow.models import BaseOperator
    
    class MyPreExecuteMixin(BaseOperator):
        def pre_execute(self, context):
            logging.info("Global Pre Execute")
    

    这将迫使您提供要在自己的库中使用的每个 Operator 的版本,就像这样...

    from airflow.operators.dummy_operator import DummyOperator as _DummyOperator
    from mylibrary.models import MyPreExecuteMixin
    
    class DummyOperator(_DummyOperator, MyPreExecuteMixin):
        pass
    

    然后你必须有一个类似于...的测试套件

    from dags.mydag import dag
    
    def test_all_pre_execute():
        non_pre_execute_tasks = [task for task in dag.tasks if type(task) != MyPreExecuteMixin]
        assert not non_pre_execute_tasks
    

    【讨论】:

    • 我看到你可以更明确地将你的单元测试写成:``` assert all([type(task) == MyPreExecuteMixin for task in dag.tasks]) ``
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-08-26
    • 2022-11-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多