【发布时间】:2020-02-29 20:27:36
【问题描述】:
是否可以仅在特定事件发生时运行气流任务,例如将文件放入特定 S3 存储桶的事件。类似于 AWS Lambda 事件的东西
有S3KeySensor,但我不知道它是否符合我的要求(仅在事件发生时运行任务)
这里是让问题更清楚的例子:
我有一个传感器对象如下
sensor = S3KeySensor(
task_id='run_on_every_file_drop',
bucket_key='file-to-watch-*',
wildcard_match=True,
bucket_name='my-sensor-bucket',
timeout=18*60*60,
poke_interval=120,
dag=dag
)
使用上述传感器对象,传感器任务的气流行为如下:
- 如果已有对象名称匹配
S3 存储桶中的通配符
my-sensor-bucket甚至在 DAG 之前 在气流管理 UI 中切换ON(我不想运行任务,因为 到过去 s3 对象的存在) - 运行一次后,传感器任务将不再运行
是一个新的 S3 文件对象放置(我想在 DAG 中每次有一个新的 S3 文件对象放置在存储桶中时运行传感器任务和后续任务
my-sensor-bucket) - 如果您配置调度程序,任务将根据调度运行 但不是基于事件。所以调度器似乎不是这个选项 案例
我试图了解气流中的任务是否只能基于调度(如 cron 作业)或传感器(仅基于传感标准一次)运行,或者不能像基于事件的管道(类似于 AWS拉姆达)
【问题讨论】:
-
你找到解决办法了吗?
-
@fortm 还没有。我决定研究 argo 工作流程。该项目看起来很有希望。它有 Argo 事件,这可能是这个问题中问题的解决方案。但问题是你必须加入 kubernetes
标签: amazon-s3 airflow-scheduler airflow