【发布时间】:2018-06-03 23:33:46
【问题描述】:
我在 Python 中用 TensorFlow 训练了一个 DNN 分类器模型。现在我想把它加载到 PySpark 中,并用该模型预测 RDD 中每条记录的性别。首先,我像训练模型时一样构建 TensorFlow 计算图,然后加载训练好的模型,并尝试对 RDD 的每一行进行预测:
"""
code to generate the tensorflow graph omitted
"""
with tf.Session(graph=graph) as sess:
# load the trained model
saver.restore(sess, "./nonClass_gender")
# lib is the RDD, each Row has the form of Row(key = ..., values = ..., indices = ..., shape = ...)
predictions_1 = lib.map(lambda e: Row(key = e["key"],
prob = y_proba.eval(feed_dict={values: e["values"],
indices: e["indices"], shape: [1,2318]})))
predictions_1.take(5)
请注意,在 RDD 中,每一行的形式为 Row(key = ..., values = ..., indices = ..., shape = ...)。其中 values、indices 和 shape 分别对应此答案中的 values、indices 和 dense_shape:Use coo_matrix in TensorFlow。它们用于生成 SparseTensorValue。不同之处在于,在我的代码中,每一行都会各自生成一个 SparseTensorValue。
然后我得到了以下错误:
Traceback (most recent call last):
File "/usr/local/spark/python/pyspark/cloudpickle.py", line 148, in dump
return Pickler.dump(self, obj)
File "/usr/lib/python2.7/pickle.py", line 224, in dump
self.save(obj)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 562, in save_tuple
save(element)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/spark/python/pyspark/cloudpickle.py", line 255, in save_function
self.save_function_tuple(obj)
File "/usr/local/spark/python/pyspark/cloudpickle.py", line 292, in save_function_tuple
save((code, closure, base_globals))
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
save(element)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 600, in save_list
self._batch_appends(iter(obj))
File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
save(x)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/spark/python/pyspark/cloudpickle.py", line 255, in save_function
self.save_function_tuple(obj)
File "/usr/local/spark/python/pyspark/cloudpickle.py", line 292, in save_function_tuple
save((code, closure, base_globals))
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
save(element)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 600, in save_list
self._batch_appends(iter(obj))
File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
save(tmp[0])
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/spark/python/pyspark/cloudpickle.py", line 249, in save_function
self.save_function_tuple(obj)
File "/usr/local/spark/python/pyspark/cloudpickle.py", line 297, in save_function_tuple
save(f_globals)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/usr/local/spark/python/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/usr/local/spark/python/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/usr/local/spark/python/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 600, in save_list
self._batch_appends(iter(obj))
File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
save(tmp[0])
File "/usr/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/usr/local/spark/python/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 600, in save_list
self._batch_appends(iter(obj))
File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
save(x)
File "/usr/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/usr/local/spark/python/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 600, in save_list
self._batch_appends(iter(obj))
File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
save(x)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 562, in save_tuple
save(element)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/spark/python/pyspark/cloudpickle.py", line 249, in save_function
self.save_function_tuple(obj)
File "/usr/local/spark/python/pyspark/cloudpickle.py", line 292, in save_function_tuple
save((code, closure, base_globals))
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
save(element)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 600, in save_list
self._batch_appends(iter(obj))
File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
save(tmp[0])
File "/usr/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/usr/local/spark/python/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 600, in save_list
self._batch_appends(iter(obj))
File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
save(x)
File "/usr/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/usr/local/spark/python/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
save(v)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/spark/python/pyspark/cloudpickle.py", line 368, in save_builtin_function
return self.save_function(obj)
File "/usr/local/spark/python/pyspark/cloudpickle.py", line 247, in save_function
if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
-------------------------------------------------------------------
PicklingError Traceback (most recent call last)
<ipython-input-210-74fa9037373f> in <module>()
6 prob = y_proba.eval(feed_dict={values: e["values"],
7 indices: e["indices"], shape: [1,2318]})))
----> 8 predictions_1.take(5)
/usr/local/spark/python/pyspark/rdd.pyc in take(self, num)
1341
1342 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1343 res = self.context.runJob(self, takeUpToNumLeft, p)
1344
1345 items += res
/usr/local/spark/python/pyspark/context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
990 # SparkContext#runJob.
991 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 992 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
993 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
994
/usr/local/spark/python/pyspark/rdd.pyc in _jrdd(self)
2453
2454 wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer,
-> 2455 self._jrdd_deserializer, profiler)
2456 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func,
2457 self.preservesPartitioning)
/usr/local/spark/python/pyspark/rdd.pyc in _wrap_function(sc, func, deserializer, serializer, profiler)
2386 assert serializer, "serializer should not be empty"
2387 command = (func, profiler, deserializer, serializer)
-> 2388 pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
2389 return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
2390 sc.pythonVer, broadcast_vars, sc._javaAccumulator)
/usr/local/spark/python/pyspark/rdd.pyc in _prepare_for_python_RDD(sc, command)
2372 # the serialized command will be compressed by broadcast
2373 ser = CloudPickleSerializer()
-> 2374 pickled_command = ser.dumps(command)
2375 if len(pickled_command) > (1 << 20): # 1M
2376 # The broadcast will have same life cycle as created PythonRDD
/usr/local/spark/python/pyspark/serializers.pyc in dumps(self, obj)
458
459 def dumps(self, obj):
--> 460 return cloudpickle.dumps(obj, 2)
461
462
/usr/local/spark/python/pyspark/cloudpickle.pyc in dumps(obj, protocol)
702
703 cp = CloudPickler(file,protocol)
--> 704 cp.dump(obj)
705
706 return file.getvalue()
/usr/local/spark/python/pyspark/cloudpickle.pyc in dump(self, obj)
160 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
161 print_exec(sys.stderr)
--> 162 raise pickle.PicklingError(msg)
163
164 def save_memoryview(self, obj):
PicklingError: Could not serialize object: AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
在上面的代码中,如果我把 prob = y_proba.eval(feed_dict={values: e["values"], indices: e["indices"], shape: [1,2318]}) 换成一个用 Python 定义的普通函数,例如 proba = test(e["values"], e["indices"], [1,2318]),它就可以正常工作。此外,如果我只在 Python 中直接调用 y_proba.eval(不放在 RDD 的 map 中),它也可以正常工作。
【问题讨论】:
-
你试过用 run 代替 eval 吗?
-
run 会给出同样的错误。不过,我刚刚找到了解决办法,请看下面我的回答。
标签: python apache-spark tensorflow pyspark deep-learning