【问题标题】:How do I retrieve airflow url link to my latest task log in a DAG?如何在 DAG 中检索指向我最新任务日志的气流 url 链接?
【发布时间】:2022-07-19 22:40:38
【问题描述】:

在单个 DAG 任务中,如何在 python 操作员的帮助下设置 url 链接,因为我打算在发生错误时直接向用户发送最新日志的 url 链接,以便他们可以访问页面直接跳过导航这一步。

【问题讨论】:

    标签: python airflow


    【解决方案1】:

    您可以定义一个callback 函数,然后您可以将其作为默认参数传递给DAG() 运算符。

    def on_failure_callback_slack(context):
        message = f"Failue! airflow task: {context['task_instance'].task_id} failed" \
    f"dag: {base_url}?dag_id={context['dag'].dag_id} " \
    f"{str(context['task_instance'])}"
    
        operator = PythonOperator(task_id="failure", python_callable=post_to_slack, op_kwargs={'message': message}
        
        return operator.execute(context=context)
    

    在上面的代码中,post_to_slack() 只是一个使用requests.post(...) 发布到 slack 的实用函数

    您可以将此函数传递给DAG,它会使用 url 发布到 slack(或您选择的其他媒体)。请注意,您必须提供 base_url 才能使 URL 正常工作。

    default_args = {"on_failure_callback": on_failure_callback_slack}
    dag=DAG(dag_id='some_id', default_args=default_args)
    

    更多信息可以在这里阅读:https://airflow.apache.org/docs/apache-airflow/2.2.1/logging-monitoring/callbacks.html

    【讨论】:

    • 非常感谢。通过一些调整,设法得到结果!欣赏它
    【解决方案2】:

    任务实例有一个log_url 属性。在回调函数中,可以这样访问:

    def on_failure_callback(context):
        dag_run = context.get("dag_run")
        log_url = dag_run.get_task_instance({your_task_id}).log_url
    
        # Do whatever with the log_url
    

    请注意,根据 Airflow 设置,log_url 可能指向本地地址(例如 http://localhost:8080),可能应将其替换为向用户公开的任何 URL。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-01-01
      • 1970-01-01
      • 2021-03-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多