更新: 稳定的 Airflow REST API 发布:
https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html
除了 API URL 更改之外,几乎所有内容都保持不变。
现在还需要“conf”作为对象,所以我添加了额外的包装:
def trigger_dag_v2(self, dag_id, run_id=None, conf=None, execution_date=None):
endpoint = '/api/v1/dags/{}/dagRuns'.format(dag_id)
url = urljoin(self._api_base_url, endpoint)
data = self._request(url, method='POST',
json={
"run_id": run_id,
"conf": {'conf': json.dumps(event)},
"execution_date": execution_date,
})
return data['message']
旧答案:
Airflow 具有 REST API(目前处于试验阶段) - 可在此处获得:
https://airflow.apache.org/api.html#endpoints
如果您不想按照其他答案中的建议安装插件 - 以下是如何直接使用 API 进行安装的代码:
def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
endpoint = '/api/experimental/dags/{}/dag_runs'.format(dag_id)
url = urljoin(self._api_base_url, endpoint)
data = self._request(url, method='POST',
json={
"run_id": run_id,
"conf": conf,
"execution_date": execution_date,
})
return data['message']
更多在 python 中使用气流 API 的示例可在此处获得:
https://github.com/apache/airflow/blob/master/airflow/api/client/json_client.py