【问题标题】:Airflow python returned value handling via XCOM气流 python 通过 XCOM 处理返回值
【发布时间】:2018-08-04 09:30:11
【问题描述】:

我正在尝试创建如下所述的 Airflow dag: 我有一个相当大的 python 代码,最终会创建一个文件。 该文件是使用特定名称创建的,例如 sales20180802130200.json

下面的 Airflow 任务是 s3BucketUpload 操作员。它需要获取文件名才能上传到 s3。

第一个 python 文件可能由bashOperator 运行。它如何使用文件名创建 Xcom 密钥?有没有其他方式传递值?

谢谢 谢比

【问题讨论】:

  • 您可以使用 Python 运算符运行 Python 代码并将文件名推送到 XCom。

标签: airflow


【解决方案1】:

只要路径参数包含在运算符类中的模板参数中,Airflow 就会为您插入值。我不知道 s3BucketUpload 运算符长什么样,所以我假设参数名称。

class s3BucketUploadOperator(BaseOperator):
    # this tuple is not used by anything in my operator classes and is not passed anywhere
    template_fields = ('local_path', 's3_path', )
    ...

def py_fn(task_instance, **context):
    task_instance.xcom_push(key='file_name', value='file.name')

py_task = PythonOperator(
    dag=dag,
    task_id='py_task',
    provide_context=True,
    python_callable=py_fn
)

s3_task = s3BucketUploadOperator(
    dag=dag,
    task_id='s3_task',
    s3_conn_id='?',
    local_path="path/to/dir/{{ task_instance.xcom_pull(key='file_name', task_ids='py_task') }}",
    s3_path="path/to/s3/dir/{{ task_instance.xcom_pull(key='file_name', task_ids='py_task') }}"
)

编辑

如果你想使用 BashOperator,bash_command="echo {{ task_instance.xcom_pull(key='file_name', task_ids='py_task') }}" 应该将文件名打印到任务日志。

【讨论】:

    猜你喜欢
    • 2020-01-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-02-13
    • 2016-09-18
    • 1970-01-01
    • 2015-11-02
    • 1970-01-01
    相关资源
    最近更新 更多