【问题标题】:Apply MinMaxScaler on multiple columns in PySpark在 PySpark 中的多个列上应用 MinMaxScaler
【发布时间】:2020-02-18 12:52:56
【问题描述】:

我想将 PySpark 的 MinMaxScalar 应用于 PySpark 数据框 df 的多列。到目前为止,我只知道如何将它应用于单个列,例如x.

from pyspark.ml.feature import MinMaxScaler

pdf = pd.DataFrame({'x':range(3), 'y':[1,2,5], 'z':[100,200,1000]})
df = spark.createDataFrame(pdf)

scaler = MinMaxScaler(inputCol="x", outputCol="x")
scalerModel = scaler.fit(df)
scaledData = scalerModel.transform(df)

如果我有 100 列怎么办?有什么方法可以对 PySpark 中的许多列进行最小-最大缩放?

更新:

另外,如何将MinMaxScalar 应用于整数或双精度值?它抛出以下错误:

java.lang.IllegalArgumentException: requirement failed: Column length must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually int.

【问题讨论】:

    标签: python pyspark apache-spark-sql


    【解决方案1】:

    问题一:

    如何更改您的示例以正常运行。您需要准备数据作为转换器工作的向量。

    from pyspark.ml.feature import MinMaxScaler
    from pyspark.ml import Pipeline
    from pyspark.ml.linalg import VectorAssembler
    
    pdf = pd.DataFrame({'x':range(3), 'y':[1,2,5], 'z':[100,200,1000]})
    df = spark.createDataFrame(pdf)
    
    assembler = VectorAssembler(inputCols=["x"], outputCol="x_vec")
    scaler = MinMaxScaler(inputCol="x_vec", outputCol="x_scaled")
    pipeline = Pipeline(stages=[assembler, scaler])
    scalerModel = pipeline.fit(df)
    scaledData = scalerModel.transform(df)
    

    问题 2:

    要在多个列上运行 MinMaxScaler,您可以使用一个管道来接收使用列表理解准备的转换列表:

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import MinMaxScaler
    columns_to_scale = ["x", "y", "z"]
    assemblers = [VectorAssembler(inputCols=[col], outputCol=col + "_vec") for col in columns_to_scale]
    scalers = [MinMaxScaler(inputCol=col + "_vec", outputCol=col + "_scaled") for col in columns_to_scale]
    pipeline = Pipeline(stages=assemblers + scalers)
    scalerModel = pipeline.fit(df)
    scaledData = scalerModel.transform(df)
    

    查看官方文档中的this example pipeline

    最终,您将得到以下格式的结果:

    >>> scaledData.printSchema() 
    root
     |-- x: long (nullable = true)
     |-- y: long (nullable = true)
     |-- z: long (nullable = true)
     |-- x_vec: vector (nullable = true)
     |-- y_vec: vector (nullable = true)
     |-- z_vec: vector (nullable = true)
     |-- x_scaled: vector (nullable = true)
     |-- y_scaled: vector (nullable = true)
     |-- z_scaled: vector (nullable = true)
    
    >>> scaledData.show()
    +---+---+----+-----+-----+--------+--------+--------+--------------------+
    |  x|  y|   z|x_vec|y_vec|   z_vec|x_scaled|y_scaled|            z_scaled|
    +---+---+----+-----+-----+--------+--------+--------+--------------------+
    |  0|  1| 100|[0.0]|[1.0]| [100.0]|   [0.0]|   [0.0]|               [0.0]|
    |  1|  2| 200|[1.0]|[2.0]| [200.0]|   [0.5]|  [0.25]|[0.1111111111111111]|
    |  2|  5|1000|[2.0]|[5.0]|[1000.0]|   [1.0]|   [1.0]|               [1.0]|
    +---+---+----+-----+-----+--------+--------+--------+--------------------+
    

    额外的后处理:

    您可以通过一些后处理恢复原始名称中的列。例如:

    from pyspark.sql import functions as f
    names = {x + "_scaled": x for x in columns_to_scale}
    scaledData = scaledData.select([f.col(c).alias(names[c]) for c in names.keys()])
    

    输出将是:

    scaledData.show()
    +------+-----+--------------------+
    |     y|    x|                   z|
    +------+-----+--------------------+
    | [0.0]|[0.0]|               [0.0]|
    |[0.25]|[0.5]|[0.1111111111111111]|
    | [1.0]|[1.0]|               [1.0]|
    +------+-----+--------------------+
    

    【讨论】:

    • 谢谢!我测试了你的解决方案。我收到一个错误:java.lang.IllegalArgumentException: requirement failed: Column length must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually int. 能否请您告诉我如何解决?
    • 我执行了printSchema()。所有列都是数字:整数或双精度,例如col1: integer (nullable = true)
    • 看来需要将数据编码成Vectors.dense。你能用Vectors.dense展示一个完整的例子吗?
    • 如何在最终结果中从[ ] 中获取值?
    • @Fluxy 你可以在这个答案中使用 udf stackoverflow.com/a/44505571/1762211
    【解决方案2】:

    您可以将单个 MinMaxScaler 实例用于“向量组合”的一组功能,而不是为要转换的每列创建一个 MinMaxScaler(在这种情况下为缩放)。

    from pyspark.ml.feature import MinMaxScaler
    from pyspark.ml.feature import VectorAssembler
    
    #1. Your original dataset
    #pdf = pd.DataFrame({'x':range(3), 'y':[1,2,5], 'z':[100,200,1000]})
    #df = spark.createDataFrame(pdf)
    
    df = spark.createDataFrame([(0, 10.0, 0.1), (1, 1.0, 0.20), (2, 1.0, 0.9)],["x", "y", "z"])
    
    df.show()
    +---+----+---+
    |  x|   y|  z|
    +---+----+---+
    |  0|10.0|0.1|
    |  1| 1.0|0.2|
    |  2| 1.0|0.9|
    +---+----+---+
    
    #2. Vector assembled set of features 
    # (assemble only the columns you want to MinMax Scale)
    assembler = VectorAssembler(inputCols=["x", "y", "z"], 
    outputCol="features")
    output = assembler.transform(df)
    
    output.show()
    
    +---+----+---+--------------+
    |  x|   y|  z|      features|
    +---+----+---+--------------+
    |  0|10.0|0.1|[0.0,10.0,0.1]|
    |  1| 1.0|0.2| [1.0,1.0,0.2]|
    |  2| 1.0|0.9| [2.0,1.0,0.9]|
    +---+----+---+--------------+
    
    #3. Applying MinMaxScaler to your assembled features 
    scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
    # rescale each feature to range [min, max].
    scaledData = scaler.fit(output).transform(output)
    scaledData.show()
    
    +---+----+---+--------------+---------------+
    |  x|   y|  z|      features| scaledFeatures|
    +---+----+---+--------------+---------------+
    |  0|10.0|0.1|[0.0,10.0,0.1]|  [0.0,1.0,0.0]|
    |  1| 1.0|0.2| [1.0,1.0,0.2]|[0.5,0.0,0.125]|
    |  2| 1.0|0.9| [2.0,1.0,0.9]|  [1.0,0.0,1.0]|
    +---+----+---+--------------+---------------+
    

    希望这会有所帮助。

    【讨论】:

    • 请将您的代码发布为 text,因为代码是 text。它还可以让人们更轻松地将其复制粘贴到他们自己的编辑器中并尝试您的解决方案。
    • 不要使用代码截图...特别是黑色主题的白色背景,这让我大吃一惊...
    • 知道了,修好了。谢谢。
    猜你喜欢
    • 2019-01-09
    • 1970-01-01
    • 2016-08-24
    • 1970-01-01
    • 1970-01-01
    • 2021-05-25
    • 1970-01-01
    • 1970-01-01
    • 2021-06-22
    相关资源
    最近更新 更多