【发布时间】:2019-03-04 02:43:24
【问题描述】:
我刚开始使用 Airflow 来协调我们的 ETL 管道。
我在运行 dag 时遇到了管道错误。
我看过一个一般性的 stackoverflow 讨论 here。
我的情况更多的是在气流方面。根据该帖子中的讨论,可能的根本原因是:
如果您的请求被阻止或 需要太长时间并且在请求端超时后,它将关闭 连接,然后,当响应端(服务器)尝试写入 套接字,它会抛出一个管道损坏的错误。
在我的情况下,这可能是真正的原因,我有一个 pythonoperator 将在 Airflow 之外开始另一项工作,并且该工作可能非常漫长(即 10 多个小时),我想知道其中的机制是什么我可以利用气流来防止此错误。
有人可以帮忙吗?
UPDATE1 20190303-1:
感谢 SSHOperator 的 @y2k-shubham,我能够使用它成功建立 SSH 连接,并且能够在远程站点上运行一些简单的命令(事实上,默认的 ssh 连接必须设置为 localhost因为作业在本地主机上)并且能够看到hostname、pwd 的正确结果。
但是,当我尝试运行实际作业时,我收到了同样的错误,同样,错误来自 jpipeline ob 而不是 Airflow dag/task。
更新2:20190303-2
我成功运行(气流测试),没有出现错误,然后在另一个失败的运行(调度程序)中出现同样的管道错误。
【问题讨论】:
-
我已经通过
Airflow(SSHHook) 在远程系统上运行了bash脚本(Hadoop作业),但卡住了超过7-8 小时当磁盘(在远程系统上)用完并在我手动删除数据后恢复时。尽管hadoop的工作具有YARN的goods,即使在卡住 的情况下也能保持活着 并优雅地恢复一切。而且keepalive_interval也有助于防止Airflow结束时超时。所以从PythonOperator转移到SSHOperator可能对你有用 -
非常感谢您的启发,您可以在这里使用示例 sshoperator 写一个 anwser 吗?
标签: airflow