【发布时间】:2019-11-12 10:18:14
【问题描述】:
我正在使用 Databricks,并且在数据框中有一列,我需要通过外部 Web 服务调用为每条记录更新该列。在这种情况下,它使用 Azure 机器学习服务 SDK 并执行服务调用。此代码在未在 spark 中作为 UDF 运行时(即仅 python)运行良好,但是当我尝试将其称为 UDF 时会引发序列化错误。如果我使用 lambda 和带有 rdd 的地图,也会发生同样的情况。
该模型使用 fastText,可以通过普通的 http 调用或使用来自 AMLS 的 WebService SDK 从 Postman 或 python 中正常调用 - 只有当它是 UDF 时,它才会失败并显示以下消息:
TypeError: can't pickle _thread._local objects
我能想到的唯一解决方法是依次循环遍历数据框中的每条记录并通过调用更新记录,但这不是很有效。我不知道这是火花错误还是因为服务正在加载 fasttext 模型。当我使用 UDF 并模拟返回值时,它可以工作。
底部错误...
from azureml.core.webservice import Webservice, AciWebservice
from azureml.core import Workspace
def predictModelValue2(summary, modelName, modelLabel):
raw_data = '[{"label": "' + modelLabel + '", "model": "' + modelName + '", "as_full_account": "' + summary + '"}]'
prediction = service.run(raw_data)
return prediction
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf
predictModelValueUDF = udf(predictModelValue2)
DVIRCRAMFItemsDFScored1 = DVIRCRAMFItemsDF.withColumn("Result", predictModelValueUDF("Summary", "ModelName", "ModelLabel"))
TypeError: can't pickle _thread._local objects
在处理上述异常的过程中,又发生了一个异常:
PicklingError Traceback(最近调用 最后)在 ----> 2 x = df.withColumn("结果", predictModelValueUDF("摘要", “模型名称”、“模型标签”))
/databricks/spark/python/pyspark/sql/udf.py 在 wrapper(*args) 第194章 195 def 包装器(* args): --> 196 返回自我(*args) 197 198 包装器。名称 = self._name
/databricks/spark/python/pyspark/sql/udf.py in call(self, *cols) 172 173 def 调用(自我,*cols): --> 174 judf = self._judf 175 sc = SparkContext._active_spark_context 176 返回列(judf.apply(_to_seq(sc,cols,_to_java_column))))
/databricks/spark/python/pyspark/sql/udf.py in _judf(self) 156 # 并且应该具有最小的性能影响。 157 如果 self._judf_placeholder 为无: --> 158 self._judf_placeholder = self._create_judf() 第159章 160
/databricks/spark/python/pyspark/sql/udf.py in _create_judf(self) 第165章 166 --> 167 Wrapped_func = _wrap_function(sc, self.func, self.returnType) 168 jdt = spark._jsparkSession.parseDataType(self.returnType.json()) 169 judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
/databricks/spark/python/pyspark/sql/udf.py in _wrap_function(sc, 函数,返回类型) 33 def _wrap_function(sc, func, returnType): 34 命令=(函数,返回类型) ---> 35 pickled_command,broadcast_vars,env,包括=_prepare_for_python_RDD(sc,命令) 36 返回 sc._jvm.PythonFunction(bytearray(pickled_command), 环境, 包括, sc.pythonExec, 37 sc.pythonVer、broadcast_vars、sc._javaAccumulator)
/databricks/spark/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command) 2461 # 序列化的命令将被压缩 广播 2462 ser = CloudPickleSerializer() -> 2463 pickled_command = ser.dumps(command) 2464 if len(pickled_command) > sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc): # 默认 1M
2465 # 广播将具有与创建相同的生命周期 PythonRDD/databricks/spark/python/pyspark/serializers.py in dumps(self, obj) 709 msg = "无法序列化对象:%s: %s" % (e.class.name, emsg) 第710章 --> 711 引发 pickle.PicklingError(msg) 712 第713章
PicklingError: 无法序列化对象: TypeError: can't pickle _thread._local 对象
【问题讨论】:
-
如果直接调用(无火花),从 predictModelValue2 函数返回的变量预测的类型是什么?如果它不是字符串,那么您可能需要在 udf 声明中相应地提供它。
-
预测是一个浮点数 - 我会明确地尝试一下,谢谢
标签: pyspark user-defined-functions pickle azure-databricks azure-machine-learning-service