【问题标题】:How can I write a file from a DAG to my composer cloud storage bucket?如何将文件从 DAG 写入我的作曲家云存储桶?
【发布时间】:2021-12-29 15:13:48
【问题描述】:

这个问题的accepted answer 指出

“...gs://my-bucket/dags 文件夹在 /home/airflow/gcs/dags 的调度程序、Web 服务器和工作程序中可用。”

(which is supported by the newer docs)

所以我写了一个这样的 bash 运算符:

t1 = bash.BashOperator(
    task_id='my_test',
    bash_command="touch /home/airflow/gcs/data/test.txt",
    )

我想通过在我的文件创建前加上答案中指定的路径,它将写入我的云作曲家环境的关联存储帐户中的数据文件夹。同样,touch test.txt 也成功运行,但实际上并没有在我能看到的任何地方创建文件(我假设它已写入工作人员的临时存储,然后在工作人员在执行 DAG 后关闭时将其删除)。我似乎无法保留通过 DAG 运行的简单命令中的任何数据?甚至可以从 Cloud Composer 中运行的 bash 脚本中简单地写出一些文件吗?提前谢谢你。

【问题讨论】:

  • 您是否尝试将路径添加到新变量并将变量传递给bash_command?它会是这样的command=”./home/airflow/gcs/data/test.txt”,然后像bash_command=command一样传递它。
  • 谢谢,我会为我的下一个 BashOperator 做这个。

标签: airflow google-cloud-composer


【解决方案1】:

奇怪的是,我需要在包含 Bash 命令的字符串末尾添加一个空格。

t1 = bash.BashOperator(
    task_id='my_test',
    bash_command="touch /home/airflow/gcs/data/test.txt ",
    )

令人沮丧的是错误说路径不存在,所以我去了一个兔子洞映射 Airflow 工作人员的目录,直到我完全确定它确实存在 - 然后我发现了一个类似的问题 here。虽然我没有得到“未找到 Jinja 模板错误”,但我应该根据这个 note 得到。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-12-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多