【Question title】: Make custom Airflow macros expand other macros
【Posted】: 2017-12-04 23:22:28
【Question description】:

Is there a way to create a user-defined macro in Airflow that is itself computed from other macros?

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    'simple',
    schedule_interval='0 21 * * *',
    user_defined_macros={
        'next_execution_date': '{{ dag.following_schedule(execution_date) }}',
    },
)

task = BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ next_execution_date }}"',
    dag=dag,
)

The use case here is backporting the new Airflow v1.8 next_execution_date macro to Airflow v1.7. Unfortunately, this template is rendered without the macro being expanded:

$ airflow render simple bash_op 2017-08-09 21:00:00
    # ----------------------------------------------------------
    # property: bash_command
    # ----------------------------------------------------------
    echo "{{ dag.following_schedule(execution_date) }}"
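
The output above is consistent with Jinja rendering a template in a single pass: a string value taken from the context is inserted verbatim and is not parsed as a template again. A minimal sketch of that behaviour using plain jinja2 outside Airflow (the context values here are illustrative stand-ins, not Airflow's real context):

```python
from jinja2 import Template

# Stand-in context: one plain value and one "macro" whose value is itself
# a template string, as in the DAG above.
context = {
    "execution_date": "2017-08-09",
    "next_execution_date": "{{ execution_date }}",
}

# A single render pass substitutes the string as-is; the inner braces survive.
rendered_once = Template('echo "{{ next_execution_date }}"').render(**context)
print(rendered_once)
```

The inner expression is only expanded if the rendered result is fed through the template engine a second time.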

【Discussion】:

    Tags: airflow apache-airflow


    【Solution 1】:

    None of these worked for me, so here is what I did: I used user_defined_macros, but I pass all the template variables to my macro and then use Jinja to render the result.

    import logging

    from airflow import DAG
    from airflow.utils.dates import days_ago
    from jinja2 import Template

    # Jinja expression that calls the `config` macro with every template variable it may need.
    MACRO_CONFIG = 'config({"data_interval_start": data_interval_start, "data_interval_end": data_interval_end, "ds": ds, "ds_nodash": ds_nodash, "ts": ts, "ts_nodash_with_tz": ts_nodash_with_tz, "ts_nodash": ts_nodash, "prev_data_interval_start_success": prev_data_interval_start_success, "prev_data_interval_end_success": prev_data_interval_end_success, "dag": dag, "task": task, "macros": macros, "task_instance": task_instance, "ti": ti, "params": params, "conn": conn, "task_instance_key_str": task_instance_key_str, "conf": conf, "run_id": run_id, "dag_run": dag_run, "test_mode": test_mode})'
    
    def config_macro(context):
        return FunctionThatReturnsTemplates(context)
    
    with DAG(
            'my-dag-id',
            schedule_interval=None,
            start_date=days_ago(1),
            user_defined_macros={'config': config_macro}
    ) as dag:
    ...
    
    def config_macro_template(attr_name):
        return '{{' + MACRO_CONFIG + '.' + attr_name + '}}'
    
    class FunctionThatReturnsTemplates(object):
        def __getattribute__(self, name):
            attr = object.__getattribute__(self, name)
    
            logging.info('attr')
            logging.info(attr)
            logging.info("type(attr)")
            logging.info(type(attr))
    
            if callable(attr):
                logging.info('method attr')
    
                def render_result(*args, **kwargs):
                    logging.info('before calling %s' % attr.__name__)
                    result = attr(*args, **kwargs)
                    logging.info('done calling %s' % attr.__name__)
    
                        # `unicode` does not exist on Python 3, so only `str` is checked.
                        return Template(result).render(**self.context) if isinstance(result, str) else result
    
                return render_result
    
            logging.info('attr is not method')
            if isinstance(attr, str):
                logging.info('attr is string')
                result = Template(attr).render(**self.context)
                logging.info(result)
                logging.info("result")
                return result
    
            return attr
    
        def __init__(self, context):
            logging.info('from sampling pipeline context')
            logging.info(context)
            self.context = context
    ...
    
        my_task = SomeOperator(
            templated_field=config_macro_template('function(args)'),
            task_id='my-task-id'
        )
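
The core idea of this answer (hand the template variables to the macro, then let the macro render its own result with Jinja) can be reduced to a few lines. The following is only a sketch with plain jinja2 and no Airflow; the `config` function, the path it returns, and the single `ds_nodash` variable are hypothetical and chosen for illustration:

```python
from jinja2 import Template


def config(context):
    """Hypothetical macro: builds a string that is itself a template and
    renders it with the context passed in from the outer template."""
    inner_template = "/data/{{ ds_nodash }}/input"
    return Template(inner_template).render(**context)


# The outer template passes the variables it wants expanded to the macro.
outer = Template('{{ config({"ds_nodash": ds_nodash}) }}')
rendered = outer.render(config=config, ds_nodash="20170809")
print(rendered)
```

In a DAG, `config` would be registered through `user_defined_macros`, as in the answer above.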
    

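    【Discussion】:

      【Solution 2】:

      I would vote for writing an Airflow plugin to inject your predefined macros. With this approach, you can use your predefined macros in any Operator without declaring anything.

      Below are some of the custom macros we use. Example usage: {{ macros.dagtz_next_execution_date(ti) }}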

      from airflow.plugins_manager import AirflowPlugin
      from datetime import datetime, timedelta
      from airflow.utils.db import provide_session
      from airflow.models import DagRun
      import pendulum
      
      
      @provide_session
      def _get_dag_run(ti, session=None):
          """Get DagRun obj of the TaskInstance ti
      
          Args:
              ti (TYPE): the TaskInstance object
              session (None, optional): Not in use
      
          Returns:
              DagRun obj: the DagRun obj of the TaskInstance ti
          """
          task = ti.task
          dag_run = None
          if hasattr(task, 'dag'):
              dag_run = (
                  session.query(DagRun)
                  .filter_by(
                      dag_id=task.dag.dag_id,
                      execution_date=ti.execution_date)
                  .first()
              )
              session.expunge_all()
              session.commit()
          return dag_run
      
      
      def ds_add_no_dash(ds, days):
          """
          Add or subtract days from a YYYYMMDD
          :param ds: anchor date in ``YYYYMMDD`` format to add to
          :type ds: str
          :param days: number of days to add to the ds, you can use negative values
          :type days: int
          >>> ds_add_no_dash('20150101', 5)
          '20150106'
          >>> ds_add_no_dash('20150106', -5)
          '20150101'
          """
      
          ds = datetime.strptime(ds, '%Y%m%d')
          if days:
              ds = ds + timedelta(days)
          return ds.isoformat()[:10].replace('-', '')
      
      
      def dagtz_execution_date(ti):
          """get the TaskInstance execution date (in DAG timezone) in pendulum obj
      
          Args:
              ti (TaskInstance): the TaskInstance object
      
          Returns:
              pendulum obj: execution_date in pendulum object (in DAG tz)
          """
          execution_date_pdl = pendulum.instance(ti.execution_date)
          dagtz_execution_date_pdl = execution_date_pdl.in_timezone(ti.task.dag.timezone)
          return dagtz_execution_date_pdl
      
      
      def dagtz_next_execution_date(ti):
          """get the TaskInstance next execution date (in DAG timezone) in pendulum obj
      
          Args:
              ti (TaskInstance): the TaskInstance object
      
          Returns:
              pendulum obj: next execution_date in pendulum object (in DAG tz)
          """
      
          # For manually triggered dagruns that aren't run on a schedule, next/previous
          # schedule dates don't make sense, and should be set to execution date for
          # consistency with how execution_date is set for manually triggered tasks, i.e.
          # triggered_date == execution_date.
          dag_run = _get_dag_run(ti)
          if dag_run and dag_run.external_trigger:
              next_execution_date = ti.execution_date
          else:
              next_execution_date = ti.task.dag.following_schedule(ti.execution_date)
      
          next_execution_date_pdl = pendulum.instance(next_execution_date)
          dagtz_next_execution_date_pdl = next_execution_date_pdl.in_timezone(ti.task.dag.timezone)
          return dagtz_next_execution_date_pdl
      
      
      def dagtz_next_ds(ti):
          """get the TaskInstance next execution date (in DAG timezone) in YYYY-MM-DD string
          """
          dagtz_next_execution_date_pdl = dagtz_next_execution_date(ti)
          return dagtz_next_execution_date_pdl.strftime('%Y-%m-%d')
      
      
      def dagtz_next_ds_nodash(ti):
          """get the TaskInstance next execution date (in DAG timezone) in YYYYMMDD string
          """
          dagtz_next_ds_str = dagtz_next_ds(ti)
          return dagtz_next_ds_str.replace('-', '')
      
      
      def dagtz_prev_execution_date(ti):
          """get the TaskInstance previous execution date (in DAG timezone) in pendulum obj
      
          Args:
              ti (TaskInstance): the TaskInstance object
      
          Returns:
              pendulum obj: previous execution_date in pendulum object (in DAG tz)
          """
      
          # For manually triggered dagruns that aren't run on a schedule, next/previous
          # schedule dates don't make sense, and should be set to execution date for
          # consistency with how execution_date is set for manually triggered tasks, i.e.
          # triggered_date == execution_date.
          dag_run = _get_dag_run(ti)
          if dag_run and dag_run.external_trigger:
              prev_execution_date = ti.execution_date
          else:
              prev_execution_date = ti.task.dag.previous_schedule(ti.execution_date)
      
          prev_execution_date_pdl = pendulum.instance(prev_execution_date)
          dagtz_prev_execution_date_pdl = prev_execution_date_pdl.in_timezone(ti.task.dag.timezone)
          return dagtz_prev_execution_date_pdl
      
      
      def dagtz_prev_ds(ti):
          """get the TaskInstance prev execution date (in DAG timezone) in YYYY-MM-DD string
          """
          dagtz_prev_execution_date_pdl = dagtz_prev_execution_date(ti)
          return dagtz_prev_execution_date_pdl.strftime('%Y-%m-%d')
      
      
      def dagtz_prev_ds_nodash(ti):
          """get the TaskInstance prev execution date (in DAG timezone) in YYYYMMDD string
          """
          dagtz_prev_ds_str = dagtz_prev_ds(ti)
          return dagtz_prev_ds_str.replace('-', '')
      
      
      # Defining the plugin class
      class AirflowTestPlugin(AirflowPlugin):
          name = "custom_macros"
          macros = [dagtz_execution_date, ds_add_no_dash,
                    dagtz_next_execution_date, dagtz_next_ds, dagtz_next_ds_nodash,
                    dagtz_prev_execution_date, dagtz_prev_ds, dagtz_prev_ds_nodash]
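
To see why the `dagtz_*` macros above convert to the DAG timezone before formatting, here is a small standard-library sketch. It assumes a fixed UTC+08:00 offset as a stand-in for the DAG timezone (the plugin itself uses pendulum and `ti.task.dag.timezone`); the helper name `to_dag_tz` is hypothetical:

```python
from datetime import datetime, timedelta, timezone

# Assumption: a fixed UTC+08:00 offset stands in for the DAG's timezone.
DAG_TZ = timezone(timedelta(hours=8))


def to_dag_tz(execution_date_utc):
    """Convert a timezone-aware UTC datetime to the (assumed) DAG timezone."""
    return execution_date_utc.astimezone(DAG_TZ)


execution_date = datetime(2017, 8, 9, 21, 0, tzinfo=timezone.utc)
local_execution_date = to_dag_tz(execution_date)

# The calendar date differs once the timezone is taken into account.
utc_ds = execution_date.strftime('%Y-%m-%d')
local_ds = local_execution_date.strftime('%Y-%m-%d')
print(utc_ds, local_ds)
```

A run scheduled late in the UTC day already falls on the next calendar day in such a timezone, which is the kind of difference the `dagtz_*` date-string macros account for.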
      

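      【Discussion】:

      • Interesting @z1k, thanks for including the full code. It would be even better if you trimmed the many lines of Python down to a minimal example that illustrates your point. Future readers and I will thank you.
      • @z1k How do I use the custom macros in an operator or a DAG? Can you give an example?
      【Solution 3】:

      Here are some solutions:

      1. Override BashOperator to add some values to the context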

      class NextExecutionDateAwareBashOperator(BashOperator):
          def render_template(self, attr, content, context):
              dag = context['dag']
              execution_date = context['execution_date']
              context['next_execution_date'] = dag.following_schedule(execution_date)
      
              # Delegate to the parent's render_template (singular), the method being overridden.
              return super().render_template(attr, content, context)
              # or in python 2:
              # return super(NextExecutionDateAwareBashOperator, self).render_template(attr, content, context)
      

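      The good part of this approach: you can capture some repeated code in your custom operator.

      The bad part: you have to write a custom operator just to add values to the context before the templated fields are rendered.

      2. Do the computation in a user-defined macro

      Macros are not necessarily values. They can be functions.

      In your DAG: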

      def compute_next_execution_date(dag, execution_date):
          return dag.following_schedule(execution_date)
      
      dag = DAG(
          'simple',
          schedule_interval='0 21 * * *',
          user_defined_macros={
              'next_execution_date': compute_next_execution_date,
          },
      )
      
      task = BashOperator(
          task_id='bash_op',
          bash_command='echo "{{ next_execution_date(dag, execution_date) }}"',
          dag=dag,
      )
      

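      The good part: you can define reusable functions that process values available at run time (XCom values, job instance properties, task instance properties, etc.) and make the function's result available when rendering a template.

      The bad part (but not that annoying): you have to import such a function as a user-defined macro in every DAG that needs it.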
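
The reason a function works where a template string does not is that Jinja evaluates calls inside `{{ ... }}` at render time. A sketch with plain jinja2, outside Airflow: `DailyDag` is a hypothetical stand-in for a DAG with a daily schedule, not Airflow's class, and the context is built by hand rather than by Airflow:

```python
from datetime import datetime, timedelta

from jinja2 import Template


class DailyDag:
    """Hypothetical stand-in for a DAG that runs once a day."""

    def following_schedule(self, execution_date):
        return execution_date + timedelta(days=1)


def compute_next_execution_date(dag, execution_date):
    return dag.following_schedule(execution_date)


# Hand-built context; in Airflow the function would arrive here through
# user_defined_macros, next to the built-in `dag` and `execution_date`.
context = {
    "dag": DailyDag(),
    "execution_date": datetime(2017, 8, 9, 21, 0),
    "next_execution_date": compute_next_execution_date,
}

rendered = Template(
    'echo "{{ next_execution_date(dag, execution_date) }}"'
).render(**context)
print(rendered)
```

The call is evaluated during the single render pass, so no second expansion is needed.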

      3. Call your statement directly in your template

      This solution is the simplest (as mentioned in Ardan's answer), and is probably a good one in your case.

      BashOperator(
          task_id='bash_op',
          bash_command='echo "{{ dag.following_schedule(execution_date) }}"',
          dag=dag,
      )
      

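      This is ideal for simple calls like this one. Some other objects are also directly available in templates (such as task, task_instance, etc.), and even some standard modules can be used (such as macros.time, ...).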

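      【Discussion】:

      • In the first code snippet, please update the last line to return super(NextExecutionDateAwareBashOperator, self).render_template(attr, content, context)
      • @RomanKazakov: I think you are right when using python 2; however, I could use super() in python 3
      • @Géraud Can you take a look at my question? You would help me a lot if you could answer it. Thanks stackoverflow.com/questions/67631581/…
      【Solution 4】:

      user_defined_macros are not processed as templates by default. If you want to keep a template in a user_defined_macro (or if you use a template in a params variable), you can always re-run the templating function manually: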

      class DoubleTemplatedBashOperator(BashOperator):
          def pre_execute(self, context):
              context['ti'].render_templates()
      

      This also works for templates that do not reference other parameters or UDMs. This way, you can have "two-deep" templates.
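
What a second rendering pass does can be illustrated with plain jinja2. This is a sketch of the idea only, not Airflow's `render_templates` implementation; the helper `render_two_deep` and the context values are hypothetical:

```python
from jinja2 import Template


def render_two_deep(text, context):
    """Render `text`, then render the result again with the same context."""
    first_pass = Template(text).render(**context)
    return Template(first_pass).render(**context)


context = {
    "ds": "2017-08-09",
    # A "macro" whose value is itself a template string.
    "label": "run for {{ ds }}",
}

# The nested template is expanded by the second pass.
nested = render_two_deep('echo "{{ label }}"', context)

# Caveat: braces protected for one pass are still expanded by the second.
escaped_once = render_two_deep('echo "{% raw %}{{ ds }}{% endraw %}"', context)
print(nested)
print(escaped_once)
```

The second case shows the cost of this approach: any part of the field that should stay literal after rendering has to be escaped for both passes.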

      Or put your UDM directly in the BashOperator's command (the simplest solution):

      BashOperator(
          task_id='bash_op',
          bash_command='echo "{{ dag.following_schedule(execution_date) }}"',
          dag=dag,
      )
      

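      【Discussion】:

      • Yes, expanding the UDF macro everywhere it is needed would certainly work, but you would repeat the same piece of macro code many times. I suppose one could rely on Python facilities and, instead of using a UDF macro, format the string at run time: bash_command='echo "{next_execution_date}"'.format(next_execution_date=NEXT_EXECUTION_DATE), but it would not be as clean.
      • @mksios I tried to make my explanation clearer, but you can do exactly what you want if you call render_templates before the task runs. By default, your UDM gets put into the command; the second, manual pass fills in the template variables inside the UDM.
      • I see, that's cool... The only undesirable side effects I can think of are: (1) any other part of the argument needs to be escaped to prevent double expansion where it is not wanted; (2) this has to be done in every operator type. Still, it seems closest to the end goal... Time to submit a pull request to Airflow!
      • @mksios Great! Please upvote/accept the answer if it helped :)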