【问题标题】:Running Job On Airflow Based On Webrequest基于 Webrequest 在 Airflow 上运行作业
【发布时间】:2017-02-22 22:47:26
【问题描述】:

我想知道气流任务是否可以在通过 HTTP 获得请求时执行。我对 Airflow 的调度部分不感兴趣。我只是想用它来代替芹菜。

所以一个示例操作是这样的。

  1. 用户提交表单以请求某些报告。
  2. 后端收到请求并向用户发送请求已收到的通知。
  3. 然后后端使用 Airflow 安排作业立即运行。
  4. Airflow 然后执行一系列与 DAG 关联的任务。比如先从redshift拉取数据,从MySQL拉取数据,对两个结果集做一些操作,合并,然后将结果上传到Amazon S3,发邮件。

根据我在网上阅读的内容,您可以通过在命令行上执行 airflow ... 来运行气流作业。我想知道是否有一个 python api 可以执行同样的事情。

谢谢。

【问题讨论】:

  • 嗨!你到底有什么解决方案?我有点面临同样的问题
  • 我最终没有使用 AirFlow,因为我想不出解决方案。我最终改用 Celery。
  • 这很有趣,你每小时有多少个电话?我觉得如果数量多的话,用airflow可能不好

标签: python airflow


【解决方案1】:

Airflow REST API Plugin 会在这里为您提供帮助。按照说明安装插件后,您只需点击以下 url:http://{HOST}:{PORT}/admin/rest_api/api/v1.0/trigger_dag?dag_id={dag_id}&run_id={run_id}&conf={url_encoded_json_parameters},将 dag_id 替换为 dag 的 id,省略 run_id 或指定唯一 id,并为 conf 传递 url 编码的 json (使用触发的 dag 中需要的任何参数)。

这是一个使用 jQuery 调用 Airflow api 的示例 JavaScript 函数:

function triggerDag(dagId, dagParameters){
    var urlEncodedParameters = encodeURIComponent(dagParameters);
    var dagRunUrl = "http://airflow:8080/admin/rest_api/api/v1.0/trigger_dag?dag_id="+dagId+"&conf="+urlEncodedParameters;
    $.ajax({
        url: dagRunUrl,
        dataType: "json",
        success: function(msg) {
            console.log('Successfully started the dag');
        },
        error: function(e){
           console.log('Failed to start the dag');
        }
    });
}

【讨论】:

  • 太棒了,正是我需要的!我期待着进入这个框架!
  • 这在当时是正确的,但我认为您现在应该提及实验性 API。
【解决方案2】:

气流中的一个新选项是 实验性,但在最近的 1.7 和 1.8 版本中是内置的 API 端点。这允许您在气流服务器上运行 REST 服务来监听端口并接受 cli 作业。

我自己的经验有限,但我已经成功地运行了测试。根据文档:

/api/experimental/dags/<DAG_ID>/dag_runs 为给定的 dag id (POST) 创建一个 dag_run。

这将安排立即运行您想要运行的任何 dag。但是,它仍然使用调度程序,等待心跳以查看 dag 正在运行并将任务传递给工作人员。不过,这与 CLI 的行为完全相同,因此我仍然相信它适合您的用例。

有关如何配置它的文档可在此处获得:https://airflow.apache.org/api.html

github 中也有一些简单的示例客户端,在airflow/api/clients 下

【讨论】:

  • 在最新版本中,他们将airflow.cfg 中的auth_backend 变量从默认值更改为deny_all。将其更改为默认值以进行测试。
【解决方案3】:

您应该查看Airflow HTTP Sensor 以满足您的需要。您可以使用它来触发 dag。

【讨论】:

  • github的链接响应通知:404 Not Found
【解决方案4】:

Airflow 的实验性 REST API 接口可用于此目的。

以下请求将触发 DAG:

curl -X POST \
    http://<HOST>:8080/api/experimental/dags/process_data/dag_runs \
    -H 'Cache-Control: no-cache' \
    -H 'Content-Type: application/json' \
    -d '{"conf":"{\"START_DATE\":\"2018-06-01 03:00:00\", \"STOP_DATE\":\"2018-06-01 23:00:00\"}'

以下请求检索特定 DAG ID 的 Dag Runs 列表:

curl -i -H "Accept: application/json" -H "Content-Type: application/json" -X GET http://<HOST>:8080/api/experimental/dags/process_data/dag_runs

要使 GET API 工作,请将 rbac 标志设置为 True airflow.cfg

以下是可用 API 列表:here & there

【讨论】:

    【解决方案5】:

    更新: 稳定的 Airflow REST API 发布: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    除了 API URL 更改之外,几乎所有内容都保持不变。 现在还需要“conf”作为对象,所以我添加了额外的包装:

     def trigger_dag_v2(self, dag_id, run_id=None, conf=None, execution_date=None):
        endpoint = '/api/v1/dags/{}/dagRuns'.format(dag_id)
        url = urljoin(self._api_base_url, endpoint)
        data = self._request(url, method='POST',
                             json={
                                 "run_id": run_id,
                                 "conf": {'conf': json.dumps(event)},
                                 "execution_date": execution_date,
                             })
        return data['message']
    

    旧答案:

    Airflow 具有 REST API(目前处于试验阶段) - 可在此处获得: https://airflow.apache.org/api.html#endpoints

    如果您不想按照其他答案中的建议安装插件 - 以下是如何直接使用 API 进行安装的代码:

    def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
        endpoint = '/api/experimental/dags/{}/dag_runs'.format(dag_id)
        url = urljoin(self._api_base_url, endpoint)
        data = self._request(url, method='POST',
                             json={
                                 "run_id": run_id,
                                 "conf": conf,
                                 "execution_date": execution_date,
                             })
        return data['message']
    

    更多在 python 中使用气流 API 的示例可在此处获得: https://github.com/apache/airflow/blob/master/airflow/api/client/json_client.py

    【讨论】:

      【解决方案6】:

      我在尝试做同样的事情时发现了这篇文章,经过进一步调查,我切换到 ArgoEvents。它基本相同,但基于事件驱动的流程,因此更适合此用例。 关联: https://argoproj.github.io/argo

      【讨论】:

      • 您的答案读作广告,并未解决 OP 所述的问题。 “更合适”不涵盖您答案中信息的有效负载,因此请详细说明,例如差异或捷径..在回答提出的问题时想到的很接近。否则它只是一个评论,如果不是广告。审查结束。完成SO之旅!欢迎并享受 SO ;-)
      【解决方案7】:

      Airflow 现在支持stable REST API。使用稳定的 REST API,您可以触发 DAG:

      curl --location --request POST 'localhost:8080/api/v1/dags/unpublished/dagRuns' \
      --header 'Content-Type: application/json' \
      --header 'Authorization: Basic YWRtaW46YWRtaW4=' \
      --data-raw '{
          "dag_run_id": "dag_run_1",
          "conf": {
              "key": "value"
          }
      }'
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2016-04-30
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-10-18
        • 1970-01-01
        • 2021-12-08
        • 2011-07-15
        相关资源
        最近更新 更多