【问题标题】:Reuse parameter value across different tasks in Airflow在 Airflow 中跨不同任务重用参数值
【发布时间】:2021-03-08 19:31:13
【问题描述】:

如何重用在任务之间运行的 DAG 上计算的值?

我正在尝试在我的 DAG 中生成时间戳并将其用于多个任务。到目前为止,我尝试设置变量和参数值 - 没有任何效果,每个任务运行都是唯一的。

这是我的代码:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.utils.dates import days_ago

from airflow.providers.amazon.aws.operators.athena import AWSAthenaOperator
from airflow.providers.amazon.aws.operators.glue import AwsGlueJobOperator

default_args = {
    "sla": timedelta(hours=1),
}

config = Variable.get("config", deserialize_json=True)
athena_output_bucket = config["athena_output_bucket"]
glue_db = config["glue_db"]
bucket = config["bucket"]
region = config["region"]


def get_snapshot_timestamp(time_of_run=None):
    if not time_of_run:
        time_of_run = datetime.now()

    timestamp = time_of_run.timestamp() * 1000
    return str(int(timestamp))


class TemplatedArgsGlueOperator(AwsGlueJobOperator):
    template_fields = ("script_args",)


table = "my_table"

with DAG(
    "my-table-export",
    default_args=default_args,
    description="Export my table from DynamoDB to S3",
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    params={
        "snapshot_ts": get_snapshot_timestamp(),
        "athena_output_location": f"s3://{athena_output_bucket}/{table}",
        "table": table,
    },
) as dag:
    my_table_export_to_s3 = TemplatedArgsGlueOperator(
        task_id="my_table_export_to_s3",
        job_name="my-table-export-to-s3",
        num_of_dpus=2,
        region_name=region,
        script_args={"--snapshot_ts": "{{ params.snapshot_ts }}"},
    )

    add_new_partition = AWSAthenaOperator(
        task_id="add_new_partition",
        query="""
          ALTER TABLE {{ params.table }} ADD PARTITION (snapshot_ts = '{{ params.snapshot_ts }}')
          LOCATION 's3://{{ var.json.config.bucket }}/{{ params.table }}/snapshot_ts={{ params.snapshot_ts }}'
        """,
        database=glue_db,
        output_location="{{ params.athena_output_location }}",
    )

    update_latest_view = AWSAthenaOperator(
        task_id="update_latest_view",
        query="""
          CREATE OR REPLACE VIEW {{ params.table }}_latest AS
          SELECT * from {{ params.table }}
          WHERE snapshot_ts = '{{ params.snapshot_ts }}'
        """,
        database=glue_db,
        output_location="{{ params.athena_output_location }}",
    )


my_table_export_to_s3 >> add_new_partition >> update_latest_view

我希望snapshot_ts 在所有三个任务中都相同,但它是不同的。我做错了什么?

【问题讨论】:

    标签: airflow


    【解决方案1】:

    这应该可以通过 xcom 实现。 xCom 精确地用于在各种任务之间交换信息。引用

    XComs 让任务交换消息,允许更细微的形式 控制和共享状态。名字是缩写 “交叉沟通”。 XCom 主要由键、值、 和时间戳,还可以跟踪任务/DAG 等属性 创建了 XCom 以及它应该何时可见。任何物体 可以腌制 可以用作 XCom 值,因此用户应确保 使用适当大小的对象。

    在 xCom 中,pythonoperator 用于调用函数。该函数将一些值推送到气流元数据数据库中名为 xcom 的表中。然后通过其他 DAG 或任务进行访问。

    这里有一个如何做这一切的例子 - https://www.cloudwalker.io/2020/01/28/airflow-xcom/

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-11-22
      • 1970-01-01
      • 2022-07-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多