看起来,为每个进程分配一个单独的线程就可以得到预期的结果。
原始多进程答案
import os
import sys
import time
import random
import multiprocessing as mp
from PyQt5 import QtCore, QtWidgets
def trap_exc_during_debug(*args):
    """Print whatever the exception hook receives instead of propagating it.

    The positional arguments are printed as one tuple and stdout is flushed
    immediately so the report is not lost in a buffer.
    """
    # A tuple prints through its repr, so this is the same text as print(args).
    details = repr(args)
    print(details, flush=True)


# Install the hook at import time. Per the original author's note, without
# this an uncaught exception would cause the application to exit.
sys.excepthook = trap_exc_during_debug
class CommandMessages:
    """Control tokens exchanged over the pipe between a thread and its process.

    ``start`` is sent to the process to begin its task; ``stop`` is sent back
    by the process once the task has finished.
    """

    start, stop = 'start', 'stop'
class ProcessWorker(mp.Process):
    """Child process that simulates a task and reports progress over a pipe.

    Args:
        id: Identifier used in every message the worker sends.
        pipe: Connection end used both to receive commands and to send
            progress messages.
        work_load: Number of one-second steps the simulated task takes.
        daemon: Value assigned to the process's ``daemon`` flag.
    """

    def __init__(self, id, pipe, work_load, daemon=True):
        super().__init__()
        self.daemon = daemon
        self.pipe = pipe
        self.id = id
        self.work_load = work_load
        print(f'Created worker {self.id} with work_load {self.work_load}', flush=True)

    def run(self):
        """Announce the worker, wait for the start command, then do the task.

        Messages other than ``CommandMessages.start`` are ignored. After the
        task finishes, ``CommandMessages.stop`` is sent and the method returns.
        If the other end of the pipe is closed before a start command arrives,
        the method returns instead of waiting forever.
        """
        self.pipe.send(f"Worker {self.id} in ({os.getpid()})")
        while True:
            try:
                item = self.pipe.recv()
            except EOFError:
                # The other end was closed: no command can ever arrive, and
                # retrying recv() would raise again immediately (busy loop).
                return
            if item != CommandMessages.start:
                continue
            self.pipe.send(f"worker ({self.id}): task will take {self.work_load} seconds")
            for i in range(self.work_load):
                time.sleep(1)
                self.pipe.send(f"worker ({self.id}): {i}")
            self.pipe.send(f"worker ({self.id}): task complete")
            self.pipe.send(CommandMessages.stop)
            break
class ThreadWorkerSignals(QtCore.QObject):
    """Signal holder that the runnable workers instantiate as ``self.signals``."""

    # Carries the id of the worker that has finished.
    done = QtCore.pyqtSignal(int)
    # Carries one line of progress text.
    message = QtCore.pyqtSignal(str)
