【问题标题】:How to iterate through Elasticsearch source using Apache Spark?如何使用 Apache Spark 遍历 Elasticsearch 源?
【发布时间】:2019-11-07 18:46:18
【问题描述】:

我正在尝试通过将 Elasticsearch 与 Apache Spark 集成来构建推荐系统。我正在使用 Java。我使用 movilens 数据集作为示例数据。我也将数据索引到 Elasticsearch。到目前为止,我已经能够从 Elasticsearch 索引中读取输入,如下所示:

    SparkConf conf = new SparkConf().setAppName("Example App").setMaster("local");
conf.set("spark.serializer", org.apache.spark.serializer.KryoSerializer.class.getName());
        conf.set("es.nodes", "localhost");
        conf.set("es.port", "9200");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(sc, "movielens/recommendation");

使用 esRDD.collect() 函数,我可以看到我正在正确地从弹性搜索中检索数据。现在我需要将 Elasticsearch 结果中的用户 ID、项目 ID 和偏好提供给 Spark 的推荐。如果我使用的是 csv 文件,我可以这样做:

String path = "resources/user_data.data";
        JavaRDD<String> data = sc.textFile(path);
        JavaRDD<Rating> ratings = data.map(
          new Function<String, Rating>() {
            public Rating call(String s) {
              String[] sarray = s.split("   ");
              return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]), 
                                Double.parseDouble(sarray[2]));
            }
          }
        );

如果我需要遍历存储在 esRDD 中的弹性搜索输出并创建与上述类似的映射,那么什么是等效映射?如果有任何我可以参考的示例代码,那将有很大帮助。

【问题讨论】:

    标签: elasticsearch apache-spark


    【解决方案1】:

    很抱歉没有直接回答 Spark 问题,但如果您错过了,这里有使用 elasticsearch 对 MovieLens 数据进行推荐的描述:http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/_significant_terms_demo.html

    【讨论】:

    • 感谢您的建议。我之前使用 github.com/codelibs/elasticsearch-taste 插件来使用 Mahout 的项目推荐功能。我最终遇到的问题是每个用户对数据的访问级别不同。一个用户可能无法访问整个数据集,因此,对他们的推荐应该来自他们有权访问的数据集子集。我想不出用这个插件来做这件事的方法。你有这个插件的经验吗?我还没有研究显着术语功能,现在将探索它。
    • 在探索了significant_term功能之后,如果有明确定义的偏好(例如:喜欢或不喜欢电影)似乎是合适的。但对我来说,用户偏好值有很大的不同。用户偏好不是简单地基于用户是否喜欢电影计算的,而是基于各种其他因素计算的。在使用重要术语时是否可以考虑这些因素?
    • 当我在这个数据集上使用显着术语时,我选择用他们喜欢的电影列表(4 星或 5 星评级)为每个用户编制索引。当提供一个电影 ID 并分析所有具有该 ID 的用户时,这似乎产生了很好的推荐。
    • 我还在研究一个“抽样”聚合,它只分析得分最高的用户集以进行查询。这将包裹在显着术语 agg 周围,以防止它分析低质量的匹配。这将允许对选择过程进行更模糊的查询,例如一个拥有一系列电影和增强功能的用户,Lucene 将排名最高质量的 N 个匹配用户,例如那些拥有许多共享电影选择的用户,而这些用户不一定只是每个人都看过的主流电影。
    【解决方案2】:

    您尚未指定 ElasticSearch 中数据的格式。但是我们假设它有字段userIdmovieIdrating,所以示例文档看起来像{"userId":1,"movieId":1,"rating":4}

    那么你应该能够做到(忽略空检查等):

    JavaRDD<Rating> ratings = esRDD.map(
        new Function<Map<String, Object>, Rating>() {
            public Rating call(Map<String, Object> m) {
                  Int userId = Integer.parseInt(m.get("userId"));
                  Int movieId = Integer.parseInt(m.get("movieId"));
                  Double rating = Double.parseDouble(m.get("rating"));
                  return new Rating(userId, movieId, rating);
            }
        }
    );
    

    【讨论】:

    • 感谢您的解决方案。但是数据格式如下:{user_id=26, items=[{item_id=1123, value=0.39811674}, {item_id=445, value=0.13920784}, {item_id=1129, value=0.12835233}},所以不是Map 但 JavaPairRDD>。知道如何解析那个吗?
    • JavaRDD> esRDDVal = esRDD.values();这就是我需要做的所有事情并使用上面描述的迭代。它现在正在工作。谢谢。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-10-26
    • 1970-01-01
    • 2021-05-22
    相关资源
    最近更新 更多