【问题标题】:Whats the Efficient way to call http request and read inputstream in spark MapTask在 Spark MapTask 中调用 http 请求和读取输入流的有效方法是什么
【发布时间】:2017-05-23 11:37:46
【问题描述】:

请看下面的代码示例

JavaRDD<String> mapRDD = filteredRecords
            .map(new Function<String, String>() {

                @Override
                public String call(String url) throws Exception {
                    BufferedReader in = null;
                    URL formatURL = new URL((url.replaceAll("\"", ""))
                            .trim());
                    try {
                        HttpURLConnection con = (HttpURLConnection) formatURL
                                .openConnection();
                        in = new BufferedReader(new InputStreamReader(con
                                .getInputStream()));

                        return in.readLine();
                    } finally {
                        if (in != null) {
                            in.close();
                        }
                    }
                }
            }); 

这里的 url 是 http GET 请求。例子

http://ip:port/cyb/test?event=movie&id=604568837&name=SID&timestamp_secs=1460494800&timestamp_millis=1461729600000&back_up_id=676700166

这段代码很慢。 IP 和端口是随机的,负载是分布的,所以 ip 可以有 20 个不同的端口值,所以我看不到瓶颈。

当我评论时

 in = new BufferedReader(new InputStreamReader(con
                            .getInputStream()));

                    return in.readLine();

代码太快了。 注意:要处理的输入数据为 10GB。使用 spark 从 S3 读取数据。

我用 BufferedReader 或 InputStreamReader any alternative 有什么问题吗? 我不能在 spark 中使用 foreach,因为我必须从服务器获取响应并且需要将 JAVARdd 保存为 HDFS 上的 textFile。

如果我们使用 mappartition 代码如下

JavaRDD<String> mapRDD = filteredRecords.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {

        @Override
        public Iterable<String> call(Iterator<String> tuple) throws Exception {

            final List<String> rddList = new ArrayList<String>();
            Iterable<String> iterable = new Iterable<String>() {

                @Override
                public Iterator<String> iterator() {
                    return rddList.iterator();
                }
            };
            while(tuple.hasNext()) {
                URL formatURL = new URL((tuple.next().replaceAll("\"", ""))
                        .trim());
                HttpURLConnection con = (HttpURLConnection) formatURL
                        .openConnection();
                try(BufferedReader br = new BufferedReader(new InputStreamReader(con
                        .getInputStream()))) {

                    rddList.add(br.readLine());

                } catch (IOException ex) {
                    return rddList;
                }
            }
            return iterable;
        }
    }); 

对于我们正在做的每条记录也是如此.. 不是吗?

【问题讨论】:

    标签: amazon-web-services apache-spark amazon-s3 java-7


    【解决方案1】:

    目前你正在使用

    地图功能

    为分区中的每一行创建一个 url 请求。

    你可以使用

    地图分区

    这将使代码运行得更快,因为它只创建一次与服务器的连接,即每个分区只有一个连接。

    【讨论】:

      【解决方案2】:

      这里的一大成本是建立 TCP/HTTPS 连接。即使您只读取大文件的第一(短)行,为了更好地重用 HTTP/1.1 连接,现代 HTTP 客户端也会尝试将 read() 读取到文件末尾,这一事实加剧了这种情况,因此避免中止连接。这对于小文件来说是一个很好的策略,但对于那些以 MB 为单位的文件来说却不是。

      那里有一个解决方案:在读取时设置content-length,这样就只读入一个更小的块,减少close()的开销;连接回收会降低 HTTPS 设置成本。如果您在连接上设置 fadvise=random,这就是最新的 Hadoop/Spark S3A 客户端所做的:请求块而不是整个多 GB 文件。但请注意:如果您逐字节浏览文件,那么这种设计实际上非常糟糕......

      【讨论】:

        猜你喜欢
        • 2019-08-27
        • 2012-08-21
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-03-15
        • 2017-12-16
        • 2016-09-02
        • 2014-08-22
        相关资源
        最近更新 更多