【问题标题】:PySpark replace Null with ArrayPySpark 用数组替换 Null
【发布时间】:2017-11-14 01:42:26
【问题描述】:

通过 ID 连接后,我的数据框如下所示:

ID  |  Features  |  Vector
1   | (50,[...]  | Array[1.1,2.3,...]
2   | (50,[...]  | Null

我最终在“向量”列中得到了一些 ID 的 Null 值。我想用一个 300 维的零数组替换这些 Null 值(与非空向量条目的格式相同)。 df.fillna 在这里不起作用,因为它是我要插入的数组。知道如何在 PySpark 中实现这一点吗?

---编辑---

类似于this post我目前的做法:

df_joined = id_feat_vec.join(new_vec_df, "id", how="left_outer")

fill_with_vector = udf(lambda x: x if x is not None else np.zeros(300),
                                 ArrayType(DoubleType()))

df_new = df_joined.withColumn("vector", fill_with_vector("vector"))

遗憾的是收效甚微:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0in stage 848.0 failed 4 times, most recent failure: Lost task 0.3 in stage 848.0 (TID 692199, 10.179.224.107, executor 16): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.core.multiarray._reconstruct)
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-193-e55fed27fcd8> in <module>()
      5 a = df_joined.withColumn("vector", fill_with_vector("vector"))
      6 
----> 7 a.show()

/databricks/spark/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)
    316         """
    317         if isinstance(truncate, bool) and truncate:
--> 318             print(self._jdf.showString(n, 20))
    319         else:
    320             print(self._jdf.showString(n, int(truncate)))

/databricks/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

【问题讨论】:

    标签: arrays null pyspark


    【解决方案1】:

    更新:我无法使用 SQL 表达式表单来创建双精度数组。 'array(0.0, ...)' 似乎创建了一个 Decimal 类型的数组。但是,使用 python 函数,你可以让它正确地创建一个双精度数组。

    一般的想法是使用 when/otherwise 函数选择性地只更新您想要的行。您可以提前将所需的文字值定义为列,然后将其转储到“THEN”子句中。

    from pyspark.sql.types import *
    from pyspark.sql.functions import *
    
    schema = StructType([StructField("f1", LongType()), StructField("f2", ArrayType(DoubleType(), False))])
    data = [(1, [10.0, 11.0]), (2, None), (3, None)]
    
    df = sqlContext.createDataFrame(sc.parallelize(data), schema)
    
    # Create a column object storing the value you want in the NULL case
    num_elements = 300
    null_value = array([lit(0.0)] * num_elements)
    
    # If you want a different type you can change it like this
    # null_value = null_value.cast('array<float>')
    
    # Keep the value when there is one, replace it when it's null
    df2 = df.withColumn('f2', when(df['f2'].isNull(), null_value).otherwise(df['f2']))
    

    【讨论】:

    • 谢谢!我认为您的解决方案目前对我不起作用,因为我的数组是 elementType Double: error "cannot resolve 'CASE WHEN (vector IS NULL) THEN 'array(0,0,0)' ELSE vector END' due数据类型不匹配:THEN 和 ELSE 表达式都应该是相同类型或可强制转换为通用类型”。我尝试将您的 array(0, 0, 0) 更改为 array(0.0, 0.0, 0.0) 以匹配数据类型,但错误仍然存​​在。知道 Spark 想要的确切格式吗?
    • 将我的解决方案更新为双打。不像我希望的那样直截了当 b/c 我看不到让 spark 识别表达式中的双重文字的方法,但是使用 python 函数是可行的。
    • 哇,这就像一个魅力!非常感谢,我今天花了 10 个小时试图解决这个问题,几乎绝望了!
    【解决方案2】:

    您可以尝试使用 where 对数据集发出更新请求,将 Vector 列中的每个 NULL 替换为数组。 你在使用 SparkSQL 和数据框吗?

    【讨论】:

    • 是的,我正在使用 SparkSQL 和数据帧。但是,我对它很陌生,并且很难执行操作。
    猜你喜欢
    • 2018-12-02
    • 1970-01-01
    • 2020-09-05
    • 2021-12-10
    • 1970-01-01
    • 1970-01-01
    • 2019-05-21
    • 2016-08-22
    • 2017-07-07
    相关资源
    最近更新 更多