class ThreadWorker(QtCore.QRunnable):
    """Pool task that owns one ProcessWorker and relays its messages as signals.

    Args:
        id: Identifier used in log messages and emitted with ``done``.
        max_load: Exclusive upper bound for the randomly chosen work load.
    """

    # Seconds to wait for pipe data before re-checking the abort flag and
    # whether the child process is still alive.
    _POLL_INTERVAL = 0.1

    def __init__(self, id, max_load):
        super().__init__()
        self.signals = ThreadWorkerSignals()
        self.id = id
        self.max_load = max_load
        self._abort = False

    def run(self):
        """Pick a work load, start the child process and relay its messages."""
        thread_name = QtCore.QThread.currentThread().objectName()
        thread_id = int(QtCore.QThread.currentThreadId())  # cast to int() to get Id, otherwise it's sip object
        self.signals.message.emit(f'Running ThreadWorker {self.id} from thread "{thread_name}" (#{thread_id})')
        # randrange(0) raises ValueError, so a non-positive bound means "no work".
        work_load = random.randrange(self.max_load) if self.max_load > 0 else 0
        self.signals.message.emit(f'ThreadWorker #{self.id} work_load is {work_load}')
        m_conn, s_conn = mp.Pipe()
        self.pipe = m_conn
        self.process_worker = ProcessWorker(self.id, s_conn, work_load)
        self.signals.message.emit(f'ThreadWorker {self.id}: starting self.process_worker...')
        self.process_worker.start()
        self.pipe.send(CommandMessages.start)
        self.listen()

    def listen(self):
        """Forward messages from the child process until it stops or abort is set.

        The pipe is polled with a short timeout rather than blocking in
        ``recv()``, so the abort flag and the liveness of the child process
        are checked regularly. ``done`` is emitted on every exit path.
        """
        self.signals.message.emit(f'ThreadWorker {self.id} listening')
        try:
            while True:
                if self._abort:
                    self.signals.message.emit(f'ThreadWorker {self.id}: aborting process_worker...')
                    break
                if not self.pipe.poll(self._POLL_INTERVAL):
                    # No data yet. Keep waiting while the child is alive; the
                    # second poll catches data that arrived just before it exited.
                    if self.process_worker.is_alive() or self.pipe.poll():
                        continue
                    self.signals.message.emit(f'ThreadWorker {self.id}: process_worker exited unexpectedly')
                    break
                try:
                    msg = self.pipe.recv()
                except EOFError:
                    # Other end closed: nothing more will ever arrive.
                    break
                if msg == CommandMessages.stop:
                    self.signals.message.emit(f'ThreadWorker {self.id}: closing process_worker...')
                    self.process_worker.join(2)
                    break
                self.signals.message.emit(str(msg))
        finally:
            self._shutdown_process()
            self.signals.done.emit(self.id)

    def _shutdown_process(self):
        """Terminate the child process if still running and close both pipe ends."""
        if self.process_worker.is_alive():
            self.process_worker.terminate()
            self.process_worker.join(2)
        self.pipe.close()
        # This process's copy of the end that was handed to the child.
        self.process_worker.pipe.close()
        print(f'ThreadWorker {self.id}: process_worker closed', flush=True)
        self.signals.message.emit(f'ThreadWorker {self.id}: process_worker closed')

    def abort(self):
        """Request that ``listen`` stop at its next poll."""
        self.signals.message.emit(f'ThreadWorker #{self.id} notified to abort')
        self._abort = True
