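【Posted】:2021-12-19 22:26:18
【Question】:
Suppose I have a DataFrame like the one below, and I want to add a new column to it using a column from another DataFrame. The first DataFrame: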
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

data2 = [
    ("James", "", "Smith", "34563", "M", 3000),
    ("Michael", "Rose", "", "52452", "M", 4000),
    ("Robert", "", "Williams", "72331", "M", 4000),
    ("Maria", "Anne", "Jones", "52334", "F", 4000),
    ("Jen", "Mary", "Brown", "82311", "F", -1),
]
schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True),
])
df = spark.createDataFrame(data=data2, schema=schema)
df.printSchema()
df.show(truncate=False)
root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |34563|M     |3000  |
|Michael  |Rose      |        |52452|M     |4000  |
|Robert   |          |Williams|72331|M     |4000  |
|Maria    |Anne      |Jones   |52334|F     |4000  |
|Jen      |Mary      |Brown   |82311|F     |-1    |
+---------+----------+--------+-----+------+------+
The second DataFrame:
df_2 = spark.createDataFrame(
    [(34563, 435353424, 1, 2), (23524, 466344656, 2, 1), (52452, 263637236, 2, 5),
     (52334, 466633353, 2, 3), (66334, 563555578, 5, 4), (42552, 123445563, 5, 3),
     (72331, 413555213, 4, 3), (82311, 52355563, 2, 2)],
    ["id", "col_A", "val_1", "val_2"])
df_2.show()
+-----+---------+-----+-----+
|   id|    col_A|val_1|val_2|
+-----+---------+-----+-----+
|34563|435353424|    1|    2|
|23524|466344656|    2|    1|
|52452|263637236|    2|    5|
|52334|466633353|    2|    3|
|66334|563555578|    5|    4|
|42552|123445563|    5|    3|
|72331|413555213|    4|    3|
|82311| 52355563|    2|    2|
+-----+---------+-----+-----+
I want to create a new column ("Theoretical Accountable 3") in the first DataFrame using a column from the second DataFrame. Here is my code:
from pyspark.sql import functions as F

merge_imputation = df.join(df_2, df["id"] == df_2["id"], how="left").dropDuplicates(["id"])

df = df.withColumn(
    "Theoretical Accountable 3",
    F.when(F.col("gender") == "M", F.lit("1"))
    .when(F.col("gender") == "F", F.lit("2"))
    # This branch raises the error: select() returns a DataFrame, not a Column.
    .when(F.col("salary") > 2000, merge_imputation.select("col_A"))
    .otherwise(F.col("lastname")),
)
How can I use the joined column without getting an error? My problem is that I don't know how to use the column from merge_imputation.select("col_A") inside the when condition.
【Comments】: