【问题标题】:update pyspark data frame column based on another column根据另一列更新 pyspark 数据框列
【发布时间】:2018-05-17 21:34:32
【问题描述】:

下面是pyspark 中的一个数据框。我想根据tests 列中的值更新data frame 中的val 列。

df.show()
+---------+----+---+
|    tests| val|asd|
+---------+----+---+
|    test1|   Y|  1|
|    test2|   N|  2|
|    test2|   Y|  1|
|    test1|   N|  2|
|    test1|   N|  3|
|    test3|   N|  4|
|    test4|   Y|  5|
+---------+----+---+

当任何给定的test 具有val Y 时,我想更新该值,那么所有特定测试的val's 都应更新为Y。如果不是,那么他们的价值观是什么。

基本上我希望data frame 如下所示。

result_df.show()

+---------+----+---+
|    tests| val|asd|
+---------+----+---+
|    test1|   Y|  1|
|    test2|   Y|  2|
|    test2|   Y|  1|
|    test1|   Y|  2|
|    test1|   Y|  3|
|    test3|   N|  4|
|    test4|   Y|  5|
+---------+----+---+

我应该怎么做才能做到这一点。

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql


    【解决方案1】:

    使用max窗口函数和selectExpr

    df.selectExpr(
        'tests', 'max(val) over (partition by tests) as val', 'asd'
    ).show()
    
    +-----+---+---+
    |tests|val|asd|
    +-----+---+---+
    |test4|  Y|  5|
    |test3|  N|  4|
    |test1|  Y|  1|
    |test1|  Y|  2|
    |test1|  Y|  3|
    |test2|  Y|  2|
    |test2|  Y|  1|
    +-----+---+---+
    

    【讨论】:

    • 我收到以下错误Traceback (most recent call last): File "/usr/hdp/current/spark-client/python/pyspark/sql/dataframe.py", line 876, in selectExpr jdf = self._jdf.selectExpr(self._jseq(expr) File "/usr/hdp/current/spark-client/python/pyspark/sql/utils.py", line 45, in deco return f(*a, **kw) py4j.protocol.Py4JJavaError: An error occurred while calling o177.selectExpr. : java.lang.RuntimeException: [1.15] failure: end of input expected max(val) over (partition by tests) as val ^ at scala.sys.package$.error(package.scala:27)
    • 我也从pyspark.sql 导入了Window 函数
    • 此外,我正在使用spark 1.6
    • @user9367133 这应该适合你。等效地,您可以import pyspark.sql.functions as f 并执行df.select('tests', f.max('val').over(Window.partitionBy('tests')).alias('val'), 'asd').show()
    【解决方案2】:

    这里有一个解决方案。 首先我们找出每个测试是否有 val Y。

    import pyspark.sql.functions as sf
    by_test = df.groupBy('tests').agg(sf.sum((sf.col('val') == 'Y').cast('int')).alias('HasY'))
    by_test.show()
    +-----+----+
    |tests|HasY|
    +-----+----+
    |test4|   1|
    |test3|   0|
    |test1|   1|
    |test2|   1|
    +-----+----+
    

    加入原始数据帧

    df = df.join(by_test, on='tests')
    df.show()
    +-----+---+---+----+
    |tests|val|asd|HasY|
    +-----+---+---+----+
    |test4|  Y|  5|   1|
    |test3|  N|  4|   0|
    |test1|  Y|  1|   1|
    |test1|  N|  2|   1|
    |test1|  N|  3|   1|
    |test2|  N|  2|   1|
    |test2|  Y|  1|   1|
    +-----+---+---+----+
    

    使用 when/otherwise 创建具有相同名称的新列

    df = df.withColumn('val', sf.when(sf.col('HasY') > 0, 'Y').otherwise(sf.col('val')))
    df = df.drop('HasY')
    df.show()
    +-----+---+---+
    |tests|val|asd|
    +-----+---+---+
    |test4|  Y|  5|
    |test3|  N|  4|
    |test1|  Y|  1|
    |test1|  Y|  2|
    |test1|  Y|  3|
    |test2|  Y|  2|
    |test2|  Y|  1|
    +-----+---+---+
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-11-11
      • 1970-01-01
      • 2018-11-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-10-14
      相关资源
      最近更新 更多