您可以考虑以下两种方法:
方法一:简单的方法(直接指定列名)
# Build the one-row example DataFrame used by the simple approach.
df = spark.createDataFrame(
    [('count', 5, 10)],
    schema=['summary', 'A', 'B'],
)
# +-------+---+---+
# |summary|  A|  B|
# +-------+---+---+
# |  count|  5| 10|
# +-------+---+---+
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType
@udf(returnType=IntegerType())
def get_row_min(A, B):
    """Return the smaller of two column values for a single row.

    Registered as a Spark UDF, so ``A`` and ``B`` receive the Python values
    of the two input columns for the current row.

    Null inputs (``None``) are skipped; the result is ``None`` only when both
    inputs are null.
    """
    # On Python 3, min() over a list containing None raises TypeError, which
    # would fail the Spark job on the first null cell. Filter nulls first.
    # NOTE(review): the declared return type is the 32-bit IntegerType --
    # confirm the input columns never hold values outside that range.
    return min((value for value in (A, B) if value is not None), default=None)
# Overwrite A with the row-wise minimum of A and B, then copy it into B.
# withColumn() with an existing column name replaces that column in place,
# so no temporary columns, drops or renames are needed and the column order
# (summary, A, B) is unchanged.
(
    df.withColumn('A', get_row_min(col('A'), col('B')))
    .withColumn('B', col('A'))  # 'A' already holds the minimum here
    .show()
)
# +-------+---+---+
# |summary|  A|  B|
# +-------+---+---+
# |  count|  5|  5|
# +-------+---+---+
方法二:通用的方法(间接指定列名)
# Example DataFrame with a third value column and a second row, to show that
# the generic approach copes with more columns and more rows.
df2 = spark.createDataFrame(
    [
        ('count', 5, 10, 15),
        ('count', 3, 2, 1),
    ],
    schema=['summary', 'A', 'B', 'C'],
)
# +-------+---+---+---+
# |summary|  A|  B|  C|
# +-------+---+---+---+
# |  count|  5| 10| 15|
# |  count|  3|  2|  1|
# +-------+---+---+---+
@udf(returnType=IntegerType())
def get_row_min_generic(*cols):
    """Return the smallest value among any number of column values in a row.

    Registered as a Spark UDF, so ``cols`` holds the Python values of the
    input columns for the current row.

    Null inputs (``None``) are skipped; the result is ``None`` when every
    input is null or when no columns are passed.
    """
    # A bare min(cols) raises TypeError on Python 3 if any value is None and
    # ValueError if cols is empty; either would fail the Spark job.
    # NOTE(review): the declared return type is the 32-bit IntegerType --
    # confirm the input columns never hold values outside that range.
    return min((value for value in cols if value is not None), default=None)
# Columns that are passed through untouched. The helper column 'min_val' is
# listed up front so the list never has to be mutated later.
exclude = ['summary', 'min_val']

# Every remaining column of df2 is a value column that takes part in the
# minimum; compute the list once and reuse it below.
value_cols = [col_name for col_name in df2.columns if col_name not in exclude]

# Add the row-wise minimum of all value columns as a helper column.
df3 = df2.withColumn('min_val',
                     get_row_min_generic(*[col(col_name) for col_name in value_cols]))

# Each value column becomes a copy of min_val under its original name.
new_cols = [col('min_val').alias(c) for c in value_cols]

# Derive the pass-through columns from `exclude` instead of hard-coding
# 'summary', so changing `exclude` keeps the final select consistent.
passthrough_cols = [c for c in df2.columns if c in exclude]
df_out = df3.select(passthrough_cols + new_cols)
df_out.show()
# +-------+---+---+---+
# |summary|  A|  B|  C|
# +-------+---+---+---+
# |  count|  5|  5|  5|
# |  count|  1|  1|  1|
# +-------+---+---+---+