【发布时间】:2012-07-19 09:54:47
【问题描述】:
我正在尝试实现一些基本调度,以产生最多 n 个进程。一次打开并等待它们执行。为此我已经完成了:
CHECKING_INTERVAL = 10
class StandAloneClient(object):
"""
Instead of communicating with a backend cluster, fire locally a new thread.
"""
max_thread_nr = 4
thread_pool = []
pending_operations = Queue.Queue(0)
class OperationExecutor(threading.Thread):
def run(self):
"""
Get the required data from the operation queue and launch the operation.
"""
while True:
launch_data = StandAloneClient.pending_operations.get()
if launch_data != None:
operation_id = launch_data[0]
user_name_label = launch_data[1]
LOGGER.debug("Launching operation " + str(operation_id) + " with name " + str(user_name_label))
## Create a new process for the new launched operation
oper = ['python', '-m', 'processRunner', str(operation_id), user_name_label]
launched_process = Popen(oper, stdout=PIPE, stdin=PIPE, stderr=PIPE)
# launched_process.wait()
# while launched_process.poll() is None:
# sleep(CHECKING_INTERVAL)
# LOGGER.debug("Operation id=%s is still running. Going to sleep for %s seconds."%(operation_id,
# CHECKING_INTERVAL))
LOGGER.debug("===========================================================")
LOGGER.debug("Finished operation %s succesfully."%(operation_id,))
def __init__(self):
"""
If there are still empty spots create a new executor thread.
"""
for _ in xrange(self.max_thread_nr - len(self.thread_pool)):
new_executor = StandAloneClient.OperationExecutor()
self.thread_pool.append(new_executor)
new_executor.start()
@staticmethod
def execute(operation_id, user_name_label="Unknown"):
"""Start asynchronous operation locally"""
StandAloneClient.pending_operations.put((operation_id, user_name_label))
我通过以下方式向队列添加操作:
StandAloneClient().execute(...)
现在我已经评论了我认为可能由于某种原因阻塞线程的部分。但即便如此,似乎没有一个孩子产生过。执行到最后,我检查了日志,processRunner.py 完成了它需要做的所有事情,直到最后,但是如果我执行ps -el|grep python,我仍然会看到所有进程:
0 S 1000 755 1 5 80 0 - 548314 poll_s pts/0 00:00:13 python
0 S 1000 1198 755 4 80 0 - 280172 futex_ pts/0 00:00:09 python
0 S 1000 1201 755 4 80 0 - 280176 futex_ pts/0 00:00:09 python
0 S 1000 1206 755 4 80 0 - 280230 futex_ pts/0 00:00:09 python
0 S 1000 1215 755 4 80 0 - 280198 futex_ pts/0 00:00:09 python
0 S 1000 1216 755 4 80 0 - 281669 futex_ pts/0 00:00:09 python
0 S 1000 1221 755 4 80 0 - 280201 futex_ pts/0 00:00:09 python
0 S 1000 1231 755 4 80 0 - 281668 futex_ pts/0 00:00:09 python
0 S 1000 1240 755 4 80 0 - 280229 futex_ pts/0 00:00:09 python
0 S 1000 1257 755 4 80 0 - 280201 futex_ pts/0 00:00:09 python
我正在使用 Python 2.7.2 的 fedora 16 机器上尝试此操作。任何建议>
问候, 波格丹
【问题讨论】:
-
为什么使用子进程而不是多进程在多个进程中运行Python代码?附注:您的代码对于这样一个简单的任务很复杂
标签: python multithreading subprocess