【问题标题】:Databricks UDF calling an external web service cannot be serialised (PicklingError)无法序列化调用外部 Web 服务的 Databricks UDF (PicklingError)
【发布时间】:2019-11-12 10:18:14
【问题描述】:

我正在使用 Databricks,并且在数据框中有一列,我需要通过外部 Web 服务调用为每条记录更新该列。在这种情况下,它使用 Azure 机器学习服务 SDK 并执行服务调用。此代码在未在 spark 中作为 UDF 运行时(即仅 python)运行良好,但是当我尝试将其称为 UDF 时会引发序列化错误。如果我使用 lambda 和带有 rdd 的地图,也会发生同样的情况。

该模型使用 fastText,可以通过普通的 http 调用或使用来自 AMLS 的 WebService SDK 从 Postman 或 python 中正常调用 - 只有当它是 UDF 时,它才会失败并显示以下消息:

TypeError: can't pickle _thread._local objects

我能想到的唯一解决方法是依次循环遍历数据框中的每条记录并通过调用更新记录,但这不是很有效。我不知道这是火花错误还是因为服务正在加载 fasttext 模型。当我使用 UDF 并模拟返回值时,它可以工作。

底部错误...

from azureml.core.webservice import Webservice, AciWebservice
from azureml.core import Workspace

def predictModelValue2(summary, modelName, modelLabel):  
    raw_data = '[{"label": "' + modelLabel + '", "model": "' + modelName + '", "as_full_account": "' + summary + '"}]'
    prediction = service.run(raw_data)
    return prediction

from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

predictModelValueUDF = udf(predictModelValue2)

DVIRCRAMFItemsDFScored1 = DVIRCRAMFItemsDF.withColumn("Result", predictModelValueUDF("Summary", "ModelName", "ModelLabel"))

TypeError: can't pickle _thread._local objects

在处理上述异常的过程中,又发生了一个异常:

PicklingError Traceback(最近调用 最后)在 ----> 2 x = df.withColumn("结果", predictModelValueUDF("摘要", “模型名称”、“模型标签”))

/databricks/spark/python/pyspark/sql/udf.py 在 wrapper(*args) 第194章 195 def 包装器(* args): --> 196 返回自我(*args) 197 198 包装器。名称 = self._name

/databricks/spark/python/pyspark/sql/udf.py in call(self, *cols) 172 173 def 调用(自我,*cols): --> 174 judf = self._judf 175 sc = SparkContext._active_spark_context 176 返回列(judf.apply(_to_seq(sc,cols,_to_java_column))))

/databricks/spark/python/pyspark/sql/udf.py in _judf(self) 156 # 并且应该具有最小的性能影响。 157 如果 self._judf_placeholder 为无: --> 158 self._judf_placeholder = self._create_judf() 第159章 160

/databricks/spark/python/pyspark/sql/udf.py in _create_judf(self) 第165章 166 --> 167 Wrapped_func = _wrap_function(sc, self.func, self.returnType) 168 jdt = spark._jsparkSession.parseDataType(self.returnType.json()) 169 judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(

/databricks/spark/python/pyspark/sql/udf.py in _wrap_function(sc, 函数,返回类型) 33 def _wrap_function(sc, func, returnType): 34 命令=(函数,返回类型) ---> 35 pickled_command,broadcast_vars,env,包括=_prepare_for_python_RDD(sc,命令) 36 返回 sc._jvm.PythonFunction(bytearray(pickled_command), 环境, 包括, sc.pythonExec, 37 sc.pythonVer、broadcast_vars、sc._javaAccumulator)

/databricks/spark/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command) 2461 # 序列化的命令将被压缩 广播 2462 ser = CloudPickleSerializer() -> 2463 pickled_command = ser.dumps(command) 2464 if len(pickled_command) > sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc): # 默认 1M
2465 # 广播将具有与创建相同的生命周期 PythonRDD

/databricks/spark/python/pyspark/serializers.py in dumps(self, obj) 709 msg = "无法序列化对象:%s: %s" % (e.class.name, emsg) 第710章 --> 711 引发 pickle.PicklingError(msg) 712 第713章

PicklingError: 无法序列化对象: TypeError: can't pickle _thread._local 对象

【问题讨论】:

  • 如果直接调用(无火花),从 predictModelValue2 函数返回的变量预测的类型是什么?如果它不是字符串,那么您可能需要在 udf 声明中相应地提供它。
  • 预测是一个浮点数 - 我会明确地尝试一下,谢谢

标签: pyspark user-defined-functions pickle azure-databricks azure-machine-learning-service


【解决方案1】:

我不是 DataBricks 或 Spark 方面的专家,但是当您接触像 service 对象这样的复杂对象时,从本地笔记本上下文中提取函数总是有问题的。在这种特殊情况下,我建议删除对 azureML service 对象的依赖,只使用 requests 调用服务。

从服务中拉取密钥:

# retrieve the API keys. two keys were generated.
key1, key2 = service.get_keys()
scoring_uri = service.scoring_uri

您应该能够直接在 UDF 中使用这些字符串,而不会出现酸洗问题——here is an example 您将如何仅通过请求调用服务。以下适用于您的 UDF:

import requests, json
def predictModelValue2(summary, modelName, modelLabel):  
  input_data = json.dumps({"summary": summary, "modelName":, ....})

  headers = {'Content-Type':'application/json', 'Authorization': 'Bearer ' + key1}

  # call the service for scoring
  resp = requests.post(scoring_uri, input_data, headers=headers)

  return resp.text[1]

不过,在一个侧节点上:您的 UDF 将为数据框中的每一行调用,并且每次它都会进行网络调用——这将非常慢。我建议寻找批量执行的方法。正如您从构造的 json 中看到的那样,service.run 将接受一组项目,因此您应该以 100 左右的批次调用它。

【讨论】:

  • 谢谢,这与我的想法一致——切换到请求库而不是使用 SDK...但是我没有想到使用 SDK 来获取密钥和 URL - 这是个好主意。
  • 是的,我同意,这并不理想!我想我可能需要改变我的方法并切换到 azure 函数或异步运行它们。啊,我也不知道我可以将它们作为批次传递——这是另一个好主意,ta。
猜你喜欢
  • 1970-01-01
  • 2010-10-14
  • 2021-03-20
  • 2014-12-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多