【问题标题】:Event based Triggering and running an airflow task on dropping a file into S3 bucket基于事件在将文件放入 S3 存储桶时触发和运行气流任务
【发布时间】:2020-02-29 20:27:36
【问题描述】:

是否可以仅在特定事件发生时运行气流任务,例如将文件放入特定 S3 存储桶的事件。类似于 AWS Lambda 事件的东西

S3KeySensor,但我不知道它是否符合我的要求(仅在事件发生时运行任务)

这里是让问题更清楚的例子:

我有一个传感器对象如下

sensor = S3KeySensor(
    task_id='run_on_every_file_drop',
    bucket_key='file-to-watch-*',
    wildcard_match=True,
    bucket_name='my-sensor-bucket',
    timeout=18*60*60,
    poke_interval=120,
    dag=dag
)

使用上述传感器对象,传感器任务的气流行为如下:

  • 如果已有对象名称匹配 S3 存储桶中的通配符 my-sensor-bucket 甚至在 DAG 之前 在气流管理 UI 中切换 ON(我不想运行任务,因为 到过去 s3 对象的存在)
  • 运行一次后,传感器任务将不再运行 是一个新的 S3 文件对象放置(我想在 DAG 中每次有一个新的 S3 文件对象放置在存储桶中时运行传感器任务和后续任务my-sensor-bucket
  • 如果您配置调度程序,任务将根据调度运行 但不是基于事件。所以调度器似乎不是这个选项 案例

我试图了解气流中的任务是否只能基于调度(如 cron 作业)或传感器(仅基于传感标准一次)运行,或者不能像基于事件的管道(类似于 AWS拉姆达)

【问题讨论】:

  • 你找到解决办法了吗?
  • @fortm 还没有。我决定研究 argo 工作流程。该项目看起来很有希望。它有 Argo 事件,这可能是这个问题中问题的解决方案。但问题是你必须加入 kubernetes

标签: amazon-s3 airflow-scheduler airflow


【解决方案1】:

气流基本上是围绕基于时间的调度来组织的。

您可以通过以下几种方式四处寻找您想要的东西:

  1. 假设您在 S3 上有一个 SQS 事件,它会触发一个调用气流 API 以触发 dag 运行的 AWS Lambda。
  2. 您可以使用 SQS 传感器启动 DAG,当它收到 s3 更改事件时,它会继续处理 DAG 的其余部分(有关重新调度,请参见 3_1 和 3_2)。
  3. 您可以使用传感器(如您展示的那个)启动 DAG,它不会选择要运行的任务,它只是传递到下一个相关任务或超时。您必须删除使传感器匹配的密钥。
    1. 您通过使最终任务重新触发 DAG 来重新运行。
    2. 或者将计划间隔设置为每分钟,不进行追赶,将最大活动 DAG 运行设置为 1。这样一次运行将处于活动状态,传感器将保持它直到超时。如果完成或超时,下一次运行将在一分钟内开始。

如果您使用路线 3,您将在 DAG 及其传感器的下一次运行之前删除通过传感器的键。请注意,由于 S3 的最终一致性,路由 1 和 2 更可靠。

【讨论】:

  • 这是否意味着 Airflow 不适用于处理这种基于事件的触发?我有一个用例,我需要每天触发约 1000 个相同 DAG 的实例(每当我收到新文件时),并且可以持续数天。 (由于缺少依赖项)
  • @MadJlzz 我想说这不是意味着。如果您编写一个触发其他 1000 个实例的主 dag,并且您有一个 S3 事件触发一个 lambda 函数,然后使用 API 触发该主 dag,您仍然会看到触发的其他 1000 个运行的一些调度延迟。也就是说,如果您使用带有智能传感器的 Airflow 2,那么只需让长时间运行的任务监听事件,您可能会获得更好的体验。 ...当 DAG 完成时,它会重新触发其启动传感器。有点不是 Airflow 最初的用途,但可能。
猜你喜欢
  • 2018-07-27
  • 2020-11-13
  • 1970-01-01
  • 2022-10-04
  • 2020-02-27
  • 1970-01-01
  • 2021-06-09
  • 1970-01-01
  • 2020-06-12
相关资源
最近更新 更多