【问题标题】:How to save Sparks MatrixFactorizationModel recommendProductsForUsers to Hbase如何将 Sparks MatrixFactorizationModel 推荐ProductsForUsers 保存到 Hbase
【发布时间】:2016-11-25 10:43:35
【问题描述】:

我是 spark 新手,我想将推荐产品ForUsers 的输出保存到 Hbase 表。我找到了一个示例 (https://sparkkb.wordpress.com/2015/05/04/save-javardd-to-hbase-using-saveasnewapihadoopdataset-spark-api-java-coding/) 显示使用 JavaPairRDD 和 saveAsNewAPIHadoopDataset 进行保存。

如何将JavaRDD<Tuple2<Object, Rating[]>> 转换为JavaPairRDD<ImmutableBytesWritable, Put> 以便使用saveAsNewAPIHadoopDataset?

//Loads the data from hdfs
    MatrixFactorizationModel sameModel = MatrixFactorizationModel.load(jsc.sc(), trainedDataPath);  

//Get recommendations for all users
    JavaRDD<Tuple2<Object, Rating[]>> ratings3 = sameModel.recommendProductsForUsers(noOfProductsToReturn).toJavaRDD();

【问题讨论】:

  • 您要保存模型或建议?
  • @eliasah 我想保存推荐

标签: apache-spark hbase matrix-factorization


【解决方案1】:

通过使用 mapToPair。来自您提供示例的同一来源(我手动更改了类型):

JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = javaRDD.mapToPair(
  new PairFunction<Tuple2<Object, Rating[]>, ImmutableBytesWritable, Put>() {
@Override
public Tuple2<ImmutableBytesWritable, Put> call(Tuple2<Object, Rating[]> row) throws Exception {

   Put put = new Put(Bytes.toBytes(row.getString(0)));
   put.add(Bytes.toBytes("columFamily"), Bytes.toBytes("columnQualifier1"), Bytes.toBytes(row.getString(1)));
   put.add(Bytes.toBytes("columFamily"), Bytes.toBytes("columnQualifier2"), Bytes.toBytes(row.getString(2)));

       return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);     
}
 });

它是这样的,你在构造函数中创建一个新的 put 实例,为它提供行键,然后为你调用的每一列添加。然后你返回创建的 put。

【讨论】:

    【解决方案2】:

    这就是我解决上述问题的方法,希望这对某人有所帮助。

        JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts1 = ratings3
                        .mapToPair(new PairFunction<Tuple2<Object, Rating[]>, ImmutableBytesWritable, Put>() {
    
                            @Override
                            public Tuple2<ImmutableBytesWritable, Put> call(Tuple2<Object, Rating[]> arg0)
                                    throws Exception {
                                Rating[] userAndProducts = arg0._2;
                                System.out.println("***********" + userAndProducts.length + "**************");
                                List<Item> items = new ArrayList<Item>();
                                Put put = null
                                String recommendedProduct = "";                         
                                for (Rating r : userAndProducts) {  
    
    //Some logic here to convert Ratings into appropriate put command
    // recommendedProduct = r.product; 
    
    }
    
                                put.addColumn(Bytes.toBytes("recommendation"), Bytes.toBytes("product"),Bytes.toBytes(recommendedProduct));                     Bytes.toBytes("product"),Bytes.toBytes(response.getItems().toString()));
    
                                return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
                            }
                        });
    
                System.out.println("*********** Number of records in JavaPairRdd: "+ hbasePuts1.count() +"**************");
                hbasePuts1.saveAsNewAPIHadoopDataset(newApiJobConfig.getConfiguration());
                jsc.stop();         
    

    【讨论】:

      【解决方案3】:

      我们刚刚开源了 Splice Machine,并且我们有将 MLIB 与查询和存储集成到 Splice Machine 的示例。我不知道这是否会有所帮助,但我想我会告诉你的。

      http://community.splicemachine.com/use-spark-libraries-splice-machine/

      谢谢你的帖子,太棒了。

      【讨论】:

      • 谢谢 Joh,这很有趣,会调查一下。
      猜你喜欢
      • 1970-01-01
      • 2015-10-07
      • 2015-12-05
      • 2021-12-15
      • 2023-01-07
      • 2018-02-01
      • 1970-01-01
      • 1970-01-01
      • 2021-11-17
      相关资源
      最近更新 更多