【问题标题】:How to prevent "Execution failed:[Errno 32] Broken pipe" in Airflow如何防止气流中的“执行失败:[Errno 32] Broken pipe”
【发布时间】:2019-03-04 02:43:24
【问题描述】:

我刚开始使用 Airflow 来协调我们的 ETL 管道。

我在运行 dag 时遇到了管道错误。

我看过一个一般性的 stackoverflow 讨论 here

我的情况更多的是在气流方面。根据该帖子中的讨论,可能的根本原因是:

如果您的请求被阻止或 需要太长时间并且在请求端超时后,它将关闭 连接,然后,当响应端(服务器)尝试写入 套接字,它会抛出一个管道损坏的错误。

在我的情况下,这可能是真正的原因,我有一个 pythonoperator 将在 Airflow 之外开始另一项工作,并且该工作可能非常漫长(即 10 多个小时),我想知道其中的机制是什么我可以利用气流来防止此错误。

有人可以帮忙吗?

UPDATE1 20190303-1:

感谢 SSHOperator 的 @y2k-shubham,我能够使用它成功建立 SSH 连接,并且能够在远程站点上运行一些简单的命令(事实上,默认的 ssh 连接必须设置为 localhost因为作业在本地主机上)并且能够看到hostnamepwd 的正确结果。

但是,当我尝试运行实际作业时,我收到了同样的错误,同样,错误来自 jpipeline ob 而不是 Airflow dag/task。

更新2:20190303-2

我成功运行(气流测试),没有出现错误,然后在另一个失败的运行(调度程序)中出现同样的管道错误。

【问题讨论】:

  • 我已经通过Airflow (SSHHook) 在远程系统上运行了bash 脚本(Hadoop 作业),但卡住了超过7-8 小时当磁盘(在远程系统上)用完并在我手动删除数据后恢复时。尽管hadoop 的工作具有YARNgoods,即使在卡住 的情况下也能保持活着 并优雅地恢复一切。而且keepalive_interval 也有助于防止Airflow 结束时超时。所以从PythonOperator 转移到SSHOperator 可能对你有用
  • 非常感谢您的启发,您可以在这里使用示例 sshoperator 写一个 anwser 吗?

标签: airflow


【解决方案1】:

虽然我建议您继续寻找一种更优雅的方式来尝试实现您想要的东西,但我会根据要求提供示例用法


首先您必须创建一个SSHHook。这可以通过两种方式完成

  • 从您正在实例化钩子的客户端代码中提供所有必需设置(如主机、用户、密码(如果需要)等)的传统方式。我在此引用test_ssh_hook.py 的示例,但您必须彻底了解SSHHook 及其测试以了解所有可能的用法

    ssh_hook = SSHHook(remote_host="remote_host",
                       port="port",
                       username="username",
                       timeout=10,
                       key_file="fake.file")
    
  • Airflow way 将所有连接详细信息放在可以从 UI 管理的 Connection 对象中,并且只传递它的 conn_id 来实例化您的钩子

    ssh_hook = SSHHook(ssh_conn_id="my_ssh_conn_id")
    

    当然,如果你依赖SSHOperator,那么你可以直接将ssh_conn_id传递给operator。

    ssh_operator = SSHOperator(ssh_conn_id="my_ssh_conn_id")
    

现在,如果您计划有一个专用任务来通过SSH 运行命令,您可以使用SSHOperator。我再次引用test_ssh_operator.py 中的一个例子,但请查看来源以获得更好的图片。

 task = SSHOperator(task_id="test",
                    command="echo -n airflow",
                    dag=self.dag,
                    timeout=10,
                    ssh_conn_id="ssh_default")

但是您可能希望通过 SSH 运行命令作为您的更大任务的一部分。在这种情况下,您不需要SSHOperator,您仍然可以只使用SSHHookSSHHookget_conn() 方法为您提供paramiko SSHClient 的实例。有了这个,您可以使用exec_command() call 运行命令

my_command = "echo airflow"
stdin, stdout, stderr = ssh_client.exec_command(
  command=my_command,
  get_pty=my_command.startswith("sudo"),
  timeout=10)

如果您查看SSHOperatorexecute() 方法,它是一段相当复杂(但健壮)的代码,试图实现一个非常简单的事情。对于我自己的使用,我创建了一些您可能想要查看的snippets

  • 要独立于SSHOperator 使用SSHHook,请查看ssh_utils.py
  • 对于通过 SSH 运行 多个命令 的操作员(您可以使用 bash's && operator 实现相同的目的),请参阅 MultiCmdSSHOperator

【讨论】:

  • 非常感谢您提供的详细信息,我注意到的另一个异常情况是 Admin/Connections 中的 SSH 连接设置,每次 DAG 运行都会刷新 SSH_Default 连接,我需要重新生成它才能使用再来一次。
猜你喜欢
  • 2014-11-24
  • 2015-05-24
  • 2010-12-26
  • 1970-01-01
  • 1970-01-01
  • 2021-01-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多