【问题标题】:Using Spark's MapReduce to call a different function and aggregate使用 Spark 的 MapReduce 调用不同的函数并聚合
【发布时间】:2019-06-08 01:27:04
【问题描述】:

我对 spark 非常不熟悉,但我很确定有一种好方法可以比我现在做的更快地完成我想做的事情。

基本上我有一个 S3 存储桶,其中包含大量 JSON 的 Twitter 数据。我想浏览所有这些文件,从 JSON 中获取文本,对文本进行情感分析(目前使用斯坦福 NLP),然后将 Tweet + Sentiment 上传到数据库(现在我正在使用 dynamo,但是这个不是成败)

我目前拥有的代码是

        /**
         * Per thread:
         * 1. Download a file
         * 2. Do sentiment on the file -> output Map<String, List<Float>>
         * 3. Upload to Dynamo: (a) sentiment (b) number of tweets (c) timestamp
         *
         */

        List<String> keys = s3Connection.getKeys();

        ThreadPoolExecutor threads = new ThreadPoolExecutor(40, 40, 10000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
        threads.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        for (String key : keys) {
                threads.submit(new Thread(() -> {
                    try {
                        S3Object s3Object = s3Connection.getObject(key);
                        Map<String, List<Float>> listOfTweetsWithSentiment = tweetSentimentService.getTweetsFromJsonFile(s3Object.getObjectContent());
                        List<AggregatedTweets> aggregatedTweets = tweetSentimentService.createAggregatedTweetsFromMap(listOfTweetsWithSentiment, key);

                        for (AggregatedTweets aggregatedTweet : aggregatedTweets) {
                            System.out.println(aggregatedTweet);
                            tweetDao.putItem(aggregatedTweet);
                        }
                    } catch (Exception e) {
                        System.out.println(e.getMessage());
                    }
                }));
        }

这很好用。通过在某些日期范围内运行此代码(即 getKeys 仅获取某些日期范围的密钥)并在不同的 EC2 上运行此过程的许多实例,我能够将这个过程加速到仅大约 2 小时,每个实例都在不同的日期范围。

但是,必须有一个更快的方法来使用一个好的 ole map-reduce 来做到这一点,但我什至不知道如何开始研究这个。是否可以在我的地图中进行情绪分析,然后根据时间戳进行减少?

此外,我正在考虑使用 AWS Glue,但我没有看到在那里使用斯坦福 NLP 库的好方法。

我们将不胜感激。

【问题讨论】:

    标签: java apache-spark mapreduce sentiment-analysis aws-glue


    【解决方案1】:

    是的,您可以使用 Apache Spark 做到这一点。有很多方法可以设计您的应用程序、配置基础架构等。我提出一个简单的设计:

    1. 您在 AWS 上,因此使用 Spark 创建一个 EMR 集群。包含 Zeppelin 以进行交互式调试会很有用。

    2. Spark 使用多种数据抽象。您的朋友是 RDD 和数据集(阅读有关它们的文档)。读取数据到Datasets的代码可能是一样的:

      SparkSession ss = SparkSession.builder().getOrCreate();
      Dataset<Row> dataset = ss.read("s3a://your_bucket/your_path");
      
    3. 现在你有一个Dataset&lt;Row&gt;。这对于类似 SQL 的操作很有用。为了您的分析,您需要将其转换为 Spark RDD:

      JavaRDD<Tweet> analyticRdd = dataset.toJavaRDD().map(row -> {
        return TweetsFactory.tweetFromRow(row);
      });
      
    4. 所以,使用analyticRdd,您可以做您的分析人员。只是不要忘记让所有使用数据的服务都可序列化。

    【讨论】:

      猜你喜欢
      • 2021-08-20
      • 1970-01-01
      • 2016-07-06
      • 2014-08-12
      • 2021-04-07
      • 1970-01-01
      • 2018-01-06
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多