【发布时间】:2015-12-21 01:42:49
【问题描述】:
关于为什么我的 spark 作业最终只能在一台机器上执行的任何想法?我正在使用超过约 250MB(51k 行)数据的映射函数,并且我希望看到所有节点中的活动,但是 spark UI 仅显示集群的三个节点之一上的活动。我用来调用我的应用程序的脚本如下所示:
spark-submit \
--class sparkUtils.DistributedParse \
--master spark://ip-172-31-27-55:7077 \
--executor-memory 4G \
bin/DistributedParse.jar \
<parameters>
我尝试过使用--deploy-mode cluster,但执行失败,使用--total-executor-cores 3 标志时没有任何区别。但是,根据spark documentation,在我的情况下应使用独立模式,这与此描述相匹配:“一种常见的部署策略是从与您的工作机器物理位于同一位置的网关机器提交您的应用程序(例如独立 EC2 集群中的主节点)。在此设置中,客户端模式是合适的。"
我的 Java 代码如下所示:
SparkConf sparkConf = new SparkConf().setAppName("DistributedParse");
JavaSparkContext context = new JavaSparkContext(sparkConf);
/* read and parse custom-delimited multiline text file */
Configuration conf = new Configuration();
conf.set("textinputformat.record.delimiter", "WARC/1.0");
JavaRDD<Text> records = context.newAPIHadoopFile(args[0], TextInputFormat.class, LongWritable.class, Text.class, conf).values();
final int urlPosition = 47;
...
ArrayList<String> importedNames = fillFromFile("./names.txt");
final Broadcast<ArrayList<String>> names = context.broadcast(importedNames);
records.flatMapToPair(new PairFlatMapFunction<String, String, String>() {
@Override
public Iterable<scala.Tuple2<String,String>> call(final String record) {
ArrayList<Tuple2<String,String>> url_nameEdge = new ArrayList<Tuple2<String,String>>();
for(String name: names.value()){
if( record.toLowerCase().contains(name.toLowerCase()) )
url_nameEdge.add( new Tuple2<String,String>( record.substring(urlPosition, record.indexOf(" ", urlPosition+1)) , name ) );
}
return url_nameEdge;
}
}).saveAsTextFile(outputFolder);
【问题讨论】:
标签: java amazon-ec2 apache-spark