【发布时间】:2015-07-30 07:39:05
【问题描述】:
我正在尝试在 python 中使用多线程的队列。我只是想知道我使用的方法是否正确。如果我在做一些多余的事情,或者如果有更好的方法我应该使用。
我正在尝试从表中获取新请求并使用一些逻辑来安排它们以执行某些操作,例如运行查询。
所以我在这里从主线程为队列生成了一个单独的线程。
if __name__=='__main__':
request_queue = SetQueue(maxsize=-1)
worker = Thread(target=request_queue.process_queue)
worker.setDaemon(True)
worker.start()
while True:
try:
#Connect to the database get all the new requests to be verified
db = Database(username_testschema, password_testschema, mother_host_testschema, mother_port_testschema, mother_sid_testschema, 0)
#Get new requests for verification
verify_these = db.query("SELECT JOB_ID FROM %s.table WHERE JOB_STATUS='%s' ORDER BY JOB_ID" %
(username_testschema, 'INITIATED'))
#If there are some requests to be verified, put them in the queue.
if len(verify_these) > 0:
for row in verify_these:
print "verifying : %s" % row[0]
verify_id = row[0]
request_queue.put(verify_id)
except Exception as e:
logger.exception(e)
finally:
time.sleep(10)
现在在 Setqueue 类中,我有一个 process_queue 函数,用于处理每次运行中添加到队列中的前 2 个请求。
'''
Overridding the Queue class to use set as all_items instead of list to ensure unique items added and processed all the time,
'''
class SetQueue(Queue.Queue):
def _init(self, maxsize):
Queue.Queue._init(self, maxsize)
self.all_items = set()
def _put(self, item):
if item not in self.all_items:
Queue.Queue._put(self, item)
self.all_items.add(item)
'''
The Multi threaded queue for verification process. Take the top two items, verifies them in a separate thread and sleeps for 10 sec.
This way max two requests per run will be processed.
'''
def process_queue(self):
while True:
scheduler_obj = Scheduler()
try:
if self.qsize() > 0:
for i in range(2):
job_id = self.get()
t = Thread(target=scheduler_obj.verify_func, args=(job_id,))
t.start()
for i in range(2):
t.join(timeout=1)
self.task_done()
except Exception as e:
logger.exception(
"QUEUE EXCEPTION : Exception occured while processing requests in the VERIFICATION QUEUE")
finally:
time.sleep(10)
我想看看我的理解是否正确,是否有任何问题。
所以当主函数中的 True 连接到数据库时运行的主线程获取新请求并将其放入队列中。队列的工作线程(守护进程)不断从队列中获取新请求并派生执行处理的非守护线程,并且由于连接的超时时间为 1,工作线程将继续接受新请求而不会被阻塞,并且它的子线程将继续在后台处理。对吗?
因此,如果主进程退出,这些将在完成工作之前不会被杀死,但工作守护线程将退出。 疑问:如果父进程是守护进程,子进程是非守护进程,如果父进程退出,子进程是否退出?)。
我也在这里阅读:-David beazley multiprocessing
大卫比兹利在使用池作为线程协处理器部分试图解决类似的问题。所以我应该按照他的步骤:- 1. 创建一个进程池。 2.像我为request_queue做的那样打开一个线程 3.在那个线程中
def process_verification_queue(self):
while True:
try:
if self.qsize() > 0:
job_id = self.get()
pool.apply_async(Scheduler.verify_func, args=(job_id,))
except Exception as e:
logger.exception("QUEUE EXCEPTION : Exception occured while processing requests in the VERIFICATION QUEUE")
使用池中的进程并并行运行 verify_func。这会给我带来更好的性能吗?
【问题讨论】:
-
请您修复缩进?
-
我认为我无法充分回答您的所有问题,但您可以考虑遵循流行的Celery Project 中的一些架构模式。我并不是要推荐它作为您正在创建的解决方案,但它是一个成熟的项目,可以突出您可以复制的体面方法。
-
我会看的。谢谢@erik-e
标签: python queue python-multithreading