【问题标题】:Pyspark - erfinv function is not working properlyPyspark - erfinv 功能无法正常工作
【发布时间】:2021-10-24 21:11:26
【问题描述】:

请在下面找到代码:

import pandas as pd
from scipy.stats import norm
import pyspark.sql.functions as F
from pyspark.sql.functions import pandas_udf
import math
from pyspark.sql.functions import udf
from scipy.special import erfinv


# create sample data
df = spark.createDataFrame([
    (1, 0.008),
    (2, -1.23),
    (3, 4.56),
], ['id', 'value'])

def normal_cdf(x):

    return (math.sqrt(2) * erfinv(x*2-1))
  
my_udf1 = udf(normal_cdf)

df1 = df.withColumn('prob', my_udf1(F.col('value')))

df1.show()

错误:

Py4JJavaError:调用 o420.showString 时出错。 : org.apache.spark.SparkException:作业因阶段失败而中止: 阶段 6.0 中的任务 0 失败 4 次,最近一次失败:丢失任务 0.3 在 6.0 阶段(TID 21,d2-td-cdh.boigroup.net,执行者 17): org.apache.spark.api.python.PythonException:回溯(最近 最后调用):文件 "/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", 第 361 行,主要 func,分析器,反序列化器,序列化器 = read_udfs(pickleSer,infile,eval_type)文件 "/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", 第 236 行,在 read_udfs 中 arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type, runner_conf) 文件 "/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", 第 163 行,在 read_single_udf 中 f、return_type = read_command(pickleSer, infile) 文件“/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip /pyspark/worker.py", 第 64 行,在 read_command 中 command = serializer._read_with_length(file) File "/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark /serializers.py", 第 172 行,在 _read_with_length 返回 self.loads(obj) 文件“/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/序列化程序.py", 第 577 行,负载中 返回 pickle.loads(obj, encoding=encoding) 文件“/usr/local/lib/python3.6/site-packages/numpy/core/init.py”,行 131,在_ufunc_reconstruct mod = import(module, fromlist=[name]) ModuleNotFoundError: No module named 'scipy'

单独执行时效果很好:

n=math.sqrt(2)*erfinv(2*0.010-1)
print(n)

我的系统上有 Scipy。 这里有什么问题?

【问题讨论】:

  • 你是否在所有工作节点上安装了 scipy(不仅在驱动节点上)?
  • 我的 cdsw 上有 Scipy(我猜是工作节点)..如何检查驱动程序节点?
  • 我会尝试 ssh 进入不同的节点。但我不知道这在 cdsw 中是否可行。
  • 除了scipy还有没有提供erfinv功能的库?
  • 你可以尝试直接实现函数link

标签: python apache-spark pyspark


【解决方案1】:

这是因为numpy返回的一些dtypes与spark不兼容,我认为你应该用Null替换np.nan,试试下面:

def normal_cdf(x):
    val = (math.sqrt(2) * erfinv(x*2-1)) 
    return float(val) if pd.notna(val) else None
  
my_udf1 = udf(normal_cdf)
#my_udf1 = udf(normal_cdf,T.DoubleType()) for returning double

df1 = df.withColumn('prob', my_udf1(F.col('value')))

df1.show()

+---+-----+-------------------+
| id|value|               prob|
+---+-----+-------------------+
|  1|0.008|-2.4089155458154616|
|  2|-1.23|               null|
|  3| 4.56|               null|
+---+-----+-------------------+

【讨论】:

  • 我仍然得到同样的错误;你能解释一下为什么它适用于数字而不是数据框吗? n=math.sqrt(2)*erfinv(2*0.008-1) print(n)
猜你喜欢
  • 2017-09-23
  • 2013-11-06
  • 2013-10-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多