【问题标题】:How to make predictions in parallel using sequential Keras model and multiprocessing?如何使用顺序 Keras 模型和多处理并行进行预测?
【发布时间】:2020-12-05 21:02:05
【问题描述】:

我正在尝试解决二进制分类问题。因此,我在 Keras(带有 TensorFlow 后端)中创建了一个模型,并在 CPU 上训练了该模型。我使用 Keras API 将模型保存为 TensorFlow SavedModel 格式。我将 Kaggle 内核与 Python 3.7.6、Keras 2.4.3 和 TensorFlow 2.3.0 一起使用。

这是我使用的简化代码以及一些模型数据(实际上我是在 GPU 上训练我的模型,但我相信我面临的问题与这一事实无关):

# setup
import numpy as np

random_state = 0

# create mockup data
train_labels = np.random.randint(2, size=(1000))
validation_labels = np.random.randint(2, size=(200))
train_features = np.random.rand(1000, 49)
validation_features = np.random.rand(200, 49)

# create and train model
from keras.models import Sequential
from keras.layers import Dense, Dropout

dim = train_features.shape[1]

model = Sequential()
model.add(Dense(dim, input_dim=dim, activation='relu'))
model.add(Dropout(0.2, seed=random_state))
model.add(Dense(1, activation='sigmoid'))

model.compile(loss='binary_crossentropy', optimizer='adam', metrics='accuracy')
model.summary()

epochs = 10
batch_size = 100

model.fit(train_features, train_labels, epochs=epochs, batch_size=batch_size, verbose=1, validation_data=(validation_features, validation_labels))

# evaluate model
from sklearn.metrics import confusion_matrix

prediction_labels = model.predict_classes(validation_features)
print(confusion_matrix(prediction_labels, validation_labels))

# save model
model.save('/kaggle/working/model')

然后我在新会话中加载模型。当我对单个 CPU 进行预测时,它工作得很好。然而,我真正想做的是对所有 4 个可用的 CPU 进行并行预测(与我在这里的模型相比,我试图解决的实际问题要复杂得多,并且涉及更多的数据)。我尝试使用multiprocessing 这样做:

# setup
import numpy as np
import pandas as pd

random_state = 0

# load model
from keras.models import load_model

model = load_model('../input/repro-model/model')

# create mockup test data
test_features = np.random.rand(1000, 49)

# make some test predictions on the entire test set and a single observation
model.predict_classes(test_features)
model.predict_classes(test_features[[0]])

# set up test dataframe for making predictions in parallel
test_df = pd.DataFrame(test_features)

seq_ids = []
for i in np.arange(1,201):
    seq_id = [i] * 5
    seq_ids.append(seq_id)

frm_ids = [np.arange(1,6)] * 200

test_df['seq_id'] = [item for sublist in seq_ids for item in sublist]
test_df['frm_id'] = [item for sublist in frm_ids for item in sublist]

# test the setup
seq_id = 1
frm_id = 1

model.predict_classes(test_df[(test_df.seq_id == seq_id) & (test_df.frm_id == frm_id)].drop(['seq_id', 'frm_id'], axis=1))

# create function for making predictions
def make_prediction(model, data, seq_id, frm_id):
    
    print(seq_id)
    
    pred = model.predict_classes(data[(data.seq_id == seq_id) & (data.frm_id == frm_id)].drop(['seq_id', 'frm_id'], axis=1))

    return pred

# make test prediction
make_prediction(model, test_df, 1, 1)

# make predictions in parallel
from multiprocessing import Pool
import itertools

workers = 4
p = Pool(processes=workers)

seq_list, frm_list = np.arange(1, 201), np.arange(1, 6)
id_pair_list = list(itertools.product(seq_list, frm_list))

predictions = p.starmap(make_prediction, [(model, test_df, id_pair[0], id_pair[1]) for id_pair in id_pair_list])
p.close()

当我运行上面的代码时,我得到以下堆栈跟踪和错误:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-85-52e9e388daba> in <module>
      8 id_pair_list = list(itertools.product(seq_list, frm_list))
      9 
---> 10 predictions = p.starmap(make_prediction, [(model, test_df, id_pair[0], id_pair[1]) for id_pair in id_pair_list])
     11 p.close()

