【问题标题】:Updating a dataframe column in spark在 Spark 中更新数据框列
【发布时间】:2015-05-20 12:29:39
【问题描述】:

看新的spark DataFrame API,不清楚是否可以修改dataframe列。

我将如何更改数据框的xy 中的值?

pandas 中,这将是:

df.ix[x,y] = new_value

编辑:合并下面所说的内容,您无法修改现有数据框,因为它是不可变的,但您可以返回具有所需修改的新数据框。

如果您只想根据条件替换列中的值,例如np.where

from pyspark.sql import functions as F

update_func = (F.when(F.col('update_col') == replace_val, new_value)
                .otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)

如果您想对列执行一些操作并创建一个添加到数据框中的新列:

import pyspark.sql.functions as F
import pyspark.sql.types as T

def my_func(col):
    do stuff to column here
    return transformed_value

# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())

df = df.withColumn('new_column_name', my_udf('update_col'))

如果您希望新列与旧列同名,您可以添加额外的步骤:

df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')

【问题讨论】:

标签: python dataframe apache-spark pyspark apache-spark-sql


【解决方案1】:

虽然您不能这样修改列,但您可以对列进行操作并返回反映该更改的新 DataFrame。为此,您首先要创建一个 UserDefinedFunction 来实现要应用的操作,然后选择性地将该函数仅应用于目标列。在 Python 中:

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])

new_df 现在与old_df 具有相同的架构(假设old_df.target_column 也是StringType 类型)但target_column 列中的所有值都将是new_value

【讨论】:

  • 这是问题的实际答案,谢谢!然而,火花工作对我来说还没有完成,所有的执行者都会失败。你能想到另一种方法吗?我将它与更复杂的 UDF 一起使用,在其中我对字符串进行转换。没有类似 pandas 的语法,比如 new_df = old_df.col1.apply(lambda x: func(x))?
  • 还有:new_df = old_df.withColumn('target_column', udf(df.name))
  • 是的,应该可以正常工作。请记住,UDF 只能将列作为参数。如果你想将其他数据传递给函数,你必须先部分应用它。
  • @KatyaHandler 如果您只想复制一列,一种方法是简单地选择它两次:df.select([df[col], df[col].alias('same_column')]),其中col 是您要复制的列的名称.在最新的 Spark 版本中,我使用 UDF 完成的许多工作都可以通过 pyspark.sql.functions 中定义的函数来完成。 Pyspark 中的 UDF 性能真的很差,所以这可能真的值得研究:spark.apache.org/docs/latest/api/python/…
  • 它是StringType 而不是Stringtype in udf = UserDefinedFunction(lambda x: 'new_value', Stringtype())
【解决方案2】:

通常在更新列时,我们希望将旧值映射到新值。这是在 pyspark 中不使用 UDF 的一种方法:

# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
    F.when(df[update_col]==old_value,new_value).
    otherwise(df[update_col])).

【讨论】:

  • 当我的 update_col 是一个列表 Ex-=: update_cols=['col1','col2','col3'] 时如何使用它?
  • 使用 for 循环。
【解决方案3】:

DataFrames 基于 RDD。 RDD 是不可变结构,不允许在现场更新元素。要更改值,您需要通过使用类似 SQL 的 DSL 或 RDD 操作(如map)转换原始数据帧来创建一个新的数据帧。

强烈推荐的幻灯片:Introducing DataFrames in Spark for Large Scale Data Science

【讨论】:

  • 添加的数据框抽象到底是什么,这在与表相同的行数中无法完成?
  • " DataFrames 引入了新的简化运算符,用于过滤、聚合和投影大型数据集。在内部,DataFrames 利用 Spark SQL 逻辑优化器智能地规划操作的物理执行,以便在大型数据集上正常工作" - databricks.com/blog/2015/03/13/announcing-spark-1-3.html
【解决方案4】:

正如maasg 所说,您可以根据应用于旧数据帧的映射结果创建一个新数据帧。具有两行的给定 DataFrame df 的示例:

val newDf = sqlContext.createDataFrame(df.map(row => 
  Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)

请注意,如果列的类型发生变化,您需要为其提供正确的架构而不是df.schema。查看org.apache.spark.sql.Row的api获取可用方法:https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html

[更新] 或者在 Scala 中使用 UDF:

import org.apache.spark.sql.functions._

val toLong = udf[Long, String] (_.toLong)

val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")

如果列名需要保持不变,您可以将其重命名:

modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")

【讨论】:

    【解决方案5】:

    pyspark.sql.functions 导入 col, when 并根据字符串(字符串 a,字符串 b)将第五列更新为整数(0,1,2) , 字符串 c) 到一个新的 DataFrame 中。

    from pyspark.sql.functions import col, when 
    
    data_frame_temp = data_frame.withColumn("col_5",when(col("col_5") == "string a", 0).when(col("col_5") == "string b", 1).otherwise(2))
    

    【讨论】:

      猜你喜欢
      • 2021-10-09
      • 1970-01-01
      • 2021-11-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多