【问题标题】:Inherit from a Custom Operator in Airflow从 Airflow 中的自定义运算符继承
【发布时间】:2022-01-17 10:01:21
【问题描述】:

我正在尝试创建一个继承自 Airflow BaseOperator 的 CustomBaseOperator。 CustomBaseOperator 工作正常,但是当我尝试创建继承自 CustomBaseOperator 的 ChildOperator 时,Airflow 将其视为 CustomBaseOperator。

CustomBaseOperator 有一个如下所示的执行函数:

def make_request(self): 
    print("Parent request")

def execute(self, context): 

    self.make_request()

在 Child Operator 中,我重新定义了 make_request:



class ChildOperator(CustomBaseOperator): 
    
    @apply_defaults
    

    def make_request(self): 
        print("Child Request")

每当我运行使用 ChildOperator 的任务时,它都会打印“父请求”,并且图例将其显示为 CustomBaseOperator ... 我的操作员位于“插件”文件夹中的“操作员”文件夹中。我猜在该文件夹中创建自定义运算符时,我只能从“官方”运算符继承。 你知道我怎样才能使继承工作吗?

【问题讨论】:

  • 请显示完整的课程代码。 CustomBaseOperator 和 ChildOperator。你写的应该有效,所以其他事情正在发生

标签: python inheritance airflow


【解决方案1】:

这行得通:

import datetime

from airflow import DAG
from airflow.models import BaseOperator


class CustomBaseOperator(BaseOperator):
    def make_request(self):
        print("Parent request")

    def execute(self, context):
        self.make_request()


class ChildOperator(CustomBaseOperator):
    def make_request(self):
        print("Child Request")


with DAG(dag_id="test_dag", start_date=datetime.datetime(2022, 1, 1), schedule_interval=None) as dag:
    test = ChildOperator(task_id="test")

这将在 Airflow UI 中打印 Child Request 并显示 ChildOperator。另外,请注意设置 @apply_defaults 自 Airflow 2.0 以来已弃用,现在它会自动应用。

【讨论】:

    【解决方案2】:

    我发现了问题:我正在使用一个循环到 locals() 的函数来为每个参数设置 self.param = param。

    【讨论】:

    • 您能否分享导致您描述的错误的特定代码迭代 locals() 的 sn-p?
    • 是的,就是这个功能:def auto_self_params(args: dict): """Sets automatically the parameters in __init__ class with locals() on argument""" theobj = args.pop("self", None) for key, val in args.items(): setattr(theobj, key, val)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-11-19
    • 2023-02-15
    • 1970-01-01
    • 2015-06-27
    • 2011-07-08
    • 2023-04-02
    • 2020-04-07
    相关资源
    最近更新 更多