【问题标题】:Use find_nearest function on PySpark在 PySpark 上使用 find_nearest 函数
【发布时间】:2022-07-08 01:40:54
【问题描述】:

我在 PySpark 中有一个具有以下架构的数据框:

root
 |-- value: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- id: long (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- variable_name: string (nullable = true)
 |-- Intensity: float (nullable = true)

数据框本身看起来像这样(我将只显示列 valueintensity,因为它们是我想做的唯一需要的) :

value Intensity
[-0.01, 58] 59
[47.2, -20.1] 30

我想做的是:取列“强度”的值,并在“值”。该值将被添加到一个名为“最近” 的新列中。所以,在我的例子中,我会得到:

value Intensity nearest
[-0.01, 58] 59 58
[47.2, -20.1] 30 47.2

为此,我尝试了以下方法:

  • 首先,我定义了我的 find_nearest 函数:
def find_nearest(array, value):
    array = np.array(array)
    nearest_index = np.where(abs(array - value) == abs(array - value).min())[0]
    nearest_value = array[abs(array - value) == abs(array - value).min()]
    return nearest_index[0] ## returns just the index of the nearest value
  • 然后,我尝试在我的数据框中使用我的函数。我试过了:
df2 = df.withColumn("nearest", [find_nearest(a, b) for a, b in zip(df['value'], df['Intensity'])])

但我得到一个错误:

TypeError: 列不可迭代

有人可以帮我解决这个问题吗?

提前谢谢你。

【问题讨论】:

    标签: python dataframe apache-spark pyspark apache-spark-sql


    【解决方案1】:

    你得到的错误意味着你需要定义一个UDF

    但是,在这里您可以简单地使用 Spark 内置函数。这是使用 transformarray_min 与结构排序的一种方法:

    from pyspark.sql import functions as F
    
    df = spark.createDataFrame([([-0.01, 58.0], 59), ([47.2, -20.1], 30)], ["value", "Intensity"])
    
    result = df.withColumn(
        "nearest",
        F.array_min(
            F.expr("transform(value, x -> struct(abs(x - Intensity), x as v))")
        )["v"]
    )
    
    result.show()
    
    # +-------------+---------+-------+
    # |        value|Intensity|nearest|
    # +-------------+---------+-------+
    # |[-0.01, 58.0]|       59|   58.0|
    # |[47.2, -20.1]|       30|   47.2|
    # +-------------+---------+-------+
    

    【讨论】:

      【解决方案2】:

      你可以在不创建自定义函数的情况下做到这一点

      >>> from pyspark.sql import functions as F
      >>> df = spark.createDataFrame( [([-0.01, 58.0],59), ([47.2, -20.1],30)],['value', 'Intensity'])
      >>> df1=df.withColumn("col1",df["value"].getItem(0)).withColumn("col2",df["value"].getItem(1))
      
      >>> df1.withColumn("newcol",when(((df1["Intensity"] - F.abs(df1["col1"]))<(df1["Intensity"] - F.abs(df1["col2"]))),df1["col1"]).otherwise(df1["col2"])).drop(df1["col1"]).drop(df1["col2"]).show()
      +-------------+---------+------+
      |        value|Intensity|newcol|
      +-------------+---------+------+
      |[-0.01, 58.0]|       59|  58.0|
      |[47.2, -20.1]|       30|  47.2|
      +-------------+---------+------+
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-09-12
        • 1970-01-01
        • 2020-03-27
        • 2019-08-16
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多