【发布时间】:2017-05-23 11:37:46
【问题描述】:
请看下面的代码示例
JavaRDD<String> mapRDD = filteredRecords
.map(new Function<String, String>() {
@Override
public String call(String url) throws Exception {
BufferedReader in = null;
URL formatURL = new URL((url.replaceAll("\"", ""))
.trim());
try {
HttpURLConnection con = (HttpURLConnection) formatURL
.openConnection();
in = new BufferedReader(new InputStreamReader(con
.getInputStream()));
return in.readLine();
} finally {
if (in != null) {
in.close();
}
}
}
});
这里的 url 是 http GET 请求。例子
http://ip:port/cyb/test?event=movie&id=604568837&name=SID×tamp_secs=1460494800×tamp_millis=1461729600000&back_up_id=676700166
这段代码很慢。 IP 和端口是随机的,负载是分布的,所以 ip 可以有 20 个不同的端口值,所以我看不到瓶颈。
当我评论时
in = new BufferedReader(new InputStreamReader(con
.getInputStream()));
return in.readLine();
代码太快了。 注意:要处理的输入数据为 10GB。使用 spark 从 S3 读取数据。
我用 BufferedReader 或 InputStreamReader any alternative 有什么问题吗? 我不能在 spark 中使用 foreach,因为我必须从服务器获取响应并且需要将 JAVARdd 保存为 HDFS 上的 textFile。
如果我们使用 mappartition 代码如下
JavaRDD<String> mapRDD = filteredRecords.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
@Override
public Iterable<String> call(Iterator<String> tuple) throws Exception {
final List<String> rddList = new ArrayList<String>();
Iterable<String> iterable = new Iterable<String>() {
@Override
public Iterator<String> iterator() {
return rddList.iterator();
}
};
while(tuple.hasNext()) {
URL formatURL = new URL((tuple.next().replaceAll("\"", ""))
.trim());
HttpURLConnection con = (HttpURLConnection) formatURL
.openConnection();
try(BufferedReader br = new BufferedReader(new InputStreamReader(con
.getInputStream()))) {
rddList.add(br.readLine());
} catch (IOException ex) {
return rddList;
}
}
return iterable;
}
});
对于我们正在做的每条记录也是如此.. 不是吗?
【问题讨论】:
标签: amazon-web-services apache-spark amazon-s3 java-7