【问题标题】:How to use the implementation of Spark ML ALS that supports generic ID types (int and Long)如何使用支持通用 ID 类型(int 和 Long)的 Spark ML ALS 的实现
【发布时间】:2017-04-06 16:44:59
【问题描述】:

我正在尝试使用 Spark ML ALS 构建推荐系统,其中数据如下

"User-ID";"ISBN "; "Book-Rating"
276725;034545104;0
276726;0155061224;5
276727;0446520802;0
276729;052165615;3
276729;0521795028;6

我正在使用 Spark 2.1.0 和 mongoldb 来加载数据。这是我的一段代码,它在转换后定义了数据框和他的 shema。

/*
 *  Chargement de données de rating
 */

val dfrating = spark.loadFromMongoDB(readConfig) 

val bookRatings = dfrating.selectExpr("cast(User_ID as Long) User_ID " ,"cast(ISBN as Long) ISBN ", "Book_Rating")

bookRatings.printSchema()

val als = new ALS().setMaxIter(10).setRegParam(0.01).setUserCol("User_ID").setItemCol("ISBN").setRatingCol("Book_Rating")
val model = als.fit(training)

              ———- After compiling, I have got ——
root
 |-- User_ID: long (nullable = true)
 |-- ISBN: long (nullable = true)
 |-- Book_Rating: integer (nullable = true)

+-------+----------+-----------+
|User_ID|      ISBN|Book_Rating|
+-------+----------+-----------+
|    215|  61030147|          6|
|   5750|1853260045|          0|
|  11676| 743244249|          0|
|  11676|1551665700|          0|

原因:java.lang.IllegalArgumentException:ALS **仅支持列的 User_ID 和 ISBN 的整数范围内的值。 ****值** 8.477024456E9 **超出整数范围。****** 在 org.apache.spark.ml.recommendation.ALSModelParams$$anonfun$1.apply$mcID$sp(ALS.scala:87)

                   —————————————————————

还有其他解决方案可以让事情正常运行吗?对于同样的问题,我有这个建议(How to use mllib.recommendation if the user ids are string instead of contiguous integers?How to use long user ID in PySpark ALSNon-integer ids in Spark MLlib ALS),但我不知道如何开始。任何帮助!提前致谢。


@GPI 感谢您的建议。这就是我所做的。

val isbn_als = new StringIndexer()
      .setHandleInvalid("skip")
      .setInputCol("ISBN")
      .setOutputCol("ISBN_als")
      .fit(uRatings)

val isbn_als_reverse = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("predictedLabel")

val als = new    ALS().setMaxIter(10).setRegParam(0.01).setUserCol("User_ID").setItemCol("ISBN_als").setRatingCol("Book_Rating")

     /*
      *  On définit l'ordre des opérations à effectuer
      */

     println("On passe au Pipeline")

     val alsPipeline = new Pipeline().setStages(Array(isbn_als, als, isbn_als_reverse))

     /*
      *  On construit le modèle de recommandation à partir des données de Training
      */

     println("On passe à la construction du modèle")

     val alsModel = alsPipeline.fit(training)


     /*
      *  On exécute le modèle sur les données de Test, puis on affiche un échantillon de prédictions
      */

     println("On exécute le modèle sur les données de Test")


     val alsPredictions = alsModel.transform(test).na.drop()


     println("Affichage des prédictions")

     alsPredictions.select($"User_ID",$"ISBN", $"Book_Rating", $"prediction").show(20)

但是当我在管道上使用 IndexToString() 时出现此异常。

On passe au Pipeline
On passe à la construction du modèle
On exécute le modèle sur les données de Test
Exception in thread "main" java.lang.ClassCastException: org.apache.spark.ml.attribute.UnresolvedAttribute$ cannot be cast to org.apache.spark.ml.*attribute.NominalAttribute*
    at org.apache.spark.ml.feature.IndexToString.transform(StringIndexer.scala:313)
    at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
    at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)

———————————————————————————————————————

当我不使用 IndexToString() 时,我得到了一个否定的预测。

