【Posted】: 2015-09-19 04:03:47
【Question】:
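This is for Python 3.x.
I am loading records from a CSV file 300 at a time, then spawning worker threads to submit them to a REST API. I save the HTTP responses in a queue so that I can count the number of skipped records once the entire CSV file has been processed. However, after I added the queue to my worker, the threads no longer seem to close. I want to monitor the number of threads for two reasons: (1) once they are done, I can calculate and display the skip count, and (2) I want to enhance my script to spawn no more than 20 or so threads, so that I don't run out of memory.
I have two questions: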
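- Can someone explain why the threads stay active when using q.put()?
- Is there a different way to manage the number of threads and to monitor whether all threads are done?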
Here is my code (somewhat simplified, because I can't share the exact details of the API I'm calling):
import requests, json, csv, time, datetime, multiprocessing

TEST_FILE = 'file.csv'

# NOTE: url, params and headers are not defined here; the API details were omitted from the question.

def read_test_data(path, chunksize=300):
    leads = []
    with open(path, newline='') as data:  # newline='' is what the csv module expects in Python 3 (was 'rU')
        reader = csv.DictReader(data)
        for index, row in enumerate(reader):
            if (index % chunksize == 0 and index > 0):
                yield leads
                leads = []  # start a fresh list so a chunk already handed out is never mutated
            leads.append(row)
        yield leads

def worker(leads, q):
    payload = {"action":"createOrUpdate","input":leads}
    r = requests.post(url, params=params, data=json.dumps(payload), headers=headers)
    q.put(r.text) # this puts the response in a queue for later analysis
    return

if __name__ == "__main__":
    q = multiprocessing.Queue() # this is a queue to put all HTTP responses in, so we count the skips
    jobs = []
    for leads in read_test_data(TEST_FILE): # This function reads a CSV file and provides 300 records at a time
        p = multiprocessing.Process(target=worker, args=(leads,q,))
        jobs.append(p)
        p.start()
    time.sleep(20) # checking if processes are closing automatically (they don't)
    print(len(multiprocessing.active_children())) ## always returns the number of threads. If I remove 'q.put' from worker, it returns 0

    # The intent is to wait until all workers are done, but it results in an infinite loop
    # when I remove 'q.put' in the worker it works fine
    #while len(multiprocessing.active_children()) > 0: #
    #    time.sleep(1)

    skipped_count = 0
    while not q.empty(): # calculate number of skipped records based on the HTTP responses in the queue
        http_response = json.loads(q.get())
        for i in http_response['result']:
            if (i['status'] == "skipped" and i['reasons'][0]['code'] == "1004"):
                skipped_count += 1
    print("Number of records skipped: " + str(skipped_count))
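A likely explanation for the behaviour described above: according to the multiprocessing documentation, a process that has put items on a multiprocessing.Queue waits before terminating until its buffered items have been flushed to the underlying pipe, so a worker can stay alive until the parent actually reads from the queue. The following is only a minimal sketch of the usual workaround, which is to read one result per worker first and join afterwards. fake_worker, count_skipped, run and the canned response are hypothetical stand-ins, not the real API call.

```python
import json
import multiprocessing


def fake_worker(leads, q):
    """Hypothetical stand-in for the real API call: put a canned response on the queue."""
    result = [{"status": "skipped", "reasons": [{"code": "1004"}]} for _ in leads]
    q.put(json.dumps({"result": result}))


def count_skipped(responses):
    """Count records whose status is 'skipped' with reason code '1004'."""
    skipped = 0
    for text in responses:
        for item in json.loads(text).get("result", []):
            reasons = item.get("reasons") or [{}]
            if item.get("status") == "skipped" and reasons[0].get("code") == "1004":
                skipped += 1
    return skipped


def run(chunks):
    """Start one process per chunk, drain the queue, then join the processes."""
    q = multiprocessing.Queue()
    jobs = [multiprocessing.Process(target=fake_worker, args=(chunk, q))
            for chunk in chunks]
    for p in jobs:
        p.start()
    # Drain first: one blocking get() per worker, so no response is missed
    # and every child can flush its buffer and exit.
    responses = [q.get() for _ in jobs]
    # Join only after the queue has been emptied.
    for p in jobs:
        p.join()
    return count_skipped(responses)


if __name__ == "__main__":
    print("Number of records skipped:", run([[{"id": 1}, {"id": 2}], [{"id": 3}]]))
```

This sketch assumes every worker puts exactly one item on the queue. If a worker can fail before calling q.put(), the blocking q.get() would need a timeout, otherwise the parent waits forever.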
【Comments】:
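- Just to clarify: the program above uses multiple processes, not threads. The memory and message-passing overheads differ considerably between the two, so it is important to be clear about which one you are using.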
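- It is also worth noting that, since your problem is I/O-bound, you can probably use threads instead of processes and still get good performance compared with a synchronous approach.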
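The thread-based approach suggested in the comments could look roughly like the sketch below, which uses concurrent.futures.ThreadPoolExecutor from the standard library to cap the number of concurrent workers at 20 and to wait for all of them. submit_chunk and process_chunks are hypothetical names, and submit_chunk returns a canned response where the real requests.post call would go.

```python
import json
from concurrent.futures import ThreadPoolExecutor


def submit_chunk(leads):
    """Hypothetical stand-in for the real requests.post call; returns the response body."""
    result = [{"status": "skipped", "reasons": [{"code": "1004"}]} for _ in leads]
    return json.dumps({"result": result})


def process_chunks(chunks, max_workers=20):
    """Submit every chunk using at most max_workers threads and return all responses."""
    # Leaving the with-block waits for every submitted task, so no manual
    # polling of the number of live workers is needed.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() returns results in the same order as the input chunks.
        return list(pool.map(submit_chunk, chunks))


if __name__ == "__main__":
    responses = process_chunks([[{"id": 1}, {"id": 2}], [{"id": 3}]])
    print(len(responses), "responses collected")
```

Because the return values are collected directly, no queue is required in this variant. Note that pool.map reads the whole input up front, so each chunk must be its own list rather than one list that is cleared and reused between yields.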
Tags: python multithreading python-3.x queue python-multiprocessing