AFAIK,如果不使用udf,就无法直接检查一列是否包含在另一列中或是否是另一列的子字符串。
但是,如果您想避免使用udf,一种方法是分解"Updated" 列。然后,您可以检查 "Serial" 列和展开的 "Updated" 列之间的相等性并应用您的条件(如果匹配则为 1,否则为 2)- 调用此 "contains"。
最后,您可以groupBy("ID", "Serial", "Updated") 并选择"contains" 列的最小值。
例如,在两次调用explode() 并检查您的情况后,您将拥有这样的DataFrame:
df.withColumn("Serial", f.explode(f.split("Serial", ",")))\
.withColumn("updatedExploded", f.explode(f.split("Updated", ",")))\
.withColumn(
"contains",
f.when(
f.isnull("Serial") |
f.isnull("Updated") |
(f.col("Serial") == "") |
(f.col("Updated") == ""),
0
).when(
f.col("Serial") == f.col("updatedExploded"),
1
).otherwise(2)
)\
.show(truncate=False)
#+---+--------+-----------------+---------------+--------+
#|ID |Serial |Updated |updatedExploded|contains|
#+---+--------+-----------------+---------------+--------+
#|10 |pers1 | | |0 |
#|20 | | | |0 |
#|30 |entity_1|entity_1,entity_3|entity_1 |1 |
#|30 |entity_1|entity_1,entity_3|entity_3 |2 |
#|30 |entity_2|entity_1,entity_3|entity_1 |2 |
#|30 |entity_2|entity_1,entity_3|entity_3 |2 |
#|30 |entity_3|entity_1,entity_3|entity_1 |2 |
#|30 |entity_3|entity_1,entity_3|entity_3 |1 |
#+---+--------+-----------------+---------------+--------+
按("ID", "Serial", "Updated") 分组并取"contains" 的最小值的“技巧”之所以有效,是因为:
- 如果
"Serial" 或"Updated" 为null(或在这种情况下等于空字符串),则值为0。
- 如果
"Updated" 中的至少一个值与"Serial" 匹配,则其中一列的值为1。
- 如果没有匹配项,您将只有 2 个
最终输出:
df.withColumn("Serial", f.explode(f.split("Serial", ",")))\
.withColumn("updatedExploded", f.explode(f.split("Updated", ",")))\
.withColumn(
"contains",
f.when(
f.isnull("Serial") |
f.isnull("Updated") |
(f.col("Serial") == "") |
(f.col("Updated") == ""),
0
).when(
f.col("Serial") == f.col("updatedExploded"),
1
).otherwise(2)
)\
.groupBy("ID", "Serial", "Updated")\
.agg(f.min("contains").alias("contains"))\
.sort("ID")\
.show(truncate=False)
#+---+--------+-----------------+--------+
#|ID |Serial |Updated |contains|
#+---+--------+-----------------+--------+
#|10 |pers1 | |0 |
#|20 | | |0 |
#|30 |entity_3|entity_1,entity_3|1 |
#|30 |entity_2|entity_1,entity_3|2 |
#|30 |entity_1|entity_1,entity_3|1 |
#+---+--------+-----------------+--------+
我是chaining calls 到pyspark.sql.functions.when() 来检查条件。第一部分检查任一列是否为null 或等于空字符串。我相信您可能只需要在实际数据中检查null,但我会根据您显示示例DataFrame的方式检查空字符串。