+-------+---------+-----------+-------------+
|User_ID|     ISBN|Book_Rating|   prediction|
+-------+---------+-----------+-------------+
| 140340|786881852|         10|    6.9798374|
| 127327|786881852|          0|-1.2718141E-4|
| 103336|786881852|          0|    1.2374072|
| 138578|786881852|          9|     8.200257|
| 172742|786881852|          0|   -1.3278971|
|  31909|786881852|          6|     5.997123|
|  69554|786881852|          5|     2.819587|
| 173650|786881852|          0|   0.42850634|

—————————————————————————————————

我想负面预测是由于未使用的 IndexToString() 造成的。如果是这样,如何在管道上使用 IndexToString() ?提前致谢

【问题讨论】:

    标签: apache-spark apache-spark-mllib


    【解决方案1】:

    您得到的异常是由配置错误的 IndexToString 部分发出的。您让它将预测解码回字符串,但预测不是产品 (ISBN),而是评级:ALS 预测的是评级,而不是产品。

    这反过来意味着您不需要逆变器。

    请参阅以下工作示例:

    scala> import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.Pipeline
    
    scala> import org.apache.spark.ml.recommendation._
    import org.apache.spark.ml.recommendation._
    
    scala> import org.apache.spark.ml.feature._
    import org.apache.spark.ml.feature._
    
    // This is just a helper
    scala> case class Rating(user: Long, isbn: String, rating: Double)
    defined class Rating
    
    // Let's create 2 books, 3 users, 3 ratings to train the model    
    scala> val rawRatings = Seq(Rating(1, "1234567890123", 1), Rating(2, "12345678901234", 2), Rating(3, "12345678901234", 3))
    rawRatings: Seq[Rating] = List(Rating(1,1234567890123,1.0), Rating(2,12345678901234,2.0), Rating(3,12345678901234,3.0))
    
    scala> val ratings = spark.createDataFrame(rawRatings)
    
    scala> val isbn_als = new StringIndexer().setInputCol("isbn").setOutputCol("isbnIDX")
    isbn_als: org.apache.spark.ml.feature.StringIndexer = strIdx_53d752f20587
    
    scala> val als = new ALS().setUserCol("user").setItemCol("isbnIDX").setRatingCol("rating")
    als: org.apache.spark.ml.recommendation.ALS = als_41eff9ae835d
    
    scala> val stages = Array(isbn_als, als)
    stages: Array[org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: org.apache.spark.ml.param.ParamMap): org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: org.apache.spark.ml.param.ParamMap): org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable}}] = Array(strIdx_53d752f20587, als_41eff9ae835d, idxToStr_9b2ca994074f)
    
    // Do the actual training
    scala> val pipeline = new Pipeline().setStages(stages)
    pipeline: org.apache.spark.ml.Pipeline = pipeline_5f05891139b6
    
    scala> val pipeModel = pipeline.fit(ratings)
    pipeModel: org.apache.spark.ml.PipelineModel = pipeline_5f05891139b6
    
    // And make predictions for any user/book combination
    scala> case class UserBook(user: Long, isbn: String)
    defined class UserBook
    
    scala> val testSet = Seq(UserBook(1, "12345678901234"))
    testSet: Seq[UserBook] = List(UserBook(1,12345678901234))
    
    scala> val testDF = spark.createDataFrame(testSet)
    testDF: org.apache.spark.sql.DataFrame = [user: bigint, isbn: string]
    
    scala> pipeModel.transform(testDF).show
    +----+--------------+-------+----------+
    |user|          isbn|isbnIDX|prediction|
    +----+--------------+-------+----------+
    |   1|12345678901234|    0.0| 0.7389956|
    +----+--------------+-------+----------+
    

    这里,“预测”是对用户 1 对图书 ISBN 12345678901234 的预测进行评级。isbnIDX 仅用于计算目的,无需反转,因为我们已经在数据框中拥有 isbn。

    【讨论】:

    • GPI,它修复了我的审讯。感谢您的宝贵时间。
    • @MAA-BIK 你仍然不接受这个答案(或投票)。
    猜你喜欢
    • 1970-01-01
    • 2017-01-19
    • 2022-08-24
    • 1970-01-01
    • 2014-12-05
    • 1970-01-01
    • 2019-05-17
    • 2017-02-11
    • 2017-12-08
    相关资源
    最近更新 更多