【问题标题】:subprocess.Popen works outside but not inside ipyparallel?subprocess.Popen 在外部工作,但不在 ipyparallel 内部工作?
【发布时间】:2016-08-08 22:59:51
【问题描述】:

我正在尝试使用ipyparallel 并行化来自here 的一些代码。简而言之,我可以使函数在 apply_sync() 之外正常工作,但我似乎无法让它们在其中工作(我发誓我之前有这个工作,但我找不到代码的版本没有坏)。一个简单的例子:

def test3(fname = '7_1197_.txt'):
    import subprocess
    command = 'touch data/sentiment/' + fname + '.test'
    child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
    while True:
        out = child.stdout.read(1)
        if out == '' and child.poll() != None:
            return 
test3() #this works, creates a file with the .test extention
results = view.map_sync(test3, speeches) #this doesn't work. No files created.

这是我实际要使用的函数的简短版本。它自己工作得很好。在apply_sync() 中,它根据htop 启动java 进程,但它似乎没有从这些进程中得到任何回报。

def test2(fname = '7_1197_.txt'):
    import subprocess

    settings = ' -mx5g edu.stanford.nlp.sentiment.SentimentPipeline'
    inputFile = ' -file data/sentiment/' + fname
    command = 'java ' + settings + inputFile
    child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
    results = []
    while True:
        out = child.stdout.read(1)
        if out == '' and child.poll() != None:
            return ''.join(results)
        if out != '':
            results.extend(out)
test2() #Works fine, produces output
results = view.map_sync(test2, speeches) #Doesn't work: the results are empty strings.

我尝试了一个返回命令变量的版本。发送到Popen 的命令很好,并且在命令行中手动粘贴时它们可以工作。我想这可能只是管道的问题,但是更改命令以将输出重定向到带有' > '+fname+'.out' 的文件在apply_sync() 调用中也不起作用(不生成输出文件)。

我应该怎么做才能从系统回调中获得stdout

【问题讨论】:

    标签: python-2.7 subprocess stanford-nlp ipython-parallel


    【解决方案1】:

    我看到了两个潜在的陷阱。一个用于阻塞,一个用于丢失文件。对于丢失的文件,您应该确保您的引擎和本地会话位于同一工作目录中,或者确保使用绝对路径。本地和远程同步路径的快速方法:

    client[:].apply_sync(os.chdir, os.getcwd())
    

    也就是说:获取local cwd,然后在任何地方调用os.chdir,这样我们就可以共享同一个工作目录。如果您在 IPython 会话中,一个快速的捷径是:

    %px cd {os.getcwd()}
    

    至于阻塞,我的第一个想法是:并行运行时你可能使用 Python 3 吗?如果是这样,child.stdout.read 返回 bytes 而不是 text。在 Python 2 中,str is bytes,所以 out == '' 可以工作,但在 Python 3 中,条件 out == '' 永远不会为真,因为 b'' != u'',你的函数永远不会返回。

    一些更有用的信息:

    1. stdout.read(N) 将读取最多该字节数,如果输出完成则截断。这很有用,因为read(1) 将循环很多次 次,即使输出都在等待读取。
    2. stdout.read() 只会在输出完成时返回一个空字节串,所以你只需要检查它,而不是 child.poll() 在返回之前。 (只要您没有在 FD 上设置 NOWAIT 即可,这是一些高级用法)。
    3. 如果您想在函数返回之前查看部分输出,可以在 sys.stdout 上重新显示输出,并在 IPython 中查看部分输出,而无需等待最终结果。

    所以这里有几个你的函数的实现,具有不同的目标。

    第一个似乎使用Popen.communicate 完成您当前的目标,如果您实际上不想对部分输出做任何事情和/或在您等待的函数中无事可做,这是最简单的选择输出:

    def simple(fname = '7_1197_.txt'):
        import subprocess
        command = 'echo "{0}" && touch -v data/sentiment/{0}.test'.format(fname)
        child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
        # if we aren't doing anything with partial outputs,
        # child.communicate() does all of our waiting/capturing for us:
        out, err = child.communicate()
        return out
    

    (也可以使用stderr=subprocess.PIPE 包含stderr 捕获或使用stderr=subprocess.STDOUT 将stderr 合并到stdout 中)。

    这是另一个例子,将 stderr 收集到 stdout 中,并以块的形式读取:

    def chunked(fname = '7_1197_.txt'):
        import subprocess
        command = 'echo "{0}" && touch data/sentiment/{0}.test'.format(fname)
        child = subprocess.Popen(command, shell=True,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT,
                                )
        chunks = []
        while True:
            chunk = child.stdout.read(80) # read roughly one line at a time
            if chunk:
                chunks.append(chunk)
                continue
            else:
                # read will only return an empty bytestring when output is finished
                break
        return b''.join(chunks)
    

    请注意,我们可以使用if not chunk 条件来确定输出何时完成,而不是if chunk == '',因为空字节串是假的。如果我们不对部分输出做任何事情,那么真的没有理由使用它来代替上面更简单的.communicate() 版本。

    最后,这里有一个可以与 IPython 一起使用的版本,它不是捕获和返回输出,而是重新显示它,我们可以使用它在客户端显示 部分 输出:

    def chunked_redisplayed(fname = '7_1197_.txt'):
        import sys, subprocess
        command = 'for i in {{1..20}}; do echo "{0}"; sleep 0.25; done'.format(fname)
        child = subprocess.Popen(command, shell=True,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT,
                                )
        while True:
            chunk = child.stdout.read(80) # read roughly one line at a time
            if chunk:
                sys.stdout.write(chunk.decode('utf8', 'replace'))
                continue
            else:
                # read will only return an empty bytestring when output is finished
                break
    

    在客户端中,如果您使用map_async 而不是map_sync,您可以检查result.stdout,这是一个标准输出流列表到目前为止,因此您可以检查关于进展:

    amr = view.map_async(chunked_redisplayed, speeches)
    amr.stdout # list of stdout text, updated in the background as output is produced
    amr.wait_interactive() # waits and shows progress
    amr.get() # waits for and returns the actual result
    

    【讨论】:

    • child.communicate() 解决了我的问题,同步目录的提示也很有帮助。当我的笔记本/调用它的脚本在 2.7 中时,我仍然对为什么我的 ipcluster 可能在 python 3 中运行感到困惑?
    • 可能的 PATH 问题。您可以在各种流程中检查sys.executableos.environ['PATH'] 之类的内容,以及which -a ipenginewhich -a ipcluster 中可能存在的差异。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-07-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多