class MyDialog(QtWidgets.QDialog):
    """Dialog that starts one ThreadWorker per process and logs their messages."""

    # Emitted to ask every connected worker to abort.
    abort = QtCore.pyqtSignal()

    def __init__(self):
        super().__init__()
        self.start_time = None
        self.setWindowTitle('MP Concurrency with QThreadPool')
        self.resize(600, 400)
        # Half of the reported CPUs, but never zero: with zero workers nothing
        # would ever report completion and the start button would stay disabled.
        self.num_procs = max(1, mp.cpu_count() // 2)
        self._thread_pool = QtCore.QThreadPool.globalInstance()
        QtCore.QThread.currentThread().setObjectName('main')
        self.threads = []

        self.start_button = QtWidgets.QPushButton(f'Start {self.num_procs} processes')
        self.start_button.pressed.connect(self.start_processes)
        self.abort_button = QtWidgets.QPushButton('Abort')
        self.abort_button.pressed.connect(self.on_abort)
        self.abort_button.setEnabled(False)
        self.log = QtWidgets.QTextBrowser()

        spin_layout = QtWidgets.QHBoxLayout()
        self.spin = QtWidgets.QSpinBox()
        self.spin.setValue(5)
        spin_layout.addWidget(QtWidgets.QLabel('Max work time:'))
        spin_layout.addWidget(self.spin)

        layout = QtWidgets.QVBoxLayout()
        layout.addLayout(spin_layout)
        layout.addWidget(self.start_button)
        layout.addWidget(self.abort_button)
        layout.addWidget(self.log)
        self.setLayout(layout)

    def start_processes(self):
        """Create ``num_procs`` workers, wire their signals and queue them on the pool."""
        self.log.clear()
        self.thread_count = self.num_procs
        self.threads = []
        self._threads_completed = 0
        max_load = self.spin.value()
        self.start_time = time.time()
        for idx in range(self.thread_count):
            thread_worker = ThreadWorker(idx, max_load)
            self.threads.append(thread_worker)
            thread_worker.signals.done.connect(self.on_done)
            thread_worker.signals.message.connect(self.on_message)
            self.abort.connect(thread_worker.abort)
            self._thread_pool.start(thread_worker)
        self.start_button.setEnabled(False)
        self.abort_button.setEnabled(True)

    def on_message(self, text):
        """Append one worker message to the log view."""
        self.log.append(str(text))

    def on_done(self, id):
        """Count a finished worker; once all are done, report the time and reset the buttons."""
        self.log.append(f'ThreadWorker {id} is done')
        self._threads_completed += 1
        if self._threads_completed == self.thread_count:
            self.log.append('No more workers active')
            self.log.append(f'Elapsed time: {time.time() - self.start_time}')
            self.start_button.setEnabled(True)
            self.abort_button.setEnabled(False)

    @QtCore.pyqtSlot()
    def on_abort(self):
        """Emit ``abort`` and wait up to 10 seconds for the pool to finish."""
        self.abort.emit()
        self.log.append('Asking each thread worker to abort')
        done = self._thread_pool.waitForDone(10000)
        if not done:
            self.log.append('WARNING: COULD NOT CLOSE THREADS')
        else:
            self.log.append('All threads exited')
        self.log.append(f'Elapsed time: {time.time() - self.start_time}')

    def closeEvent(self, event):
        """Emit ``abort``, wait up to 5 seconds for the pool, then accept the close."""
        self.abort.emit()
        done = self._thread_pool.waitForDone(5000)
        if not done:
            print('Threads still open!. Open that task manager!', flush=True)
        else:
            print('All threads exited', flush=True)
        event.accept()
if __name__ == '__main__':
    # Build the Qt application, show the dialog, and pass the event loop's
    # return value to sys.exit.
    qt_app = QtWidgets.QApplication(sys.argv)
    window = MyDialog()
    window.show()
    exit_code = qt_app.exec_()
    sys.exit(exit_code)
多进程与多线程的对比
我手头恰好有一个“纯”线程的示例,与上面的代码足够相似,于是我想比较一下这两种方法。线程数和进程数都设为最大值,这未必是最理想的配置。代码中有一个全局变量 WORK_LOAD,保存一组很大的数;每个数被传给一个进程或线程,用作一段简单循环的迭代次数。令我惊讶的是,多进程方法胜出:
Multiprocess time: 34.82416486740112
Threaded time: 57.59582781791687
这个结果有些出人意料,但很有意思。也许其他人会觉得它有用,或者能加以改进。(中止/清理流程尚未完全实现。)
import os
import sys
import time
import multiprocessing as mp
from PyQt5 import QtCore, QtWidgets
# Per-worker iteration counts: the worker with index i loops WORK_LOAD[i] times.
WORK_LOAD = [
    123_456_779,
    98_765_554,
    7_666_111,
    966_325,
    978_798,
    65_465,
    447_733_331,
    94_613_697,
]
def trap_exc_during_debug(*args):
    """Print whatever the exception hook receives instead of propagating it.

    The positional arguments are printed as one tuple and stdout is flushed
    immediately so the report is not lost in a buffer.
    """
    # A tuple prints through its repr, so this is the same text as print(args).
    details = repr(args)
    print(details, flush=True)


# Install the hook at import time. Per the original author's note, without
# this an uncaught exception would cause the application to exit.
sys.excepthook = trap_exc_during_debug
class CommandMessages:
    """Control tokens exchanged over the pipe between a thread and its process.

    ``start`` is sent to the process to begin its task; ``stop`` is sent back
    by the process once the task has finished.
    """

    start, stop = 'start', 'stop'
class ProcessWorker(mp.Process):
    """Child process that runs a CPU-bound loop and reports over a pipe.

    Args:
        id: Identifier used in every message the worker sends.
        pipe: Connection end used both to receive commands and to send
            status messages.
        work_load: Number of loop iterations the task performs.
        daemon: Value assigned to the process's ``daemon`` flag.
    """

    def __init__(self, id, pipe, work_load, daemon=True):
        super().__init__()
        self.daemon = daemon
        self.pipe = pipe
        self.id = id
        self.work_load = work_load
        print(f'Created worker {self.id} with work_load {self.work_load}', flush=True)

    def run(self):
        """Announce the worker, wait for the start command, then run the loop.

        Messages other than ``CommandMessages.start`` are ignored. After the
        loop finishes, ``CommandMessages.stop`` is sent and the method returns.
        If the other end of the pipe is closed before a start command arrives,
        the method returns instead of waiting forever.
        """
        # The greeting used to say "PureThreadWorker", a copy-paste mislabel:
        # this message comes from the process worker.
        self.pipe.send(f"ProcessWorker {self.id} in ({os.getpid()})")
        while True:
            try:
                item = self.pipe.recv()
            except EOFError:
                # The other end was closed: no command can ever arrive, and
                # retrying recv() would raise again immediately (busy loop).
                return
            if item != CommandMessages.start:
                continue
            self.pipe.send(f"worker ({self.id}): starting task")
            # Deliberately a plain append loop: this is the benchmark workload.
            lst = []
            for _ in range(self.work_load):
                lst.append('x')
            self.pipe.send(f"worker ({self.id}): task complete")
            self.pipe.send(CommandMessages.stop)
            break
class ThreadWorkerSignals(QtCore.QObject):
    """Signal holder that the runnable workers instantiate as ``self.signals``."""

    # Carries the id of the worker that has finished.
    done = QtCore.pyqtSignal(int)
    # Carries one line of progress text.
    message = QtCore.pyqtSignal(str)
class ProcessThreadWorker(QtCore.QRunnable):
    """Pool task that owns one ProcessWorker and relays its messages as signals.

    Args:
        id: Identifier used in log messages and emitted with ``done``.
        work_load: Iteration count handed to the child process.
    """

    # Seconds to wait for pipe data before re-checking the abort flag and
    # whether the child process is still alive.
    _POLL_INTERVAL = 0.1

    def __init__(self, id, work_load):
        super().__init__()
        self.signals = ThreadWorkerSignals()
        self.id = id
        self.work_load = work_load
        self._abort = False

    def run(self):
        """Start the child process, send it the start command and relay its messages."""
        thread_name = QtCore.QThread.currentThread().objectName()
        thread_id = int(QtCore.QThread.currentThreadId())  # cast to int() to get Id, otherwise it's sip object
        self.signals.message.emit(f'Running ProcessThreadWorker {self.id} from thread "{thread_name}" (#{thread_id})')
        self.signals.message.emit(f'ProcessThreadWorker #{self.id} work_load is {self.work_load}')
        m_conn, s_conn = mp.Pipe()
        self.pipe = m_conn
        self.process_worker = ProcessWorker(self.id, s_conn, self.work_load)
        self.signals.message.emit(f'ProcessThreadWorker {self.id}: starting self.process_worker...')
        self.process_worker.start()
        self.pipe.send(CommandMessages.start)
        self.listen()

    def listen(self):
        """Forward messages from the child process until it stops or abort is set.

        The pipe is polled with a short timeout rather than blocking in
        ``recv()``, so the abort flag and the liveness of the child process
        are checked regularly. ``done`` is emitted on every exit path.
        """
        self.signals.message.emit(f'ProcessThreadWorker {self.id} listening')
        try:
            while True:
                if self._abort:
                    self.signals.message.emit(f'ProcessThreadWorker {self.id}: aborting process_worker...')
                    break
                if not self.pipe.poll(self._POLL_INTERVAL):
                    # No data yet. Keep waiting while the child is alive; the
                    # second poll catches data that arrived just before it exited.
                    if self.process_worker.is_alive() or self.pipe.poll():
                        continue
                    self.signals.message.emit(f'ProcessThreadWorker {self.id}: process_worker exited unexpectedly')
                    break
                try:
                    msg = self.pipe.recv()
                except EOFError:
                    # Other end closed: nothing more will ever arrive.
                    break
                if msg == CommandMessages.stop:
                    self.signals.message.emit(f'ProcessThreadWorker {self.id}: closing process_worker...')
                    self.process_worker.join(2)
                    break
                self.signals.message.emit(str(msg))
        finally:
            self._shutdown_process()
            self.signals.done.emit(self.id)

    def _shutdown_process(self):
        """Terminate the child process if still running and close both pipe ends."""
        if self.process_worker.is_alive():
            self.process_worker.terminate()
            self.process_worker.join(2)
        self.pipe.close()
        # This process's copy of the end that was handed to the child.
        self.process_worker.pipe.close()
        print(f'ProcessThreadWorker {self.id}: process_worker closed', flush=True)
        self.signals.message.emit(f'ProcessThreadWorker {self.id}: process_worker closed')

    def abort(self):
        """Request that ``listen`` stop at its next poll."""
        self.signals.message.emit(f'ProcessThreadWorker #{self.id} notified to abort')
        self._abort = True
class PureThreadWorker(QtCore.QRunnable):
    """Pool task that runs the benchmark loop directly in the pool thread.

    Args:
        id: Identifier used in log messages and emitted with ``done``.
        work_load: Number of loop iterations to perform.
    """

    # Iterations between checks of the abort flag. Checking once per chunk
    # keeps the inner loop identical to the process-based benchmark.
    _ABORT_CHECK_INTERVAL = 1_000_000

    def __init__(self, id, work_load):
        super().__init__()
        self.signals = ThreadWorkerSignals()
        self.id = id
        self.work_load = work_load
        self._abort = False

    def run(self):
        """Run the append loop in chunks, stopping early if abort was requested.

        ``done`` is emitted whether the task completed or was aborted.
        """
        thread_name = QtCore.QThread.currentThread().objectName()
        thread_id = int(QtCore.QThread.currentThreadId())  # cast to int() to get Id, otherwise it's sip object
        # This line used to say "ProcessThreadWorker", a copy-paste mislabel.
        self.signals.message.emit(f'Running PureThreadWorker {self.id} from thread "{thread_name}" (#{thread_id})')
        self.signals.message.emit(f'PureThreadWorker {self.id} work_load is {self.work_load}')
        self.signals.message.emit(f"PureThreadWorker {self.id}: starting task")
        lst = []
        remaining = self.work_load
        while remaining > 0 and not self._abort:
            chunk = min(remaining, self._ABORT_CHECK_INTERVAL)
            for _ in range(chunk):
                lst.append('x')
            remaining -= chunk
        if remaining > 0:
            # The loop ended because of the abort flag, not because the work ran out.
            self.signals.message.emit(f"PureThreadWorker {self.id}: task aborted")
        else:
            self.signals.message.emit(f"PureThreadWorker {self.id}: task complete")
        self.signals.done.emit(self.id)

    def abort(self):
        """Request that ``run`` stop at the next chunk boundary."""
        self.signals.message.emit(f'PureThreadWorker #{self.id} notified to abort')
        self._abort = True
class MyDialog(QtWidgets.QDialog):
    """Dialog that times the process-backed and the pure-thread benchmark."""

    # Emitted to ask every connected worker to abort.
    abort = QtCore.pyqtSignal()

    def __init__(self):
        super().__init__()
        self.start_time = None
        self.setWindowTitle('MP Concurrency with QThreadPool')
        self.resize(600, 400)
        self.num_procs = mp.cpu_count()
        self._thread_pool = QtCore.QThreadPool.globalInstance()
        QtCore.QThread.currentThread().setObjectName('main')
        self.threads = []

        self.button_start_processes = QtWidgets.QPushButton(f'Start {self.num_procs} processes')
        self.button_start_processes.pressed.connect(self.start_processes)
        self.button_start_threads = QtWidgets.QPushButton()
        self.button_start_threads.clicked.connect(self.start_threads)
        self.button_start_threads.setText(f"Start {self._thread_pool.maxThreadCount()} threads")
        self.abort_button = QtWidgets.QPushButton('Abort')
        self.abort_button.pressed.connect(self.on_abort)
        self.abort_button.setEnabled(False)
        self.log = QtWidgets.QTextBrowser()

        layout = QtWidgets.QVBoxLayout()
        layout.addWidget(self.button_start_processes)
        layout.addWidget(self.button_start_threads)
        layout.addWidget(self.abort_button)
        layout.addWidget(self.log)
        self.setLayout(layout)

    def _set_running(self, running):
        """Enable abort and disable both start buttons while a run is active, and vice versa."""
        # Both start buttons are toggled together: the two benchmarks share
        # start_time, so they must not run at the same time.
        self.button_start_processes.setEnabled(not running)
        self.button_start_threads.setEnabled(not running)
        self.abort_button.setEnabled(running)

    @staticmethod
    def _work_load_for(idx):
        """Return the work load for worker ``idx``, cycling through WORK_LOAD."""
        # The modulo avoids an IndexError when there are more workers than entries.
        return WORK_LOAD[idx % len(WORK_LOAD)]

    def start_processes(self):
        """Queue one ProcessThreadWorker per CPU and start timing."""
        self.log.clear()
        self.thread_count = self.num_procs
        self.threads = []
        self._threads_completed = 0
        self.log.append(f'Max procs: {self.num_procs}')
        self.start_time = time.time()
        for idx in range(self.thread_count):
            thread_worker = ProcessThreadWorker(idx, self._work_load_for(idx))
            self.threads.append(thread_worker)
            thread_worker.signals.done.connect(self.on_process_done)
            thread_worker.signals.message.connect(self.on_message)
            self.abort.connect(thread_worker.abort)
            self._thread_pool.start(thread_worker)
        self._set_running(True)

    def start_threads(self):
        """Queue one PureThreadWorker per pool thread and start timing."""
        self.log.clear()
        self.pure_workers_done = 0
        self.pure_workers = []
        self.worker_count = self._thread_pool.maxThreadCount()
        self.log.append(f'Max threads: {self._thread_pool.maxThreadCount()}')
        self.start_time = time.time()
        for idx in range(self.worker_count):
            worker = PureThreadWorker(idx, self._work_load_for(idx))
            self.pure_workers.append(worker)
            # get progress messages from worker:
            worker.signals.done.connect(self.on_pure_done)
            worker.signals.message.connect(self.on_message)
            # control worker:
            self.abort.connect(worker.abort)
            self._thread_pool.start(worker)
        self._set_running(True)

    def on_message(self, text):
        """Append one worker message to the log view."""
        self.log.append(str(text))

    def on_process_done(self, id):
        """Count a finished process worker; report the time once all are done."""
        self.log.append(f'ProcessThreadWorker {id} is done')
        self._threads_completed += 1
        if self._threads_completed == self.thread_count:
            self.log.append('No more workers active')
            self.log.append(f'Elapsed time: {time.time() - self.start_time}')
            print(f'Elapsed time: {time.time() - self.start_time}', flush=True)
            self._set_running(False)

    def on_pure_done(self, id):
        """Count a finished pure-thread worker; report the time once all are done."""
        self.log.append(f'PureThreadWorker {id} is done')
        self.pure_workers_done += 1
        if self.pure_workers_done == self.worker_count:
            self.log.append('No more pure workers active')
            self.log.append(f'Elapsed time: {time.time() - self.start_time}')
            print(f'Elapsed time: {time.time() - self.start_time}', flush=True)
            self._set_running(False)

    @QtCore.pyqtSlot()
    def on_abort(self):
        """Emit ``abort`` and wait up to 10 seconds for the pool to finish."""
        self.abort.emit()
        self.log.append('Asking each thread worker to abort')
        done = self._thread_pool.waitForDone(10000)
        if not done:
            self.log.append('WARNING: COULD NOT CLOSE THREADS')
        else:
            self.log.append('All threads exited')
        self.log.append(f'Elapsed time: {time.time() - self.start_time}')
        print(f'Elapsed time: {time.time() - self.start_time}', flush=True)

    def closeEvent(self, event):
        """Emit ``abort``, wait up to 5 seconds for the pool, then accept the close."""
        self.abort.emit()
        done = self._thread_pool.waitForDone(5000)
        if not done:
            print('Threads still open!. Open that task manager!', flush=True)
        else:
            print('All threads exited', flush=True)
        event.accept()
if __name__ == '__main__':
    # Build the Qt application, show the dialog, and pass the event loop's
    # return value to sys.exit.
    qt_app = QtWidgets.QApplication(sys.argv)
    window = MyDialog()
    window.show()
    exit_code = qt_app.exec_()
    sys.exit(exit_code)