【Posted】: 2020-12-05 21:02:05
【Question】:
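I am trying to solve a binary classification problem. I built a model in Keras (with the TensorFlow backend) and trained it on the CPU. I then saved it in the TensorFlow SavedModel format using the Keras API. I am working in a Kaggle kernel with Python 3.7.6, Keras 2.4.3 and TensorFlow 2.3.0.
Here is the simplified code I used, together with some mockup data. In reality I train my model on a GPU, but I believe the problem I am facing is unrelated to that: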
# setup
import numpy as np
random_state = 0
# create mockup data
train_labels = np.random.randint(2, size=(1000))
validation_labels = np.random.randint(2, size=(200))
train_features = np.random.rand(1000, 49)
validation_features = np.random.rand(200, 49)
# create and train model
from keras.models import Sequential
from keras.layers import Dense, Dropout
dim = train_features.shape[1]
model = Sequential()
model.add(Dense(dim, input_dim=dim, activation='relu'))
model.add(Dropout(0.2, seed=random_state))
model.add(Dense(1, activation='sigmoid'))
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
model.summary()
epochs = 10
batch_size = 100
model.fit(train_features, train_labels, epochs=epochs, batch_size=batch_size, verbose=1, validation_data=(validation_features, validation_labels))
# evaluate model
from sklearn.metrics import confusion_matrix
prediction_labels = model.predict_classes(validation_features)
# sklearn's confusion_matrix expects (y_true, y_pred)
print(confusion_matrix(validation_labels, prediction_labels))
# save model
model.save('/kaggle/working/model')
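The `predict_classes` method used above is a `Sequential`-only convenience that was deprecated and later removed from TensorFlow. For a model whose last layer is a single sigmoid unit, as here, it amounts to thresholding `model.predict` at 0.5. The NumPy sketch below shows that equivalence. `binary_classes` is a hypothetical helper name, and the probabilities are made up.

```python
import numpy as np


def binary_classes(probabilities, threshold=0.5):
    """Turn sigmoid outputs of shape (n, 1) into int32 0/1 labels."""
    # Strictly greater than the threshold, matching predict_classes for one output unit.
    return (probabilities > threshold).astype("int32")


probs = np.array([[0.2], [0.7], [0.5]])  # made-up model.predict output
print(binary_classes(probs).ravel())     # [0 1 0]
```

With a loaded model this would be called as `binary_classes(model.predict(validation_features))`.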
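I then load the model in a new session. Making predictions on a single CPU works fine. What I actually want, however, is to make predictions in parallel on all 4 available CPUs. The real problem I am trying to solve is considerably more complex than the model shown here and involves much more data. I tried to do this with multiprocessing: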
# setup
import numpy as np
import pandas as pd
random_state = 0
# load model
from keras.models import load_model
model = load_model('../input/repro-model/model')
# create mockup test data
test_features = np.random.rand(1000, 49)
# make some test predictions on the entire test set and a single observation
model.predict_classes(test_features)
model.predict_classes(test_features[[0]])
# set up test dataframe for making predictions in parallel
test_df = pd.DataFrame(test_features)
seq_ids = []
for i in np.arange(1,201):
    seq_id = [i] * 5
    seq_ids.append(seq_id)
frm_ids = [np.arange(1,6)] * 200
test_df['seq_id'] = [item for sublist in seq_ids for item in sublist]
test_df['frm_id'] = [item for sublist in frm_ids for item in sublist]
# test the setup
seq_id = 1
frm_id = 1
model.predict_classes(test_df[(test_df.seq_id == seq_id) & (test_df.frm_id == frm_id)].drop(['seq_id', 'frm_id'], axis=1))
# create function for making predictions
def make_prediction(model, data, seq_id, frm_id):
    print(seq_id)
    pred = model.predict_classes(data[(data.seq_id == seq_id) & (data.frm_id == frm_id)].drop(['seq_id', 'frm_id'], axis=1))
    return pred
# make test prediction
make_prediction(model, test_df, 1, 1)
# make predictions in parallel
from multiprocessing import Pool
import itertools
workers = 4
p = Pool(processes=workers)
seq_list, frm_list = np.arange(1, 201), np.arange(1, 6)
id_pair_list = list(itertools.product(seq_list, frm_list))
predictions = p.starmap(make_prediction, [(model, test_df, id_pair[0], id_pair[1]) for id_pair in id_pair_list])
p.close()
When I run the code above, I get the following stack trace and error:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-85-52e9e388daba> in <module>
8 id_pair_list = list(itertools.product(seq_list, frm_list))
9
---> 10 predictions = p.starmap(make_prediction, [(model, test_df, id_pair[0], id_pair[1]) for id_pair in id_pair_list])
11 p.close()
/opt/conda/lib/python3.7/multiprocessing/pool.py in starmap(self, func, iterable, chunksize)
274 `func` and (a, b) becomes func(a, b).
275 '''
--> 276 return self._map_async(func, iterable, starmapstar, chunksize).get()
277
278 def starmap_async(self, func, iterable, chunksize=None, callback=None,
/opt/conda/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
655 return self._value
656 else:
--> 657 raise self._value
658
659 def _set(self, i, obj):
/opt/conda/lib/python3.7/multiprocessing/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
429 break
430 try:
--> 431 put(task)
432 except Exception as e:
433 job, idx = task[:2]
/opt/conda/lib/python3.7/multiprocessing/connection.py in send(self, obj)
204 self._check_closed()
205 self._check_writable()
--> 206 self._send_bytes(_ForkingPickler.dumps(obj))
207
208 def recv_bytes(self, maxlength=None):
/opt/conda/lib/python3.7/multiprocessing/reduction.py in dumps(cls, obj, protocol)
49 def dumps(cls, obj, protocol=None):
50 buf = io.BytesIO()
---> 51 cls(buf, protocol).dump(obj)
52 return buf.getbuffer()
53
TypeError: can't pickle _thread.RLock objects
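The traceback shows where the failure happens: in the pool's task handler (`_handle_tasks` → `put(task)` → `_ForkingPickler.dumps`), before `make_prediction` ever runs. Each `(model, test_df, seq_id, frm_id)` tuple is pickled so it can be sent to a worker. Something inside that tuple, presumably the loaded Keras model, holds a `_thread.RLock`, which pickle refuses to serialize.

The stdlib-only sketch below reproduces that mechanism. It uses a hypothetical `FakeModel` class as a stand-in for the model, so it illustrates the failure mode rather than the actual Keras internals.

```python
import pickle
import threading


class FakeModel:
    """Hypothetical stand-in for a loaded model that owns a lock internally."""

    def __init__(self):
        self._lock = threading.RLock()


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds, roughly what Pool needs for each task."""
    try:
        pickle.dumps(obj)
        return True
    except TypeError:
        # Python 3.7 reports "can't pickle _thread.RLock objects" here.
        return False


print(can_pickle((1, 1)))               # ids only: True
print(can_pickle((FakeModel(), 1, 1)))  # tuple holding a lock: False
```

This is why sending only picklable values to the workers avoids the `TypeError`. Examples are ids, NumPy arrays and file paths, with the model itself created inside each worker.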
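I read everything I could find on this topic on SO (e.g., here, here, here and here) and on GitHub (e.g., here, here and here). From that, I concluded that there are some problems with pickling my Sequential Keras model and with multithreading. I then tried loading the model inside my prediction function and deleting it after each call, like this: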
def make_prediction(data, seq_id, frm_id):
    print(seq_id)
    from keras.models import load_model
    model = load_model('../input/repro-model/model')
    pred = model.predict_classes(data[(data.seq_id == seq_id) & (data.frm_id == frm_id)].drop(['seq_id', 'frm_id'], axis=1))
    del model
    return pred
from multiprocessing import Pool
import itertools
workers = 4
p = Pool(processes=workers)
seq_list, frm_list = np.arange(1, 201), np.arange(1, 6)
id_pair_list = list(itertools.product(seq_list, frm_list))
predictions = p.starmap(make_prediction, [(test_df, id_pair[0], id_pair[1]) for id_pair in id_pair_list])
p.close()
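I no longer get an error message, but the process hangs during the starmap call while making the first four predictions, so something is clearly still wrong. Calling model._make_predict_function() does not help, since it appears to be deprecated. There are also several unanswered SO posts on this topic: here, here and here.
Does anyone know how to make predictions in parallel on the CPUs with my Sequential Keras model? Any help would be greatly appreciated, as I have been stuck on this problem for quite a while. Thanks a lot!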
【Comments】:
-
Could you please add which TensorFlow version you are using?
-
TensorFlow 2.3.0. Thanks for looking at my question! I'll check out your suggestion.
-
I'm running into the same problem with TF 2.3.0, but the solution proposed below works with 2.0.0.
Tags: python tensorflow keras multiprocessing