【问题标题】:How to return value from function running by QThread and Queue如何从 QThread 和 Queue 运行的函数中返回值
【发布时间】:2014-09-26 07:34:05
【问题描述】:

请解释我们如何从队列管理的线程发送/接收数据......

首先,我定义了它的 run() 方法的子类 'QThread',该方法在调用 QThread's.start() 时启动:

class SimpleThread(QtCore.QThread):
    def __init__(self, queue, parent=None):
        QtCore.QThread.__init__(self, parent)      
        self.queue=queue        
    def run(self):
        while True:
            arg=self.queue.get() 
            self.fun(arg)    
            self.queue.task_done()
    def fun(self, arg):
        for i in range (3):
            print 'fun: %s'%i
            self.sleep(1)
        return arg+1

然后我声明两个 Thread 实例(因此只占用两个 CPU 内核)发送 self.queue 实例作为参数。

self.queue=queue.Queue()
for i in range(2):
    thread=SimpleThread(self.queue)
    thread.start()

现在,如果我理解正确,thread.start() 不会开始任何事情。真正的“开始”只有在我调用queue.put()时才会发生:

for arg in [1,2,3]: self.queue.put(arg)

最后一行是“真正的”调用。除了创建和启动 Queue itemput() 还允许将任意值保存到每个 Queue item.put() 一次做几件事:它创建,它开始,它通过队列移动处理,它允许将一个变量放在队列项目的“内部”(稍后可以从函数处理器内部检索到:使用队列项的 '.get()` 方法)。

但是我如何从fun() 函数返回值。 “常规”fun()return resultValue 不起作用。而且我不能使用 self.queue.put() 方法,因为这种方法除了存储数据“创建”一个新的队列项......

稍后编辑:

这里是稍微调整的代码(从另一篇文章复制/粘贴),展示了如何从已完成的线程返回值的方法。我不确定这里使用的方法是否适用于 QThread...如果我错了,请纠正我:

import os, sys
import threading
import Queue

def callMe(incomingFun, daemon=False):
    def execute(_queue, *args, **kwargs):
        result=incomingFun(*args, **kwargs)
        _queue.put(result)

    def wrap(*args, **kwargs):
        _queue=Queue.Queue()
        _thread=threading.Thread(target=execute, args=(_queue,)+args, kwargs=kwargs)
        _thread.daemon=daemon
        _thread.start()
        _thread.result_queue=_queue        
        return _thread

    return wrap

@callMe
def localFunc(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

thread=localFunc(10)

# this blocks, waiting for the result
result = thread.result_queue.get()
print result

【问题讨论】:

  • 你几乎不应该从 QThread 派生。您可以使用信号和槽将消息从工作线程传递到主线程。但是Qt Concurrent 更方便您的任务。
  • @PavelStrakhov 我不认为 QtConcurrent 在任何 Qt 的 Python 绑定中都可用。
  • 首先让我们澄清一下我们如何处理来自fun() 函数的返回值。然后我们可以讨论应该导出什么和不应该导出什么。
  • 您希望SimpleThread 在后台永远运行,还是应该在您向其传递任意数量的值后结束?
  • 一般情况下,您会为结果创建第二个返回队列。

标签: python multithreading qt pyqt


【解决方案1】:

在正常情况下,您会使用结果队列将结果发回,然后运行其他线程来等待结果:

class SimpleThread(QtCore.QThread):
    def __init__(self, queue, result_queue, parent=None):
        QtCore.QThread.__init__(self, parent)      
        self.queue=queue
        self.result_queue = result_queue

    def run(self):
        while True:
            arg=self.queue.get() 
            self.fun(arg)    
            self.queue.task_done()

    def fun(self, arg):
        for i in range (3):
            print 'fun: %s'%i
            self.sleep(1)
        self.result_queue.put(arg+1)

def handle_results(result_queue):
   while True:
       result = result_queue.get()
       print("Got result {}".format(result))

主线程:

self.queue=queue.Queue()
self.result_queue = queue.Queue()

result_handler = threading.Thread(target=handle_results, self.result_queue)
for i in range(2):
    thread=SimpleThread(self.queue, self.result_queue)
    thread.start()

这样做可以防止您在等待结果时阻塞 GUI 的事件循环。这是multiprocessing.pool.ThreadPool 的等效项:

from multiprocessing.pool import ThreadPool
import time


def fun(arg):
    for i in range (3):
        print 'fun: %s'%i
        time.sleep(1)
    return arg+1

def handle_result(result):
   print("got result {}".format(result))

pool = ThreadPool(2)
pool.map_async(fun, [1,2,3], callback=handle_result)

这要简单得多。它在内部创建一个结果处理线程,当fun 完成时,它会自动为您调用handle_result

也就是说,您正在使用 QThread,并且您希望结果更新 GUI 小部件,因此您确实希望将结果发送回主线程,而不是结果处理线程。在这种情况下,使用 Qt 的信号系统是有意义的,这样您就可以在收到结果时安全地更新 GUI:

from PyQt4 import QtCore, QtGui
import sys
import Queue as queue

class ResultObj(QtCore.QObject):
    def __init__(self, val):
        self.val = val

class SimpleThread(QtCore.QThread):
    finished = QtCore.pyqtSignal(object)

    def __init__(self, queue, callback, parent=None):
        QtCore.QThread.__init__(self, parent)      
        self.queue = queue
        self.finished.connect(callback)

    def run(self):
        while True:
            arg = self.queue.get() 
            if arg is None: # None means exit
                print("Shutting down")
                return
            self.fun(arg)    

    def fun(self, arg):
        for i in range(3):
            print 'fun: %s' % i
            self.sleep(1)
        self.finished.emit(ResultObj(arg+1))


class AppWindow(QtGui.QMainWindow):
    def __init__(self):
        super(AppWindow, self).__init__()
        mainWidget = QtGui.QWidget()
        self.setCentralWidget(mainWidget)
        mainLayout = QtGui.QVBoxLayout()
        mainWidget.setLayout(mainLayout)  
        button = QtGui.QPushButton('Process')
        button.clicked.connect(self.process)
        mainLayout.addWidget(button)

    def handle_result(self, result):
        val = result.val
        print("got val {}".format(val))
        # You can update the UI from here.

    def process(self):
        MAX_CORES=2
        self.queue = queue.Queue()
        self.threads = []
        for i in range(MAX_CORES):
            thread = SimpleThread(self.queue, self.handle_result)
            self.threads.append(thread)
            thread.start()  

        for arg in [1,2,3]:
            self.queue.put(arg)

        for _ in range(MAX_CORES): # Tell the workers to shut down
            self.queue.put(None)

app = QtGui.QApplication([])
window = AppWindow()
window.show()
sys.exit(app.exec_())

按下按钮时的输出:

fun: 0
 fun: 0
fun: 1
 fun: 1
fun: 2
 fun: 2
fun: 0
got val 2
got val 3
Shutting down
fun: 1
fun: 2
Shutting down
got val 4

【讨论】:

  • 很棒的示例代码!再次感谢。很长一段时间以来我学到的最好的资源之一!感谢您的教育!
  • 在你的最后一个例子中,有一个拼写错误 s/finshed/finished 使它工作
  • 我在 pyqt5 中需要这个答案
  • @lone_coder 看起来 pyqtSignal 在 pyqt5 中仍然存在,所以调整这个答案应该很简单。
  • @dano。是的,我已经弄清楚了,谢谢!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多