【问题标题】:How to normalize and create similarity matrix in Pyspark?如何在 Pyspark 中标准化和创建相似度矩阵?
【发布时间】:2021-05-27 06:15:44
【问题描述】:

我见过很多关于相似度矩阵的堆栈溢出问题,但它们处理的是 RDD 或其他情况,我找不到问题的直接答案,所以我决定发布一个新问题。

问题

import numpy as np
import pandas as pd
import pyspark
from pyspark.sql import functions as F, Window
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler,Normalizer
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

spark = pyspark.sql.SparkSession.builder.appName('app').getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

# pandas dataframe
pdf = pd.DataFrame({'user_id': ['user_0','user_1','user_2'],
                   'apple': [0,1,5],
                   'good banana': [3,0,1],
                   'carrot': [1,2,2]})
# spark dataframe
df = sqlContext.createDataFrame(pdf)
df.show()

+-------+-----+-----------+------+
|user_id|apple|good banana|carrot|
+-------+-----+-----------+------+
| user_0|    0|          3|     1|
| user_1|    1|          0|     2|
| user_2|    5|          1|     2|
+-------+-----+-----------+------+

使用 Pandas 规范化和创建相似度矩阵

from sklearn.preprocessing import normalize

pdf = pdf.set_index('user_id')
item_norm = normalize(pdf,axis=0) # normalize each items (NOT users)
item_sim = item_norm.T.dot(item_norm)
df_item_sim = pd.DataFrame(item_sim,index=pdf.columns,columns=pdf.columns)

                apple  good banana    carrot
apple        1.000000     0.310087  0.784465
good banana  0.310087     1.000000  0.527046
carrot       0.784465     0.527046  1.000000

问题:如何使用 PySpark 得到上述相似度矩阵?

我想对该数据运行 KMeans。

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# I want to do this...
model = KMeans(k=2, seed=1).fit(df.select('norm_features'))

df = model.transform(df)
df.show()

参考文献

【问题讨论】:

    标签: python pandas apache-spark pyspark apache-spark-sql


    【解决方案1】:
    import pyspark.sql.functions as F
    
    df.show()
    +-------+-----+-----------+------+
    |user_id|apple|good banana|carrot|
    +-------+-----+-----------+------+
    | user_0|    0|          3|     1|
    | user_1|    1|          0|     2|
    | user_2|    5|          1|     2|
    +-------+-----+-----------+------+
    

    通过反透视和旋转来交换行和列:

    df2 = df.selectExpr(
        'user_id',
        'stack(3, ' + ', '.join(["'%s', `%s`" % (c, c) for c in df.columns[1:]]) + ') as (fruit, items)'
    ).groupBy('fruit').pivot('user_id').agg(F.first('items'))
    
    df2.show()
    +-----------+------+------+------+
    |      fruit|user_0|user_1|user_2|
    +-----------+------+------+------+
    |      apple|     0|     1|     5|
    |good banana|     3|     0|     1|
    |     carrot|     1|     2|     2|
    +-----------+------+------+------+
    

    标准化:

    df3 = df2.select(
        'fruit',
        *[
            (
                F.col(c) / 
                F.sqrt(
                    sum([F.col(cc)*F.col(cc) for cc in df2.columns[1:]])
                )
            ).alias(c) for c in df2.columns[1:]
        ]
    )
    
    df3.show()
    +-----------+------------------+-------------------+-------------------+
    |      fruit|            user_0|             user_1|             user_2|
    +-----------+------------------+-------------------+-------------------+
    |      apple|               0.0|0.19611613513818404| 0.9805806756909202|
    |good banana|0.9486832980505138|                0.0|0.31622776601683794|
    |     carrot|0.3333333333333333| 0.6666666666666666| 0.6666666666666666|
    +-----------+------------------+-------------------+-------------------+
    

    做矩阵乘法:

    df4 = (df3.alias('t1').repartition(10)
              .crossJoin(df3.alias('t2').repartition(10))
              .groupBy('t1.fruit')
              .pivot('t2.fruit', df.columns[1:])
              .agg(F.first(sum([F.col('t1.'+c) * F.col('t2.'+c) for c in df3.columns[1:]])))
          )
    df4.show()
    +-----------+-------------------+-------------------+------------------+
    |      fruit|              apple|        good banana|            carrot|
    +-----------+-------------------+-------------------+------------------+
    |      apple| 1.0000000000000002|0.31008683647302115|0.7844645405527362|
    |good banana|0.31008683647302115| 0.9999999999999999|0.5270462766947298|
    |     carrot| 0.7844645405527362| 0.5270462766947298|               1.0|
    +-----------+-------------------+-------------------+------------------+
    

    【讨论】:

    • 这是一个伟大而漫长的路要走。我想知道是否有一些向量化的矩阵乘法方法,例如在from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix 和其他地方。
    • 我尝试了类似但无法为我的案例实施:stackoverflow.com/questions/46758768/…
    • @astro123 使用数据帧交叉连接进行矩阵乘法也是并行化的,因为每一行的计算都是并行完成的。我更喜欢使用数据帧 API,因为不推荐使用基于 RDD 的 MLLib,而且由于将数据帧转换为 RDD、矩阵和返回所涉及的开销,我什至不确定是否会有任何性能改进。跨度>
    猜你喜欢
    • 2022-01-12
    • 2011-05-09
    • 2013-08-28
    • 2019-10-30
    • 2021-01-09
    • 2017-06-13
    • 1970-01-01
    • 2011-05-31
    相关资源
    最近更新 更多