/opt/conda/lib/python3.7/multiprocessing/pool.py in starmap(self, func, iterable, chunksize)
    274         `func` and (a, b) becomes func(a, b).
    275         '''
--> 276         return self._map_async(func, iterable, starmapstar, chunksize).get()
    277 
    278     def starmap_async(self, func, iterable, chunksize=None, callback=None,

/opt/conda/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
    655             return self._value
    656         else:
--> 657             raise self._value
    658 
    659     def _set(self, i, obj):

/opt/conda/lib/python3.7/multiprocessing/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
    429                         break
    430                     try:
--> 431                         put(task)
    432                     except Exception as e:
    433                         job, idx = task[:2]

/opt/conda/lib/python3.7/multiprocessing/connection.py in send(self, obj)
    204         self._check_closed()
    205         self._check_writable()
--> 206         self._send_bytes(_ForkingPickler.dumps(obj))
    207 
    208     def recv_bytes(self, maxlength=None):

/opt/conda/lib/python3.7/multiprocessing/reduction.py in dumps(cls, obj, protocol)
     49     def dumps(cls, obj, protocol=None):
     50         buf = io.BytesIO()
---> 51         cls(buf, protocol).dump(obj)
     52         return buf.getbuffer()
     53 

TypeError: can't pickle _thread.RLock objects

阅读完关于这个主题的所有内容后,我在 SO(例如,herehereherehere)和 GitHub(例如,hereherehere)上找到了我认为酸洗我的顺序 Keras 模型和多线程存在一些问题。然后我尝试在我的预测函数中加载模型并在每次预测函数调用后将其删除,如下所示:

def make_prediction(data, seq_id, frm_id):
    
    print(seq_id)
    
    from keras.models import load_model
    model = load_model('../input/repro-model/model')
    
    pred = model.predict_classes(data[(data.seq_id == seq_id) & (data.frm_id == frm_id)].drop(['seq_id', 'frm_id'], axis=1))
    
    del model
    
    return pred

from multiprocessing import Pool
import itertools

workers = 4
p = Pool(processes=workers)

seq_list, frm_list = np.arange(1, 201), np.arange(1, 6)
id_pair_list = list(itertools.product(seq_list, frm_list))

predictions = p.starmap(make_prediction, [(test_df, id_pair[0], id_pair[1]) for id_pair in id_pair_list])
p.close()

我不再收到错误消息,但是在进行前四个预测时执行星图函数调用时进程挂起。所以肯定还有一些问题。使用model._make_predict_function() 不起作用,因为它似乎是deprecated。在这个主题上也有很多关于 SO 的帖子没有得到答复:hereherehere

有没有人知道如何在 CPU 上使用我的顺序 Keras 模型并行进行预测?那太棒了,因为我已经被这个问题困扰了很长一段时间了。非常感谢!

【问题讨论】:

  • 能否请您添加您正在使用的tensorflow版本。
  • TensorFlow 2.3.0。感谢您查看我的问题!我会检查你的建议。
  • 我在使用 tf 2.3.0 时遇到了同样的问题。但下面提出的解决方案适用于 2.0.0

标签: python tensorflow keras multiprocessing


【解决方案1】:

添加此代码片段帮助我解决了问题(但仅适用于 tf 版本 2.0.0)

import tensorflow as tf
from tensorflow.python.keras.saving import saving_utils
from tensorflow.python.keras.layers import deserialize, serialize

def unpack(model, training_config, weights):
    restored_model = deserialize(model)
    if training_config is not None:
        restored_model.compile(**saving_utils.compile_args_from_training_config(
        training_config))
    restored_model.set_weights(weights)
    return restored_model


def make_keras_picklable():
    def __reduce__(self):
        model_metadata = saving_utils.model_metadata(self)
        training_config = model_metadata.get("training_config", None)
        model = serialize(self)
        weights = self.get_weights()
        return unpack, (model, training_config, weights)

cls = tf.keras.models.Model
cls.__reduce__ = __reduce__


# Ensures that tf.keras models are pickable
make_keras_picklable()

【讨论】:

  • 我需要哪些导入才能使您的代码运行?我收到以下错误:NameError: name 'reduce' is not defined.
  • 我添加了导入。我希望这些都是你所需要的。我的整个代码中还有更多内容。我试图找出哪些与上面的 sn-p 相关。请让我知道这是否有效
  • 谢谢。我再次尝试了您的 sn-p,但现在又出现了另一个错误:NameError: name 'reduce' is not defined.
猜你喜欢
  • 2022-01-26
  • 2018-05-23
  • 2020-07-14
  • 1970-01-01
  • 2020-09-07
  • 2017-07-16
  • 2018-07-24
  • 1970-01-01
  • 2020-10-30
相关资源
最近更新 更多