【问题标题】:MapReduce with paramiko how to print stdout as it streams带有 paramiko 的 MapReduce 如何在流式传输时打印标准输出
【发布时间】:2016-01-13 06:32:27
【问题描述】:

我使用 paramiko 创建了一个小型 Python 脚本,它允许我运行 MapReduce 作业,而无需使用 PuTTY 或 cmd 窗口来启动作业。这很好用,只是在工作完成之前我看不到标准输出。如何设置它,以便我可以看到生成的每一行标准输出,就像我可以通过 cmd 窗口一样?

这是我的脚本:

import paramiko

# Define connection info
host_ip = 'xx.xx.xx.xx'
user = 'xxxxxxxxx'
pw = 'xxxxxxxxx'

# Commands
list_dir = "ls /nfs_home/appers/cnielsen -l"
MR = "hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar -files /nfs_home/appers/cnielsen/product_lookups.xml -file /nfs_home/appers/cnielsen/Mapper.py -file /nfs_home/appers/cnielsen/Reducer.py -mapper '/usr/lib/python_2.7.3/bin/python Mapper.py test1' -file /nfs_home/appers/cnielsen/Process.py -reducer '/usr/lib/python_2.7.3/bin/python Reducer.py' -input /nfs_home/appers/extracts/*/*.xml -output /user/loc/output/cnielsen/test51"
getmerge = "hadoop fs -getmerge /user/loc/output/cnielsen/test51 /nfs_home/appers/cnielsen/test_010716_0.txt"

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(host_ip, username=user, password=pw)
##stdin, stdout, stderr = client.exec_command(list_dir)
##stdin, stdout, stderr = client.exec_command(getmerge)
stdin, stdout, stderr = client.exec_command(MR)

print "Executing command..."

for line in stdout:
    print '... ' + line.strip('\n')
for l in stderr:
    print '... ' + l.strip('\n')
client.close()

【问题讨论】:

  • 这似乎是由于缓冲而发生的。不知何故,默认行缓冲被覆盖。您能否说明您是如何运行此脚本和主机环境的详细信息?
  • 我在 Windows 7 上的 PyScripter IDE 中运行这个脚本。

标签: python hadoop mapreduce paramiko


【解决方案1】:

此代码隐式调用 stdout.read() 直到 EOF 才会阻塞。因此,您必须分块读取 stdout/stderr 才能立即获得输出。 this answer 尤其是 this answer 的修改版本应该可以帮助您解决这个问题。我建议为您的用例调整answer 2,以防止出现一些常见的停滞情况。

这是一个改编自answer 1的示例

sin,sout,serr = ssh.exec_command("while true; do uptime; done")

def line_buffered(f):
    line_buf = ""
    while not f.channel.exit_status_ready():
        line_buf += f.read(1)
        if line_buf.endswith('\n'):
            yield line_buf
            line_buf = ''

for l in line_buffered(sout):   # or serr
    print l

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-03-04
    • 2019-10-03
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多