【问题标题】:Cannot turn on Airflow's Smart Sensors feature (custom sensor)无法打开 Airflow 的智能传感器功能(自定义传感器)
【发布时间】:2021-09-28 22:40:55
【问题描述】:

我正在尝试在自定义传感器运算符(即 BaseSensorOperator 的子类)上使用 Airflow 的 Smart Sensors feature。目前关于此功能的文档非常少。

分片作业 (smart_sensor_group_shard_[x]) 正在运行,但我不认为他们正在接收我的传感器。那些日志说Loaded 0 sensor_works

我认为问题在于 BaseSensorOperator.is_smart_sensor_compatible() 正在返回 False,即使我在配置中打开了该功能。这是我的配置:

[smart_sensor]
sensors_enabled = MyCustomSensor
use_smart_sensor = True

但是这里有来自MySensorOperator的日志:

INFO - self.sensor_service_enabled=False
INFO - self.sensors_support_sensor_service={'NamedHivePartitionSensor'}
INFO - Sensor is NOT Smart Sensor compatible

如您所见,操作员仍会看到 Airflow 对这些配置值的默认设置。我不知道为什么会出现这种不一致,因为我可以在 UI 中看到配置设置正确。

我的其余代码

来自 MyCustomSensor 的相关代码:

class MyCustomSensor(BaseSensorOperator):
    poke_context_fields = ['some_arg', 'use_smart_sensor']

    def __init__(self, some_arg,
                 use_smart_sensor=False,
                 *args, **kwargs):
        self.some_arg = some_arg
        self.use_smart_sensor = use_smart_sensor
        super(MyCustomSensor, self).__init__(*args, **kwargs)
    
    def is_smart_sensor_compatible(self):

        # If we have turned it off.
        if not self.use_smart_sensor:
            is_compatible = False
        else:
            self.soft_fail = False

            # super() should be BaseSensorOperator
            is_compatible = super().is_smart_sensor_compatible()

        log.info(f'{self.sensor_service_enabled=}')
        log.info(f'{self.sensors_support_sensor_service=}')

        if is_compatible:
            log.info('Sensor IS Smart Sensor compatible')
        else:
            log.info('Sensor is NOT Smart Sensor compatible')
        return is_compatible

我如何创建传感器任务:

# NOTE: I think that poke_interval may
#   be ignored when we are using Smart 
#   Sensors.
my_sensor = MyCustomSensor(
    task_id='some_name',
    prior_task='some_other_name',
    timeout=518400,
    mode='reschedule',
    poke_interval=30,
    use_smart_sensor=True,
    dag=dag
)

我正在使用 Cloud Composer,特别是版本 composer-1.17.1-airflow-2.1.2。我已验证这些不是 Cloud Composer 的阻止配置。

【问题讨论】:

  • 我相信你应该使用reschedule 模式而不是戳来让它工作。 - 我认为您还应该记录“is_smart_sensor_compatible()”结果以查看它是否按预期工作
  • @JarekPotiuk 我尝试按照建议将模式更改为reschedule,但没有帮助。我还添加了日志记录。但我想我已经缩小了问题的范围,即BaseSensorOperator.is_smart_sensor_compatible() 没有看到智能传感器配置。我已经重写了我的问题以反映这些新信息。感谢您提出任何其他想法。
  • 对于以后发现此问题的任何人:正如我在对已接受答案的编辑中提到的那样,从 Airflow 2.2 开始,Deferrable Operators 功能现在比智能传感器更受欢迎。

标签: python airflow google-cloud-composer


【解决方案1】:

当我在现有 Cloud Composer 实例上使用命令 gcloud composer environments update 更新 sensors_enableduse_smart_sensor 的值时,我能够使用您的代码重现您的错误。 Cloud Composer 似乎没有在运行时应用新配置。

但我能够找到解决方法。请参阅以下步骤:

  1. 我创建了一个新的 composer 实例,并且在创建页面上我已经定义了 Airflow 配置覆盖。

  1. 我使用您的 DAG 进行了测试,并且正确应用了 Airflow 配置。

我创建了一个 public issue tracker 来向 Cloud Composer 工程团队报告此问题。

更新:

另一种解决方法是设置一个虚拟环境变量以强制工作人员重新启动并将更改应用于 Airflow 配置。

gcloud composer environments update <composer_env_name> \
--location <location> \
--update-env-variables=DUMMY=dummy

更新 2:

从 Airflow 2.2 开始,Deferrable Operators 比智能传感器更适合作为解决方案。您应该先查看该功能。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-10-04
    • 1970-01-01
    • 2023-04-01
    • 1970-01-01
    相关资源
    最近更新 更多