【发布时间】:2021-03-08 19:31:13
【问题描述】:
如何在同一次 DAG 运行的多个任务之间复用同一个计算出的值?
我想在 DAG 中生成一个时间戳,并在多个任务中使用它。到目前为止,我尝试过通过 Airflow Variable 和 DAG params 来设置这个值,但都不起作用:每个任务运行时得到的值都不一样。
这是我的代码:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.utils.dates import days_ago
from airflow.providers.amazon.aws.operators.athena import AWSAthenaOperator
from airflow.providers.amazon.aws.operators.glue import AwsGlueJobOperator
# Arguments applied to every task in the DAG: each task has a one-hour SLA.
default_args = dict(sla=timedelta(hours=1))

# Deployment settings are read from the JSON-valued Airflow Variable "config".
# NOTE(review): this runs on every parse of the DAG file (top-level code), so
# it hits the metadata DB each time; consider templating via
# `{{ var.json.config.<key> }}` where the consuming field is templated.
config = Variable.get("config", deserialize_json=True)
athena_output_bucket, glue_db, bucket, region = (
    config[setting]
    for setting in ("athena_output_bucket", "glue_db", "bucket", "region")
)
def get_snapshot_timestamp(time_of_run=None):
    """Return *time_of_run* as integer milliseconds since the Unix epoch, as a string.

    To get one value shared by all tasks of a DAG run, pass a per-run value
    such as the run's ``execution_date`` (for example from a Jinja macro).
    Calling it without an argument at module level yields a new value every
    time the DAG file is parsed.

    Args:
        time_of_run: The moment to convert. Aware datetimes use their own
            offset; naive ones are interpreted as local time, following
            ``datetime.timestamp``. Defaults to ``datetime.now()``.

    Returns:
        The millisecond timestamp as a decimal string, e.g. ``"1615231873123"``.
    """
    if time_of_run is None:
        time_of_run = datetime.now()
    # Keep the sub-second part in integer arithmetic. ``timestamp() * 1000``
    # on a float can land just below a whole millisecond and be truncated
    # down by one.
    whole_seconds = int(time_of_run.replace(microsecond=0).timestamp())
    millis = whole_seconds * 1000 + time_of_run.microsecond // 1000
    return str(millis)
class TemplatedArgsGlueOperator(AwsGlueJobOperator):
    """Glue job operator whose ``script_args`` are rendered as Jinja templates.

    This lets the job arguments reference run-time template values, such as
    values derived from the DAG run.
    """

    # Extend rather than replace the parent's templated fields, so any field
    # the base operator already templates stays templated. dict.fromkeys
    # de-duplicates while preserving order in case it already lists
    # "script_args".
    template_fields = tuple(
        dict.fromkeys((*AwsGlueJobOperator.template_fields, "script_args"))
    )
table = "my_table"
with DAG(
"my-table-export",
default_args=default_args,
description="Export my table from DynamoDB to S3",
schedule_interval=timedelta(days=1),
start_date=days_ago(1),
params={
"snapshot_ts": get_snapshot_timestamp(),
"athena_output_location": f"s3://{athena_output_bucket}/{table}",
"table": table,
},
) as dag:
my_table_export_to_s3 = TemplatedArgsGlueOperator(
task_id="my_table_export_to_s3",
job_name="my-table-export-to-s3",
num_of_dpus=2,
region_name=region,
script_args={"--snapshot_ts": "{{ params.snapshot_ts }}"},
)
add_new_partition = AWSAthenaOperator(
task_id="add_new_partition",
query="""
ALTER TABLE {{ params.table }} ADD PARTITION (snapshot_ts = '{{ params.snapshot_ts }}')
LOCATION 's3://{{ var.json.config.bucket }}/{{ params.table }}/snapshot_ts={{ params.snapshot_ts }}'
""",
database=glue_db,
output_location="{{ params.athena_output_location }}",
)
update_latest_view = AWSAthenaOperator(
task_id="update_latest_view",
query="""
CREATE OR REPLACE VIEW {{ params.table }}_latest AS
SELECT * from {{ params.table }}
WHERE snapshot_ts = '{{ params.snapshot_ts }}'
""",
database=glue_db,
output_location="{{ params.athena_output_location }}",
)
my_table_export_to_s3 >> add_new_partition >> update_latest_view
我希望 snapshot_ts 在这三个任务中都相同,但实际上每个任务得到的值都不同。我做错了什么?
【问题讨论】:
标签: airflow