【发布时间】:2022-07-19 22:40:38
【问题描述】:
【问题讨论】:
【问题讨论】:
您可以定义一个callback 函数,然后您可以将其作为默认参数传递给DAG() 运算符。
def on_failure_callback_slack(context):
message = f"Failue! airflow task: {context['task_instance'].task_id} failed" \
f"dag: {base_url}?dag_id={context['dag'].dag_id} " \
f"{str(context['task_instance'])}"
operator = PythonOperator(task_id="failure", python_callable=post_to_slack, op_kwargs={'message': message}
return operator.execute(context=context)
在上面的代码中,post_to_slack() 只是一个使用requests.post(...) 发布到 slack 的实用函数
您可以将此函数传递给DAG,它会使用 url 发布到 slack(或您选择的其他媒体)。请注意,您必须提供 base_url 才能使 URL 正常工作。
default_args = {"on_failure_callback": on_failure_callback_slack}
dag=DAG(dag_id='some_id', default_args=default_args)
更多信息可以在这里阅读:https://airflow.apache.org/docs/apache-airflow/2.2.1/logging-monitoring/callbacks.html
【讨论】:
任务实例有一个log_url 属性。在回调函数中,可以这样访问:
def on_failure_callback(context):
dag_run = context.get("dag_run")
log_url = dag_run.get_task_instance({your_task_id}).log_url
# Do whatever with the log_url
请注意,根据 Airflow 设置,log_url 可能指向本地地址(例如 http://localhost:8080),可能应将其替换为向用户公开的任何 URL。
【讨论】: