【问题标题】:Remove duplicate columns from Spark dataframe without using join从 Spark 数据框中删除重复的列而不使用连接
【发布时间】:2021-03-29 20:48:15
【问题描述】:

我正在 PySpark 中处理一个 DataFrame,并使用 asyncio 在其上映射两个不同的函数。

假设 DataFrame (x_df) 如下所示:

speed volume
26.0 234
32.0 123

第一个函数,我们称之为a(),当应用于这个DataFrame时,给我:

speed volume model_version
26.0 234 v1.0.0
32.0 123 v1.0.1

我将此结果称为a_df

第二个函数,我们称之为b(),当应用于这个DataFrame时,给我:

speed volume model_type
26.0 234 svm
32.0 123 nn

我将此结果称为b_df

我想将这2个DataFrame合并为一个,这样我的最终结果是:

speed volume model_type model_version
26.0 234 svm v1.0.0
32.0 123 nn v1.0.1

我正在使用:

schema_fields = list(a_df.schema.fields) + list(b_df.schema.fields)
schema = StructType(schema_fields)
merged_df = a_df.rdd.zip(b_df.rdd).map(lambda x: x[0] + x[1])

现在,当我检查 merged_dfspark.createDataFrame(merged_df, schema).show() 时,我看到: 这些列:

speed volume model_type speed volume model_version

如何删除 speedvolume 列的重复数据?我避免使用join,因为我的 DataFrame 很大并且有许多公共列(超过 10 个),而且其中一些具有复杂类型,而不仅仅是整数或字符串。

我正在使用 asyncio 来同时运行函数 a()b(),并且不希望更改它,因为它是为了获得一些性能提升而故意的。

【问题讨论】:

  • a() 返回数据框并添加列??
  • 是的。它进行一些处理并向原始 DF 添加一个新列。
  • 为什么不在a() 上致电b()。你有没有通过func a
  • 因为我使用的是 asyncio,所以 b() 和 a() 都同时运行。使用 asyncio 是为了加快一些简单的处理。

标签: python dataframe apache-spark pyspark


【解决方案1】:

保持核心列分开,让 a 和 b 只返回新列:

简单的例子:

def a(df): 
    return df.select(
        when(col('speed') % 2 == 0, 'svm').otherwise('nn').alias('model_type')
    )

a(df) 的输出:

+----------+
|model_type|
+----------+
|        nn|
|       svm|
|        nn|
|       svm|
|        nn|
+----------+

简单的 b 示例:

def b(df): 
    return df.select(
        when(col('speed') % 2 == 0, 'v1.0.1').otherwise('v1.0.0').alias('model_version')
    )

b(df) 的输出:

+-------------+
|model_version|
+-------------+
|       v1.0.0|
|       v1.0.1|
|       v1.0.0|
|       v1.0.1|
|       v1.0.0|
+-------------+

简单测试df:

df = spark.createDataFrame([{"speed": 1, "volume": 1}, 
                            {"speed": 2, "volume": 1}, 
                            {"speed": 3, "volume": 1}, 
                            {"speed": 4, "volume": 1}, 
                            {"speed": 5, "volume": 1}])

输出:

+-----+------+
|speed|volume|
+-----+------+
|    1|     1|
|    2|     1|
|    3|     1|
|    4|     1|
|    5|     1|
+-----+------+

压缩和地图功能:

def merge(df1, df2): 
    schema = StructType([*df1.schema, *df2.schema]) 
    return spark.createDataFrame(df1.rdd.zip(df2.rdd).map(lambda x: x[0] + x[1]), schema)

结果

result_df = merge(
    # Original DF with core columns
    df, 
    merge(
        # df with only model_type
        a(df),
        # df with only model_version
        b(df)
    )
)

输出:

+-----+------+----------+-------------+
|speed|volume|model_type|model_version|
+-----+------+----------+-------------+
|    1|     1|        nn|       v1.0.0|
|    2|     1|       svm|       v1.0.1|
|    3|     1|        nn|       v1.0.0|
|    4|     1|       svm|       v1.0.1|
|    5|     1|        nn|       v1.0.0|
+-----+------+----------+-------------+

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-09-02
    • 2020-08-11
    • 2016-10-01
    • 1970-01-01
    • 2018-04-07
    • 2012-11-13
    • 2012-12-12
    相关资源
    最近更新 更多