【问题标题】:Importing custom plugins in Airflow 2 [Cloud Composer]在 Airflow 2 [Cloud Composer] 中导入自定义插件
【发布时间】:2021-12-05 11:33:13
【问题描述】:

我有一个这样的目录结构:

airflow_dags
├── dags
│   └── hk  
│       └── hk_dag.py  
├── plugins
│   └── cse   
│       └── operators.py   
│           └── cse_to_bq.py   
└── test
   └── dags   
       └── dag_test.py  

在 Cloud Composer 创建的 GCS 存储桶中,有一个插件文件夹,我在其中上传了 cse 文件夹。

如果我像这样导入插件,现在在我的 hk_dag.py 文件中:

from plugins.cse.operators.cse_to_bq import CSEToBQOperator

并运行我的单元测试,它通过了,但在云作曲家中我收到了 ModuleNotFoundError: No module named 'plugins' 错误消息。

如果我在我的hk_dag.py 中导入这样的插件:

from cse.operators.cse_to_bq import CSEToBQOperator

我的单元测试因ModuleNotFoundError: No module named 'cse' 而失败,但它在 Cloud Composer 中运行良好。

我该如何解决?

【问题讨论】:

    标签: python airflow google-cloud-composer


    【解决方案1】:

    在 Airflow 2.0 中,要导入插件,您只需要直接从操作员模块中导入即可。

    在你的情况下,必须是这样的:

    from operators.cse_to_bq import CSEToBQOperator
    

    但在此之前,您必须将文件夹结构更改为:

    airflow_dags
    ├── dags
    │   └── hk  
    │       └── hk_dag.py  
    ├── plugins
    │   └── operators   
    │       └── cse   
    │           └── cse_to_bq.py 
    └── test
       └── dags   
           └── dag_test.py 
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-12-27
      • 2021-08-19
      • 2020-07-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-10-10
      相关资源
      最近更新 更多