【发布时间】:2021-03-29 20:48:15
【问题描述】:
我正在 PySpark 中处理一个 DataFrame,并使用 asyncio 在其上映射两个不同的函数。
假设 DataFrame (x_df) 如下所示:
| speed | volume |
|---|---|
| 26.0 | 234 |
| 32.0 | 123 |
第一个函数,我们称之为a(),当应用于这个DataFrame时,给我:
| speed | volume | model_version |
|---|---|---|
| 26.0 | 234 | v1.0.0 |
| 32.0 | 123 | v1.0.1 |
我将此结果称为a_df。
第二个函数,我们称之为b(),当应用于这个DataFrame时,给我:
| speed | volume | model_type |
|---|---|---|
| 26.0 | 234 | svm |
| 32.0 | 123 | nn |
我将此结果称为b_df。
我想将这2个DataFrame合并为一个,这样我的最终结果是:
| speed | volume | model_type | model_version |
|---|---|---|---|
| 26.0 | 234 | svm | v1.0.0 |
| 32.0 | 123 | nn | v1.0.1 |
我正在使用:
schema_fields = list(a_df.schema.fields) + list(b_df.schema.fields)
schema = StructType(schema_fields)
merged_df = a_df.rdd.zip(b_df.rdd).map(lambda x: x[0] + x[1])
现在,当我检查 merged_df 和 spark.createDataFrame(merged_df, schema).show() 时,我看到:
这些列:
| speed | volume | model_type | speed | volume | model_version |
|---|
如何删除 speed 和 volume 列的重复数据?我避免使用join,因为我的 DataFrame 很大并且有许多公共列(超过 10 个),而且其中一些具有复杂类型,而不仅仅是整数或字符串。
我正在使用 asyncio 来同时运行函数 a() 和 b(),并且不希望更改它,因为它是为了获得一些性能提升而故意的。
【问题讨论】:
-
a()返回数据框并添加列?? -
是的。它进行一些处理并向原始 DF 添加一个新列。
-
为什么不在
a()上致电b()。你有没有通过func a -
因为我使用的是 asyncio,所以 b() 和 a() 都同时运行。使用 asyncio 是为了加快一些简单的处理。
标签: python dataframe apache-spark pyspark