【发布时间】:2021-06-23 18:48:16
【问题描述】:
我有一个包含 270 万个样本的数据集,我需要在这些数据集上测试我的 ML 模型。我的笔记本电脑上有 8 个内核,我想尝试并行化我的测试代码以节省时间。这是测试功能:
def testMTGP(x_sample, y_sample, ind, model, likelihood):
x_sample = x_sample.view(1, -1)
y_sample = y_sample.view(1, -1)
model.eval()
likelihood.eval()
with torch.no_grad():
prediction = likelihood(model(x_sample))
mean = (prediction.mean).detach().numpy()
prewhiten_error = (y_sample.detach().numpy()) - mean
cov_matrix = (prediction.covariance_matrix).detach().numpy()
white_error, matcheck = Whiten(prewhiten_error, cov_matrix)
return (
ind,
{
"prediction": mean,
"prewhiten_error": prewhiten_error,
"white_error": white_error,
"cov_matrix": cov_matrix,
"matcheck": matcheck,
},
)
我返回与我测试的样本相对应的索引以及与模型为测试所做的计算相关的数据字典。函数Whiten(prewhiten_error, cov_matrix)也是我自己定义的,在代码文件开头导入的,所以全局可用。它只是接受输入,转换 cov_matrix 并将其与 prewhiten_error 相乘并返回答案,以及一个指示有关 cov_matrix 的一些状态信息的变量。
对于多处理,想法是首先将整个数据集分成大小大致相等的块;挑选每个块并将一个样本发送到每个核心进行处理。我正在使用pool.apply_async。这是代码:
test_X = torch.load(test_X_filename) #torch tensor of shape 2.7M x 3
test_Y = torch.load(test_Y_filename) #torch tensor of shape 2.7M x 3
cores = mp.cpu_count()
chunk_size = int(test_X.shape[0] / cores)
start_time = time.time()
parent_list = []
for start_ind in range(0, test_X.shape[0], chunk_size):
pool = mp.Pool(processes=cores)
proc_data_size = int(chunk_size / cores)
stop_ind = min(test_X.shape[0], start_ind + chunk_size)
results = [
pool.apply_async(
testMTGP, (test_X[i].detach(), test_Y[i].detach(), i, model, likelihood,)
)
for i in range(start_ind, stop_ind)
]
for res in results:
print("Length of results list= ", len(results))
print("Data type of res is: ", type(res))
res_dict = res.get()
parent_list.append(res_dict)
pool.close()
test_X[i] 和 test_Y[i] 都是形状为 (3,) 的张量。在执行我得到的代码时:
回溯(最近一次通话最后一次):
文件“multiproc_async.py”,第 288 行,在
res_dict = res.get() # [1]
文件 “/home/aman/anaconda3/envs/thesis/lib/python3.8/multiprocessing/pool.py”, 第 771 行,在获取中
提高自我价值
文件 “/home/aman/anaconda3/envs/thesis/lib/python3.8/multiprocessing/pool.py”, 第 537 行,在 _handle_tasks
放(任务)
文件 “/home/aman/anaconda3/envs/thesis/lib/python3.8/multiprocessing/connection.py”, 第 206 行,在发送中
self._send_bytes(_ForkingPickler.dumps(obj))
文件 “/home/aman/anaconda3/envs/thesis/lib/python3.8/multiprocessing/reduction.py”, 第 51 行,转储中
cls(buf, 协议).dump(obj)
AttributeError:无法腌制本地对象 MultitaskGaussianLikelihood.__init__.<locals>.<lambda>
我是多处理新手,谷歌搜索此错误并没有真正帮助(其中一些不相关,有些超出我的理解)。有人可以帮我理解我犯了什么错误吗?
【问题讨论】:
-
请将堆栈跟踪作为文本发布,而不是图像
-
另外,建议使用上下文管理器而不是池,pool.close (
with multiprocessing.Pool(processes=3) as pool:)
标签: python multiprocessing apply-async