【发布时间】:2018-12-05 21:59:50
【问题描述】:
我是 python 多任务处理的新手。我正在以老式的方式进行操作:
我继承自 threading.Thread 并使用 queue.Queue 队列向/从主线程发送消息。
这是我的基本线程类:
class WorkerGenerico(threading.Thread):
def __init__(self, task_id, input_q=None, output_q=None, keep_alive=300):
super(WorkerGenerico, self).__init__()
self._task_id = task_id
if input_q is None:
self._input_q = queue.Queue()
else:
if isinstance(input_q, queue.Queue):
self._input_q = input_q
else:
raise TypeError("input_q debe ser del tipo queue.Queue")
if output_q is None:
self._output_q = queue.Queue()
else:
if isinstance(output_q, queue.Queue):
self._output_q = output_q
else:
raise TypeError("input_q debe ser del tipo queue.Queue")
if not isinstance(keep_alive, int):
raise TypeError("El valor de keep_alive debe der un int.")
self._keep_alive = keep_alive
self.stoprequest = threading.Event()
# def run(self):
# Implement a loop in subclases which checks if self.has_orden_parada() is true in order to stop.
def join(self, timeout=None):
self.stoprequest.set()
super(WorkerGenerico, self).join(timeout)
def gracefull_stop(self):
self.stoprequest.set()
def has_orden_parada(self):
return self.stoprequest.is_set()
def put(self,texto, block=True, timeout=None):
return self._input_q.put(texto, block=block, timeout=timeout)
def get(self, block=True, timeout=None):
return self._output_q.get(block=block, timeout=timeout)
我的问题是从外部调用 WorkerGenerico.get() 是多么昂贵,而不是在主线程中存储 que 队列并使用 Queue.get()。 两种方法在处理少量非频繁控制消息时的性能看起来相似,但是,我想非常频繁的调用会使方法 B 值得使用。:
我猜模式 A 更消耗资源(它必须以某种方式从外部线程调用方法并将队列定义传回,我猜损失取决于 Python 实现),但是,最终代码更具可读性和直观。
如果我必须从使用其他语言的经验来判断,我会说方法 B 要好得多,对吗?
方法一:
def main()
worker = WorkerGenerico(task_id=1)
worker.start()
print(worker.get())
方法B:
def main()
input_q = Queue()
output_q = Queue()
worker = WorkerGenerico(task_id=1, input_q=input_q, output_q=output_q)
worker.start()
print(output_q.get())
顺便说一句:为了完整起见,我想分享一下我现在的做法。它混合了两种方法,为线程提供了一个很好的包络:
class EnvoltorioWorker:
def __init__(self, task_id, input_q=None, output_q=None, keep_alive=300):
if input_q is None:
self._input_q = queue.Queue()
else:
if isinstance(input_q, queue.Queue):
self._input_q = input_q
else:
raise TypeError("input_q debe ser del tipo queue.Queue")
if output_q is None:
self._output_q = queue.Queue()
else:
if isinstance(output_q, queue.Queue):
self._output_q = output_q
else:
raise TypeError("input_q debe ser del tipo queue.Queue")
self.worker = WorkerGenerico(task_id, input_q, output_q, keep_alive)
def put(self, elem, block=True, timeout=None):
return self._input_q.put(elem, block=block, timeout=timeout)
def get(self, block=True, timeout=None):
return self._output_q.get(block=block, timeout=timeout)
我使用 EnvoltorioWorker.worker.* 调用 joins 或其他外部控制方法和 EnvoltorioWorker.get / EnvoltorioWorker.put 与内部类正确通信,就像这样:
def main()
worker_container = EnvoltorioWorker(task_id=1)
worker_container.worker.start()
print(worker_container.get())
如果不需要其他对 worker 的访问,我通常也会在 EnvoltorioWorker 中为 start()、join() 和 nonwait_stop() 创建接口。
它可能看起来很虚,并且可能有更好的方法来实现这一点,所以:
哪种方法(A 或 B)更好?从 Thread 继承是在 Python 中处理线程的正确方法吗?我正在使用 dispycos 用于分布式环境和类似的信封与我的线程进行通信
编辑:刚刚注意到我忘记在类中翻译 cmets 和一些字符串,但它们很简单,所以我认为它是可读的。有时间我会修改的。
有什么想法吗?
【问题讨论】:
标签: python multithreading performance queue python-multithreading