使用队列进行任务控制
1 FIFO与LIFO队列
FIFO(First In First Out)与LIFO(Last In First Out)分别是两种队列形式,在FIFO中,满足先入先出的队列方式,而LIFO则是后入先出的队列形式,利用这两种方式可以实现不同的队列功能。
1 from random import randint 2 from time import sleep, ctime 3 from queue import Queue, LifoQueue 4 from threading import Thread 5 6 COUNT = 0 7 8 9 class MyThread(Thread): 10 """ 11 Bulid up a Module to make this subclass more general 12 And get return value by add a function named 'getResult()' 13 """ 14 def __init__(self, func, args, name=''): 15 Thread.__init__(self) 16 self.name = name 17 self.func = func 18 self.args = args 19 20 def getResult(self): 21 return self.res 22 23 def run(self): 24 print('Starting', self.name, 'at:', ctime()) 25 # Call function here and calculate the running time 26 self.res = self.func(*self.args) 27 print(self.name, 'finished at:', ctime()) 28 29 30 class MyQueue(): 31 def __init__(self): 32 self.funcs = [self.writer, self.reader] 33 self.nfuncs = range(len(self.funcs)) 34 35 def writeQ(self, queue): 36 global COUNT 37 print('Producing object OBJ_%d for Q...' % COUNT, end=' ') 38 queue.put('OBJ_%d' % COUNT, True) 39 print('size now:', queue.qsize()) 40 COUNT += 1 41 42 def readQ(self, queue): 43 # If queue is empty, block here until queue available 44 val = queue.get(True) 45 print('Consumed object %s from Q... size now:' % val, queue.qsize()) 46 47 def writer(self, queue, loops): 48 for i in range(loops): 49 self.writeQ(queue) 50 sleep(randint(1, 3)) 51 52 def reader(self, queue, loops): 53 for i in range(loops): 54 self.readQ(queue) 55 sleep(randint(2, 5)) 56 57 def main(self): 58 nloops = randint(2, 5) 59 fifoQ = Queue(32) 60 lifoQ = LifoQueue(32) 61 62 # First In First Out mode for Queue 63 print('-----Start FIFO Queue-----') 64 threads = [] 65 for i in self.nfuncs: 66 threads.append(MyThread(self.funcs[i], (fifoQ, nloops), self.funcs[i].__name__)) 67 for t in threads: 68 t.start() 69 for t in threads: 70 t.join() 71 # Last In First Out mode for LifoQueue 72 print('-----Start LIFO Queue-----') 73 threads = [] 74 for i in self.nfuncs: 75 threads.append(MyThread(self.funcs[i], (lifoQ, nloops), self.funcs[i].__name__)) 76 for t in threads: 77 t.start() 78 for t in threads: 79 t.join() 80 81 print('All DONE') 82 83 if __name__ == '__main__': 84 MyQueue().main()
第 1-27 行,首先对需要的模块进行导入,并定义一个全局变量的计数器,派生一个MyThread线程类,用于调用函数及其返回值(本例中MyThread可用于接受writer和reader函数,同时将Queue的实例作为参数传给这两个函数)。
第 30-79 行,定义一个队列类,用于进行队列一系列处理,其中writeQ与readQ会分别对队列执行put和get函数,在writeQ中利用全局变量设置每个加入队列的对象的名字。而writer和reader则会利用循环多次执行writeQ和readQ函数。最后定义一个main函数,用于生成队列,同时调用FIFO以及LIFO两种队列方式。
运行得到结果
-----Start FIFO Queue----- Starting writer at: Tue Aug 1 21:43:22 2017 Producing object OBJ_0 for Q... size now: 1 Starting reader at: Tue Aug 1 21:43:22 2017 Consumed object OBJ_0 from Q... size now: 0 Producing object OBJ_1 for Q... size now: 1 Producing object OBJ_2 for Q... size now: 2 Producing object OBJ_3 for Q... size now: 3 Consumed object OBJ_1 from Q... size now: 2 writer finished at: Tue Aug 1 21:43:26 2017 Consumed object OBJ_2 from Q... size now: 1 Consumed object OBJ_3 from Q... size now: 0 reader finished at: Tue Aug 1 21:43:34 2017 -----Start LIFO Queue----- Starting writer at: Tue Aug 1 21:43:34 2017 Producing object OBJ_4 for Q... size now: 1 Starting reader at: Tue Aug 1 21:43:34 2017 Consumed object OBJ_4 from Q... size now: 0 Producing object OBJ_5 for Q... size now: 1 Producing object OBJ_6 for Q... size now: 2 Producing object OBJ_7 for Q... size now: 3 writer finished at: Tue Aug 1 21:43:38 2017 Consumed object OBJ_7 from Q... size now: 2 Consumed object OBJ_6 from Q... size now: 1 Consumed object OBJ_5 from Q... size now: 0 reader finished at: Tue Aug 1 21:43:53 2017 All DONE