【Question title】: JSON to Spark RDD in Python
【Posted】: 2017-05-06 09:11:03
【Question】:

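I'm new to Spark and have been trying to get it to understand my JSON input, without success. In short, I'm using Spark's ALS algorithm to produce recommendations. When I supply a CSV file as input, everything works. However, my input is actually JSON, as follows: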

all_user_recipe_rating = [{'rating': 1, 'recipe_id': 8798, 'user_id': 2108}, {'rating': 4, 'recipe_id': 6985, 'user_id': 4236}, {'rating': 4, 'recipe_id': 13572, 'user_id': 2743}, {'rating': 4, 'recipe_id': 6312, 'user_id': 3156}, {'rating': 1, 'recipe_id': 12836, 'user_id': 768}, {'rating': 1, 'recipe_id': 9237, 'user_id': 1599}, {'rating': 2, 'recipe_id': 16946, 'user_id': 2687}, {'rating': 2, 'recipe_id': 20728, 'user_id': 58}, {'rating': 4, 'recipe_id': 12921, 'user_id': 2221}, {'rating': 2, 'recipe_id': 10693, 'user_id': 2114}, {'rating': 2, 'recipe_id': 18301, 'user_id': 4898}, {'rating': 2, 'recipe_id': 9967, 'user_id': 3010}, {'rating': 2, 'recipe_id': 16393, 'user_id': 4830}, {'rating': 4, 'recipe_id': 14838, 'user_id': 583}]

ratings_RDD = self.spark.parallelize(all_user_recipe_rating)

ratings = ratings_RDD.map(lambda row:
  (Rating(int(row['user_id']),
   int(row['recipe_id']),
   float(row['rating']))))

model = self.build_model(ratings)

This is what I came up with after looking at some examples, but this is what I get:

MatrixFactorizationModel: User factor is not cached. Prediction could be slow.
16/12/21 03:54:53 WARN MatrixFactorizationModel: Product factor does not have a partitioner. Prediction on individual records could be slow.
16/12/21 03:54:53 WARN MatrixFactorizationModel: Product factor is not cached. Prediction could be slow.
16/12/21 03:54:53 WARN MatrixFactorizationModelWrapper: User factor does not have a partitioner. Prediction on individual records could be slow.

File "/usr/local/spark/python/pyspark/mllib/recommendation.py", line 147, in <lambda>
user_product = user_product.map(lambda u_p: (int(u_p[0]), int(u_p[1])))
TypeError: int() argument must be a string or a number, not 'Rating'

Can someone help me? :) Thanks!

【Question comments】:

    Tags: python json apache-spark pyspark rdd


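    【Answer 1】:

    Well,

    your error comes down to one thing.

    The exception you are seeing is raised inside predictAll, the prediction method of the ALS model.

    The problem is that you are passing Rating objects to a function that expects an RDD of (int, int) pairs, that is, (user, product).

    I took your code and built what you need: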

    >>> from pyspark.mllib.recommendation import ALS, Rating
    >>> ratings = ratings_RDD.map(lambda row:
    ...   (Rating(int(row['user_id']),
    ...    int(row['recipe_id']),
    ...    float(row['rating']))))
    >>> model = ALS.trainImplicit(ratings, 1, seed=10)
    >>> to_predict = spark.parallelize([[2108, 16393], [583, 20728]])
    >>> model.predictAll(to_predict).take(2)
    [Rating(user=583, product=20728, rating=0.0741161997082127), Rating(user=2108, product=16393, rating=0.05669039815320609)]
    

    There is nothing wrong with your JSON. The problem occurs when you call predictAll: you are passing Rating objects instead of an RDD of (int, int) pairs.
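
    To make the two shapes concrete, here is a minimal sketch in plain Python that derives both from rows like the ones in the question: (user, product, rating) triples for training and (user, product) pairs for prediction. The helper names `to_rating_triple` and `to_user_product` are hypothetical, and the Spark calls appear only as comments because they assume a live SparkContext named `sc` and illustrative hyperparameters.

```python
# Two sample rows in the same shape as the question's JSON input.
rows = [
    {'rating': 1, 'recipe_id': 8798, 'user_id': 2108},
    {'rating': 4, 'recipe_id': 6985, 'user_id': 4236},
]


def to_rating_triple(row):
    """Return (user, product, rating), the shape used to build Rating objects."""
    return (int(row['user_id']), int(row['recipe_id']), float(row['rating']))


def to_user_product(row):
    """Return (user, product), the shape predictAll expects for each record."""
    return (int(row['user_id']), int(row['recipe_id']))


training_triples = [to_rating_triple(r) for r in rows]
prediction_pairs = [to_user_product(r) for r in rows]

# With a running SparkContext `sc` (assumed, not created here), the same
# helpers could be applied to an RDD:
#   from pyspark.mllib.recommendation import ALS, Rating
#   ratings = sc.parallelize(rows).map(lambda r: Rating(*to_rating_triple(r)))
#   model = ALS.train(ratings, rank=10, iterations=10)  # values are assumptions
#   predictions = model.predictAll(sc.parallelize(prediction_pairs))
```

    The point of keeping the two helpers separate is that the training RDD and the prediction RDD cannot be mixed up: only the first one ever contains Rating objects.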

    【Comments】:
