【问题标题】:Airflow Scheduler creating PID for same dag to generate tasks every timeAirflow Scheduler 为同一个 dag 创建 PID 以每次生成任务
【发布时间】:2018-05-09 00:53:28
【问题描述】:

我正在使用本地执行器。我有一种情况,我为每个请求 id 生成了唯一的 dag,例如 1.py , 2.py 。

1.py 假设有两个任务,2.py 有 3 个任务。我还会定期收到更多的 dag,例如 3.py、4.py 等。

为每个新的 id/request ID 创建一个 dag 是否有任何问题。

我观察到调度程序不断给出这个日志。

Started a process (PID: 92186) to generate tasks for /Users/nshar141/airflow/dags/3.py - logging into /Users/nshar141/airflow/logs/scheduler/2018-05-07/3.py.log

我的问题是为什么调度程序不断生成单独的 PID 来生成任务。我尝试在配置中更改与并发和并行性相关的不同参数,但调度程序似乎每次都为 dags 文件夹中存在的每个 dag 执行该语句。

我附上我的 dag 定义。我想在创建 dag 后立即运行它。我应该在 start_time 和 scheduler_interval 中给出什么参数?

dag = DAG('3', description='Sample DAG',schedule_interval=@once,start_date=datetime(2018, 5, 07), catchup=False)

由于我需要使用唯一的 dag id 动态生成 dag 并将其放置在 dags 文件夹中,因此我担心调度程序会为已执行的文件夹中的每个 dag 生成过多的进程 IDS。

【问题讨论】:

    标签: python airflow airflow-scheduler


    【解决方案1】:

    为什么要为每个请求创建一个新的 DAG?我认为最合适的方法是存储请求并让单个 DAG 以批处理方式同时执行多个请求的逻辑。如果你愿意,你可以经常运行你的 DAG。

    您似乎希望尽快执行任务。如果您对具有大量吞吐量的近实时感兴趣,Airflow 可能不合适,您可能希望使用消息队列。

    【讨论】:

    • 我想为每个请求分配唯一的 DAG id,以便在气流 UI 中,如果出现故障,我可以隔离每个请求。但是,如果我只有一个 dag 可以为不同的请求批量执行操作,那么我将拥有与单个 dag 相关联的大量任务,并且很难跟踪和跟踪。我正在尝试类似stackoverflow.com/questions/39133376/… 但问题是我如何限制调度程序不要在每次心跳时扫描我的 master.py,而是每 1 分钟扫描一次。(这样一个 id 分配给一个 dag,没有竞争条件)跨度>
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-11-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多