【发布时间】:2022-07-08 01:40:54
【问题描述】:
我在 PySpark 中有一个具有以下架构的数据框:
root
|-- value: array (nullable = true)
| |-- element: double (containsNull = true)
|-- id: long (nullable = true)
|-- timestamp: long (nullable = true)
|-- variable_name: string (nullable = true)
|-- Intensity: float (nullable = true)
数据框本身看起来像这样(我将只显示列 value 和 intensity,因为它们是我想做的唯一需要的) :
| value | Intensity |
|---|---|
| [-0.01, 58] | 59 |
| [47.2, -20.1] | 30 |
我想做的是:取列“强度”的值,并在“值”。该值将被添加到一个名为“最近” 的新列中。所以,在我的例子中,我会得到:
| value | Intensity | nearest |
|---|---|---|
| [-0.01, 58] | 59 | 58 |
| [47.2, -20.1] | 30 | 47.2 |
为此,我尝试了以下方法:
- 首先,我定义了我的 find_nearest 函数:
def find_nearest(array, value):
array = np.array(array)
nearest_index = np.where(abs(array - value) == abs(array - value).min())[0]
nearest_value = array[abs(array - value) == abs(array - value).min()]
return nearest_index[0] ## returns just the index of the nearest value
- 然后,我尝试在我的数据框中使用我的函数。我试过了:
df2 = df.withColumn("nearest", [find_nearest(a, b) for a, b in zip(df['value'], df['Intensity'])])
但我得到一个错误:
TypeError: 列不可迭代
有人可以帮我解决这个问题吗?
提前谢谢你。
【问题讨论】:
标签: python dataframe apache-spark pyspark apache-spark-sql