【发布时间】:2021-09-28 22:40:55
【问题描述】:
我正在尝试在自定义传感器运算符(即 BaseSensorOperator 的子类)上使用 Airflow 的 Smart Sensors feature。目前关于此功能的文档非常少。
分片作业 (smart_sensor_group_shard_[x]) 正在运行,但我不认为他们正在接收我的传感器。那些日志说Loaded 0 sensor_works。
我认为问题在于 BaseSensorOperator.is_smart_sensor_compatible() 正在返回 False,即使我在配置中打开了该功能。这是我的配置:
[smart_sensor]
sensors_enabled = MyCustomSensor
use_smart_sensor = True
但是这里有来自MySensorOperator的日志:
INFO - self.sensor_service_enabled=False
INFO - self.sensors_support_sensor_service={'NamedHivePartitionSensor'}
INFO - Sensor is NOT Smart Sensor compatible
如您所见,操作员仍会看到 Airflow 对这些配置值的默认设置。我不知道为什么会出现这种不一致,因为我可以在 UI 中看到配置设置正确。
我的其余代码
来自 MyCustomSensor 的相关代码:
class MyCustomSensor(BaseSensorOperator):
poke_context_fields = ['some_arg', 'use_smart_sensor']
def __init__(self, some_arg,
use_smart_sensor=False,
*args, **kwargs):
self.some_arg = some_arg
self.use_smart_sensor = use_smart_sensor
super(MyCustomSensor, self).__init__(*args, **kwargs)
def is_smart_sensor_compatible(self):
# If we have turned it off.
if not self.use_smart_sensor:
is_compatible = False
else:
self.soft_fail = False
# super() should be BaseSensorOperator
is_compatible = super().is_smart_sensor_compatible()
log.info(f'{self.sensor_service_enabled=}')
log.info(f'{self.sensors_support_sensor_service=}')
if is_compatible:
log.info('Sensor IS Smart Sensor compatible')
else:
log.info('Sensor is NOT Smart Sensor compatible')
return is_compatible
我如何创建传感器任务:
# NOTE: I think that poke_interval may
# be ignored when we are using Smart
# Sensors.
my_sensor = MyCustomSensor(
task_id='some_name',
prior_task='some_other_name',
timeout=518400,
mode='reschedule',
poke_interval=30,
use_smart_sensor=True,
dag=dag
)
我正在使用 Cloud Composer,特别是版本 composer-1.17.1-airflow-2.1.2。我已验证这些不是 Cloud Composer 的阻止配置。
【问题讨论】:
-
我相信你应该使用
reschedule模式而不是戳来让它工作。 - 我认为您还应该记录“is_smart_sensor_compatible()”结果以查看它是否按预期工作 -
@JarekPotiuk 我尝试按照建议将模式更改为
reschedule,但没有帮助。我还添加了日志记录。但我想我已经缩小了问题的范围,即BaseSensorOperator.is_smart_sensor_compatible()没有看到智能传感器配置。我已经重写了我的问题以反映这些新信息。感谢您提出任何其他想法。 -
对于以后发现此问题的任何人:正如我在对已接受答案的编辑中提到的那样,从 Airflow 2.2 开始,Deferrable Operators 功能现在比智能传感器更受欢迎。
标签: python airflow google-cloud-composer