安装Hama之前,应该首先确保系统中已经安装了hadoop,本集群使用的版本为hadoop-2.3.0
一、下载及解压Hama文件
下载地址:http://www.apache.org/dyn/closer.cgi/hama,选用的是目前最新版本:hama0.6.4。解压之后的存放位置自己设定。
二、修改配置文件
- 在hama-env.sh文件中加入JAVA_HOME变量(分布式情况下,每台机器都要设置为该机器上实际的JDK安装路径)
- 配置hama-site.xml(分布式情况下,所有机器的配置相同)
bsp.master.address为BSPMaster的地址。fs.default.name参数设置成hadoop里namenode的地址。hama.zookeeper.quorum和hama.zookeeper.property.clientPort两个参数和zookeeper有关,分别设置为zookeeper quorum server的地址和客户端端口即可,单机伪分布式下地址就是本机地址。
4. 配置groomservers文件。hama与hadoop具有相似的主从结构,该文件存放从节点的IP地址,每个IP占一行。(分布式情况下只需要配置BSPMaster所在的机器即可)
5. hama0.6.4自带的hadoop核心包为1.2.0,与集群hadoop2.3.0不一致,需要进行替换,具体是在hadoop的lib文件夹下找到hadoop-core-2.3.0*.jar和hadoop-test-2.3.0*.jar,拷贝到hama的lib目录下,并删除hadoop-core-1.2.0.jar和hadoop-test-1.2.0.jar两个文件。
6. 此时运行可能会报找不到类的错误,需加入缺失的jar包(把以hadoop开头的jar包和protobuf-java-2.5.0.jar拷贝到hama/lib下)。
三、编写Hama job
在eclipse下新建Java Project,将hama安装时需要的jar包全部导入工程。
官网中计算PI的例子:
package pi;

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.BSPJobClient;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.ClusterStatus;
import org.apache.hama.bsp.FileOutputFormat;
import org.apache.hama.bsp.NullInputFormat;
import org.apache.hama.bsp.TextOutputFormat;
import org.apache.hama.bsp.sync.SyncException;

/**
 * Hama BSP example that estimates PI by Monte Carlo sampling.
 *
 * <p>Every peer draws random points in the square [-1, 1) x [-1, 1), counts
 * how many fall inside the unit circle, and sends its local estimate to one
 * peer chosen as the master. The master averages the estimates it received
 * and writes the result. The whole procedure is run twice, producing two
 * output records ("value1" and "value2").
 */
public class PiEstimator {
  /** Job output directory; made unique per JVM start via the current time. */
  private static final Path TMP_OUTPUT =
      new Path("/tmp/pi-" + System.currentTimeMillis());

  /** BSP task that performs the sampling and, on the master peer, the aggregation. */
  public static class MyEstimator
      extends BSP<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> {
    public static final Log LOG = LogFactory.getLog(MyEstimator.class);

    /** Number of random points each peer draws per estimation round. */
    private static final int iterations = 100000;

    /** Name of the peer that collects and averages the estimates; set in setup(). */
    private String masterTask;

    /**
     * Runs two independent estimation rounds, each emitting one output record
     * on the master peer.
     *
     * @param peer handle used for messaging, synchronization and output
     * @throws IOException if sending a message or writing output fails
     * @throws SyncException if peer synchronization fails
     * @throws InterruptedException if the task is interrupted while synchronizing
     */
    @Override
    public void bsp(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
        throws IOException, SyncException, InterruptedException {
      runEstimationRound(peer, "Estimated value1 of PI is");
      runEstimationRound(peer, "Estimated value2 of PI is");
    }

    /**
     * Picks the master peer: the one at index {@code getNumPeers() / 2}.
     *
     * @param peer handle used to look up peer names
     * @throws IOException declared by the overridden method
     */
    @Override
    public void setup(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
        throws IOException {
      this.masterTask = peer.getPeerName(peer.getNumPeers() / 2);
    }

    /**
     * Intentionally empty: aggregation and output happen inside bsp().
     *
     * @param peer unused
     * @throws IOException declared by the overridden method
     */
    @Override
    public void cleanup(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
        throws IOException {
      // Nothing to release.
    }

    /**
     * Performs one round: sample locally, send the estimate to the master,
     * synchronize, and (on the master only) average and write the result.
     *
     * @param peer handle used for messaging, synchronization and output
     * @param label key text of the output record written by the master
     */
    private void runEstimationRound(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer,
        String label)
        throws IOException, SyncException, InterruptedException {
      peer.send(masterTask, new DoubleWritable(sampleEstimate()));
      peer.sync();

      if (peer.getPeerName().equals(masterTask)) {
        // Read the message count before draining the queue; it is the divisor.
        int numPeers = peer.getNumCurrentMessages();
        double sum = 0.0;
        DoubleWritable received;
        while ((received = peer.getCurrentMessage()) != null) {
          sum += received.get();
        }
        peer.write(new Text(label), new DoubleWritable(sum / numPeers));
      }

      // Second sync kept from the original flow so that every peer performs
      // the same number of sync() calls per round.
      peer.sync();
    }

    /**
     * Draws {@code iterations} random points and returns the local PI estimate
     * {@code 4 * inside / iterations}.
     */
    private static double sampleEstimate() {
      int inside = 0;
      for (int i = 0; i < iterations; i++) {
        double x = 2.0 * Math.random() - 1.0;
        double y = 2.0 * Math.random() - 1.0;
        // Comparing the squared distance avoids a square root per sample.
        if (x * x + y * y < 1.0) {
          inside++;
        }
      }
      return 4.0 * inside / iterations;
    }
  }

  /**
   * Copies the first non-empty file under the job output directory to
   * standard output, then deletes the output directory recursively.
   *
   * @param conf configuration used to obtain the file system
   * @throws IOException if listing, reading or deleting fails
   */
  static void printOutput(HamaConfiguration conf) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    for (FileStatus status : fs.listStatus(TMP_OUTPUT)) {
      if (status.getLen() > 0) {
        FSDataInputStream in = fs.open(status.getPath());
        try {
          // 'false' keeps System.out open; only the input stream is ours to close.
          IOUtils.copyBytes(in, System.out, conf, false);
        } finally {
          in.close();
        }
        break;
      }
    }

    fs.delete(TMP_OUTPUT, true);
  }

  /**
   * Configures and submits the job, then prints its output and the elapsed time.
   *
   * @param args optional; {@code args[0]} is the number of BSP tasks. When
   *     absent, the cluster's maximum task count is used.
   */
  public static void main(String[] args)
      throws InterruptedException, IOException, ClassNotFoundException {
    // BSP job configuration
    HamaConfiguration conf = new HamaConfiguration();
    BSPJob bsp = new BSPJob(conf, PiEstimator.class);
    bsp.setJobName("Pi Estimation Example");
    bsp.setBspClass(MyEstimator.class);
    bsp.setInputFormat(NullInputFormat.class);
    bsp.setOutputKeyClass(Text.class);
    bsp.setOutputValueClass(DoubleWritable.class);
    bsp.setOutputFormat(TextOutputFormat.class);
    FileOutputFormat.setOutputPath(bsp, TMP_OUTPUT);

    BSPJobClient jobClient = new BSPJobClient(conf);
    ClusterStatus cluster = jobClient.getClusterStatus(true);

    if (args.length > 0) {
      bsp.setNumBspTask(Integer.parseInt(args[0]));
    } else {
      // Set to maximum
      bsp.setNumBspTask(cluster.getMaxTasks());
    }

    long startTime = System.currentTimeMillis();

    if (bsp.waitForCompletion(true)) {
      printOutput(conf);
      System.out.println("Job Finished in "
          + (System.currentTimeMillis() - startTime) / 1000.0
          + " seconds");
    }
  }
}