【发布时间】:2021-10-15 08:21:26
【问题描述】:
参考this,气流传感器允许我们在运行下一个任务之前检查标准。有没有办法在用户设置超时和另一个标志的情况下标记成功终止传感器?
在我的用例中,我必须通过传感器检查条件,但只能在我希望 DAG/follows 任务正常运行的特定时间范围内。
【问题讨论】:
标签: airflow airflow-scheduler airflow-2.x
参考this,气流传感器允许我们在运行下一个任务之前检查标准。有没有办法在用户设置超时和另一个标志的情况下标记成功终止传感器?
在我的用例中,我必须通过传感器检查条件,但只能在我希望 DAG/follows 任务正常运行的特定时间范围内。
【问题讨论】:
标签: airflow airflow-scheduler airflow-2.x
您可以通过创建自定义传感器类来做到这一点。您将需要覆盖poke 函数并放置您希望设置的逻辑。
例如:
from airflow.sensors.sql import SqlSensor
class MySqlSensor(SqlSensor):
def is_time_frame(self):
# TODO: implement a function that returns True if we want to ignore the sensor
def poke(self, context):
if self.is_time_frame():
return True
super().poke(context)
在此示例中,当传感器戳它时,首先检查时间窗口。如果当前时间在窗口内,则传感器将返回 True 并退出。对于任何其他情况,传感器都会发挥作用 - 在该特定示例中,运行 SQL 查询直到查询返回 True。
【讨论】: