安装Hama之前,应该首先确保系统中已经安装了hadoop,本集群使用的版本为hadoop-2.3.0

一、下载及解压Hama文件

  下载地址:http://www.apache.org/dyn/closer.cgi/hama,选用的是目前最新版本:hama0.6.4。解压之后的存放位置自己设定。

二、修改配置文件

  1. 在hama-env.sh文件中加入JAVA_HOME变量(分布式情况下,每台机器都要设置,值为该机器上JDK的实际安装路径)
  2. 配置hama-site.xml(分布式情况下,所有机器的配置相同)

bsp.master.address为BSPMaster的地址。fs.default.name参数设置成Hadoop里NameNode的地址。hama.zookeeper.quorum和hama.zookeeper.property.clientPort两个参数与ZooKeeper有关:前者设置为ZooKeeper quorum server的主机列表,后者设置为ZooKeeper的客户端端口;单机伪分布式下quorum填本机地址即可。

 Apache Hama安装部署

4. 配置groomservers文件。hama与hadoop具有相似的主从结构,该文件存放从节点的IP地址,每个IP占一行。(分布式情况下只需要配置BSPMaster所在的机器即可)

 

5. hama0.6.4自带的hadoop核心包为1.2.0,与集群hadoop2.3.0不一致,需要进行替换,具体是在hadoop的lib文件夹下找到hadoop-core-2.3.0*.jar和hadoop-test-2.3.0*.jar,拷贝到hama的lib目录下,并删除hadoop-core-1.2.0.jar和hadoop-test-1.2.0.jar两个文件。

  

  6. 此时运行可能会报找不到类(ClassNotFound)的错误,需要补充缺失的jar包:把Hadoop中以hadoop开头的jar包和protobuf-java-2.5.0.jar复制到hama/lib目录下。

 

三、编写Hama job

在eclipse下新建Java Project,将hama安装时需要的jar包全部导入工程。

 

官网中计算PI的例子:

  1 package pi;
  2 
  3 import java.io.IOException;
  4 import org.apache.commons.logging.Log;
  5 import org.apache.commons.logging.LogFactory;
  6 import org.apache.hadoop.fs.FSDataInputStream;
  7 import org.apache.hadoop.fs.FileStatus;
  8 import org.apache.hadoop.fs.FileSystem;
  9 import org.apache.hadoop.fs.Path;
 10 import org.apache.hadoop.io.DoubleWritable;
 11 import org.apache.hadoop.io.IOUtils;
 12 import org.apache.hadoop.io.NullWritable;
 13 import org.apache.hadoop.io.Text;
 14 import org.apache.hama.HamaConfiguration;
 15 import org.apache.hama.bsp.BSP;
 16 import org.apache.hama.bsp.BSPJob;
 17 import org.apache.hama.bsp.BSPJobClient;
 18 import org.apache.hama.bsp.BSPPeer;
 19 import org.apache.hama.bsp.ClusterStatus;
 20 import org.apache.hama.bsp.FileOutputFormat;
 21 import org.apache.hama.bsp.NullInputFormat;
 22 import org.apache.hama.bsp.TextOutputFormat;
 23 import org.apache.hama.bsp.sync.SyncException;
 24 
 25 public class PiEstimator {
 26     private static Path TMP_OUTPUT = new Path("/tmp/pi-"
 27             + System.currentTimeMillis());
 28 
 29     public static class MyEstimator
 30             extends
 31             BSP<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> {
 32         public static final Log LOG = LogFactory.getLog(MyEstimator.class);
 33         private String masterTask;
 34         private static final int iterations = 100000;
 35 
 36         @Override
 37         public void bsp(
 38                 BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
 39                 throws IOException, SyncException, InterruptedException {
 40 
 41             int in = 0;
 42             for (int i = 0; i < iterations; i++) {
 43                 double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
 44                 if ((Math.sqrt(x * x + y * y) < 1.0)) {
 45                     in++;
 46                 }
 47             }
 48 
 49             double data = 4.0 * in / iterations;
 50 
 51             peer.send(masterTask, new DoubleWritable(data));
 52             peer.sync();
 53 
 54             if (peer.getPeerName().equals(masterTask)) {
 55                 double pi = 0.0;
 56                 int numPeers = peer.getNumCurrentMessages();
 57                 DoubleWritable received;
 58                 while ((received = peer.getCurrentMessage()) != null) {
 59                     pi += received.get();
 60                 }
 61 
 62                 pi = pi / numPeers;
 63                 peer.write(new Text("Estimated value1 of PI is"),
 64                         new DoubleWritable(pi));
 65             }
 66             peer.sync();
 67 
 68             int in2 = 0;
 69             for (int i = 0; i < iterations; i++) {
 70                 double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
 71                 if ((Math.sqrt(x * x + y * y) < 1.0)) {
 72                     in2++;
 73                 }
 74             }
 75 
 76             double data2 = 4.0 * in2 / iterations;
 77 
 78             peer.send(masterTask, new DoubleWritable(data2));
 79             peer.sync();
 80 
 81             if (peer.getPeerName().equals(masterTask)) {
 82                 double pi2 = 0.0;
 83                 int numPeers = peer.getNumCurrentMessages();
 84                 DoubleWritable received;
 85                 while ((received = peer.getCurrentMessage()) != null) {
 86                     pi2 += received.get();
 87                 }
 88 
 89                 pi2 = pi2 / numPeers;
 90                 peer.write(new Text("Estimated value2 of PI is"),
 91                         new DoubleWritable(pi2));
 92             }
 93             peer.sync();
 94 
 95         }
 96 
 97         @Override
 98         public void setup(
 99                 BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
100                 throws IOException {
101             // Choose one as a master
102 
103             this.masterTask = peer.getPeerName(peer.getNumPeers() / 2);
104         }
105 
106         @Override
107         public void cleanup(
108                 BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
109                 throws IOException {
110 
111             // if (peer.getPeerName().equals(masterTask)) {
112             // double pi = 0.0;
113             // int numPeers = peer.getNumCurrentMessages();
114             // DoubleWritable received;
115             // while ((received = peer.getCurrentMessage()) != null) {
116             // pi += received.get();
117             // }
118             //
119             // pi = pi / numPeers;
120             // peer.write(new Text("Estimated value of PI is"),
121             // new DoubleWritable(pi));
122             // }
123         }
124     }
125 
126     static void printOutput(HamaConfiguration conf) throws IOException {
127         FileSystem fs = FileSystem.get(conf);
128         FileStatus[] files = fs.listStatus(TMP_OUTPUT);
129         for (int i = 0; i < files.length; i++) {
130             if (files[i].getLen() > 0) {
131                 FSDataInputStream in = fs.open(files[i].getPath());
132                 IOUtils.copyBytes(in, System.out, conf, false);
133                 in.close();
134                 break;
135             }
136         }
137 
138         fs.delete(TMP_OUTPUT, true);
139     }
140 
141     public static void main(String[] args) throws InterruptedException,
142             IOException, ClassNotFoundException {
143         // BSP job configuration
144         HamaConfiguration conf = new HamaConfiguration();
145         BSPJob bsp = new BSPJob(conf, PiEstimator.class);
146         // Set the job name
147         bsp.setJobName("Pi Estimation Example");
148         bsp.setBspClass(MyEstimator.class);
149         bsp.setInputFormat(NullInputFormat.class);
150         bsp.setOutputKeyClass(Text.class);
151         bsp.setOutputValueClass(DoubleWritable.class);
152         bsp.setOutputFormat(TextOutputFormat.class);
153         FileOutputFormat.setOutputPath(bsp, TMP_OUTPUT);
154 
155         BSPJobClient jobClient = new BSPJobClient(conf);
156         ClusterStatus cluster = jobClient.getClusterStatus(true);
157 
158         if (args.length > 0) {
159             bsp.setNumBspTask(Integer.parseInt(args[0]));
160         } else {
161             // Set to maximum
162             bsp.setNumBspTask(cluster.getMaxTasks());
163         }
164 
165         long startTime = System.currentTimeMillis();
166 
167         if (bsp.waitForCompletion(true)) {
168             printOutput(conf);
169             System.out.println("Job Finished in "
170                     + (System.currentTimeMillis() - startTime) / 1000.0
171                     + " seconds");
172         }
173     }
174 
175 }
View PiEstimator 

相关文章:

  • 2022-12-23
  • 2021-07-01
  • 2021-06-12
  • 2021-12-28
  • 2022-12-23
  • 2023-02-03
  • 2021-12-18
  • 2021-10-19
猜你喜欢
  • 2022-12-23
  • 2021-08-25
  • 2021-08-05
  • 2021-12-19
  • 2021-09-06
  • 2021-11-28
  • 2021-06-14
相关资源
相似解决方案