准备:
确保hadoop2.2.0集群正常运行
1.eclipse中建立maven工程,并编辑pom文件如下
 <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>0.96.2-hadoop2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.7</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
    </dependencies>

2.在src/main/resources根目录下拷入log4j.properties,通过log4j查看详细日志

log4j.rootLogger=debug, stdout, R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=firestorm.log
log4j.appender.R.MaxFileSize=100KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%p %t %c - %m%n
log4j.logger.com.codefutures=DEBUG

3.拷入一个可执行的hadoop程序,我用的是一个HdfsDAO,可以先保证HDFS操作能执行

package com.bigdata.hdfs;

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.JobConf;

/**
 * Thin DAO wrapping common HDFS file operations (ls, mkdir, put, get, cat).
 * Each method opens a {@link FileSystem} handle for {@link #hdfsPath} and
 * closes it before returning. NOTE(review): {@code FileSystem.get} returns a
 * cached, shared instance by default, so closing it here also closes it for
 * any other user of the same URI in this JVM — acceptable for this demo,
 * but confirm before reusing in long-lived services.
 */
public class HdfsDAO {
    /** Default NameNode URI used when the caller supplies only a Configuration. */
    private static final String HDFS = "hdfs://192.168.11.37:9000/";

    /** Base HDFS URI all operations resolve against. */
    private String hdfsPath;
    /** Hadoop configuration used to obtain FileSystem handles. */
    private Configuration conf;

    /** Creates a DAO pointing at the default NameNode ({@link #HDFS}). */
    public HdfsDAO(Configuration conf) {
        this(HDFS, conf);
    }

    /**
     * @param hdfs NameNode URI, e.g. {@code hdfs://host:9000/}
     * @param conf Hadoop configuration (site files already loaded)
     */
    public HdfsDAO(String hdfs, Configuration conf) {
        this.hdfsPath = hdfs;
        this.conf = conf;
    }

    /** Smoke test: list the HDFS root directory. */
    public static void main(String[] args) throws IOException {
        JobConf conf = config();
        HdfsDAO hdfs = new HdfsDAO(conf);
//        hdfs.copyFile("datafile/item.csv", "/tmp/new");
//        hdfs.ls("/tmp/new");
        hdfs.ls("/");
    }

    /** Builds a JobConf preloaded with the cluster's site configuration files. */
    public static JobConf config(){
        JobConf conf = new JobConf(HdfsDAO.class);
        conf.setJobName("HdfsDAO");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

    /** Creates {@code folder} (and missing parents) if it does not already exist. */
    public void mkdirs(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            if (!fs.exists(path)) {
                fs.mkdirs(path);
                System.out.println("Create: " + folder);
            }
        } finally {
            // Close in finally so the handle is not leaked on error.
            fs.close();
        }
    }

    /**
     * Recursively deletes {@code folder}.
     * Fix: the original called {@code fs.deleteOnExit(path)}, which only
     * schedules deletion at JVM shutdown while printing "Delete:" as if it
     * had already happened; {@code delete(path, true)} removes it now.
     */
    public void rmr(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            fs.delete(path, true);
            System.out.println("Delete: " + folder);
        } finally {
            fs.close();
        }
    }

    /** Lists the direct children of {@code folder} with type and size. */
    public void ls(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            FileStatus[] list = fs.listStatus(path);
            System.out.println("ls: " + folder);
            System.out.println("==========================================================");
            for (FileStatus f : list) {
                // isDirectory() replaces the deprecated isDir().
                System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDirectory(), f.getLen());
            }
            System.out.println("==========================================================");
        } finally {
            fs.close();
        }
    }

    /** Creates {@code file} and writes {@code content} to it (platform charset). */
    public void createFile(String file, String content) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            byte[] buff = content.getBytes();
            FSDataOutputStream os = null;
            try {
                os = fs.create(new Path(file));
                os.write(buff, 0, buff.length);
                System.out.println("Create: " + file);
            } finally {
                if (os != null)
                    os.close();
            }
        } finally {
            // Fix: previously fs.close() sat outside any finally and was
            // skipped whenever the write failed.
            fs.close();
        }
    }

    /** Uploads local file {@code local} to HDFS path {@code remote}. */
    public void copyFile(String local, String remote) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            fs.copyFromLocalFile(new Path(local), new Path(remote));
            System.out.println("copy from: " + local + " to " + remote);
        } finally {
            fs.close();
        }
    }

    /** Downloads HDFS path {@code remote} to the local path {@code local}. */
    public void download(String remote, String local) throws IOException {
        Path path = new Path(remote);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            fs.copyToLocalFile(path, new Path(local));
            // Fix: message previously read "download: from/path" (missing space).
            System.out.println("download: from " + remote + " to " + local);
        } finally {
            fs.close();
        }
    }

    /** Streams the contents of {@code remoteFile} to stdout. */
    public void cat(String remoteFile) throws IOException {
        Path path = new Path(remoteFile);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        FSDataInputStream fsdis = null;
        System.out.println("cat: " + remoteFile);
        try {
            fsdis = fs.open(path);
            IOUtils.copyBytes(fsdis, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(fsdis);
            fs.close();
        }
    }

    /** Placeholder: would print the block locations of a file (left disabled). */
    public void location() throws IOException {
        // String folder = hdfsPath + "create/";
        // String file = "t2.txt";
        // FileSystem fs = FileSystem.get(URI.create(hdfsPath), new
        // Configuration());
        // FileStatus f = fs.getFileStatus(new Path(folder + file));
        // BlockLocation[] list = fs.getFileBlockLocations(f, 0, f.getLen());
        //
        // System.out.println("File Location: " + folder + file);
        // for (BlockLocation bl : list) {
        // String[] hosts = bl.getHosts();
        // for (String host : hosts) {
        // System.out.println("host:" + host);
        // }
        // }
        // fs.close();
    }

}

4.运行HdfsDAO

报错:
java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
    at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:225)
    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:250)
    at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
    at org.apache.hadoop.conf.Configuration.getTrimmedStrings(Configuration.java:1546)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:519)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:453)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:136)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
    at HdfsDAO.copyFile(HdfsDAO.java:94)
    at HdfsDAO.main(HdfsDAO.java:34)
ERROR - Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
    at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:278)
    at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:300)
    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:293)
    at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
    at org.apache.hadoop.conf.Configuration.getTrimmedStrings(Configuration.java:1546)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:519)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:453)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:136)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
    at HdfsDAO.copyFile(HdfsDAO.java:94)
    at HdfsDAO.main(HdfsDAO.java:34)

  

解决:
首先,在win7中设置环境变量HADOOP_HOME,指向win7中的hadoop2.2.0根目录。
然后,到 https://github.com/srccodes/hadoop-common-2.2.0-bin 去下载hadoop2.2.0的bin,里面有winutils.exe
将其拷贝到 $HADOOP_HOME/bin 下。
5.重新启动,顺利执行
DEBUG - field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess with annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=, value=[Rate of successful kerberos logins and latency (milliseconds)], always=false, type=DEFAULT, sampleName=Ops)
DEBUG - field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure with annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=, value=[Rate of failed kerberos logins and latency (milliseconds)], always=false, type=DEFAULT, sampleName=Ops)
DEBUG - UgiMetrics, User and group related metrics
DEBUG - Kerberos krb5 configuration not found, setting default realm to empty
DEBUG -  Creating new Groups object
DEBUG - Trying to load the custom-built native-hadoop library...
DEBUG - Failed to load native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
DEBUG - java.library.path=D:\Program Files\Java\jre7\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;C:\Program Files (x86)\NVIDIA Corporation\PhysX\Common;C:\Program Files (x86)\Intel\iCLS Client\;C:\Program Files\Intel\iCLS Client\;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;C:\Program Files\Intel\Intel(R) Management Engine Components\DAL;C:\Program Files\Intel\Intel(R) Management Engine Components\IPT;C:\Program Files (x86)\Intel\Intel(R) Management Engine Components\DAL;C:\Program Files (x86)\Intel\Intel(R) Management Engine Components\IPT;C:\Program Files (x86)\Intel\OpenCL SDK\3.0\bin\x86;C:\Program Files (x86)\Intel\OpenCL SDK\3.0\bin\x64;D:\Program Files\Java\jdk1.7.0_40\bin;D:\Program Files\Java\jdk1.7.0_40\jre\bin;D:\Program Files\TortoiseSVN\bin;D:\Program Files (x86)\ant\bin;D:\Program Files\maven3\bin;.
 WARN - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
DEBUG - Falling back to shell based
DEBUG - Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping
DEBUG - Group mapping impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback; cacheTimeout=300000
DEBUG - hadoop login
DEBUG - hadoop login commit
DEBUG - using local user:NTUserPrincipal: Administrator
DEBUG - UGI loginUser:Administrator (auth:SIMPLE)
DEBUG - dfs.client.use.legacy.blockreader.local = false
DEBUG - dfs.client.read.shortcircuit = false
DEBUG - dfs.client.domain.socket.data.traffic = false
DEBUG - dfs.domain.socket.path = 
DEBUG - StartupProgress, NameNode startup progress
DEBUG - multipleLinearRandomRetry = null
DEBUG - rpcKind=RPC_PROTOCOL_BUFFER, rpcRequestWrapperClass=class org.apache.hadoop.ipc.ProtobufRpcEngine$RpcRequestWrapper, rpcInvoker=org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker@1afde4a3
DEBUG - Both short-circuit local reads and UNIX domain socket are disabled.
DEBUG - The ping interval is 60000 ms.
DEBUG - Connecting to /192.168.0.160:8020
DEBUG - IPC Client (60133785) connection to /192.168.0.160:8020 from Administrator: starting, having connections 1
DEBUG - IPC Client (60133785) connection to /192.168.0.160:8020 from Administrator sending #0
DEBUG - IPC Client (60133785) connection to /192.168.0.160:8020 from Administrator got value #0
DEBUG - Call: getListing took 136ms
ls: /
==========================================================
name: hdfs://192.168.0.160:8020/data, folder: true, size: 0
name: hdfs://192.168.0.160:8020/fulong, folder: true, size: 0
name: hdfs://192.168.0.160:8020/test, folder: true, size: 0
name: hdfs://192.168.0.160:8020/tmp, folder: true, size: 0
name: hdfs://192.168.0.160:8020/user, folder: true, size: 0
name: hdfs://192.168.0.160:8020/workspace, folder: true, size: 0
==========================================================
DEBUG - Stopping client
DEBUG - IPC Client (60133785) connection to /192.168.0.160:8020 from Administrator: closed
DEBUG - IPC Client (60133785) connection to /192.168.0.160:8020 from Administrator: stopped, remaining connections 0
View Code

6.测试hbase代码

package com.rockontrol.tryhbase;
import static org.junit.Assert.*;

import java.io.IOException;
import java.io.InputStream;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import org.junit.Test;

/**
 * Integration tests for basic HBase usage: table creation, concurrent puts,
 * full and prefix scans, and the row-key byte layout.
 *
 * <p>Row keys are 9 bytes: an 8-byte auth id followed by a 1-byte status,
 * so rows sort by auth id first, then status.
 *
 * <p>Requires a reachable cluster; connection settings are read from
 * {@code hbase-site.xml} on the classpath.
 */
public class TestUseHbase {

   /** Table under test. */
   private String table = "Tenant";
   /** Column families; a single short name keeps per-KeyValue overhead low. */
   private String cfs[] = {"i"};
   private final int availableProcessors =
         Runtime.getRuntime().availableProcessors();
   /** Worker pool used by {@link #testPuts()} for concurrent inserts. */
   private ExecutorService exec =
         Executors.newFixedThreadPool(availableProcessors*2);
   private Random rnd = new Random();
   /** Row-key length: 8-byte auth id + 1-byte status. */
   private final int ROW_KEY_LEN = Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE;
   private final String colId = "id";
   private final String colStat = "stat";
   private final String colCert = "cert";

   private Configuration conf;
   private HTablePool pool;

   private static final Logger logger =
         Logger.getLogger(TestUseHbase.class);

   /** Loads hbase-site.xml from the classpath and builds the table pool. */
   public TestUseHbase() throws Exception {
      conf = new Configuration();
      conf.addResource(getHbaseConfStream());
      pool = new HTablePool(conf, 1000);
   }

   /** Creates the test table with all column families unless it already exists. */
   @Test
   public void testSetupTable() throws Exception {

      HBaseAdmin admin = new HBaseAdmin(conf);

      try {
         if (admin.tableExists(table)) {
            logger.info("table already exists!");
         } else {
            HTableDescriptor tableDesc =new HTableDescriptor(table);
            for(String cf : cfs) {
               tableDesc.addFamily(new HColumnDescriptor(cf));
            }
            admin.createTable(tableDesc);
            logger.info("table created!");
         }
      } finally {
         admin.close();
      }
   }

   /** Inserts 10 rows with random auth ids from multiple threads. */
   @Test
   public void testPuts() throws Exception {

      final HTable htable = (HTable) pool.getTable(table);
      // Fix: HTable is not thread-safe, so configure auto-flush once here
      // instead of calling setAutoFlush(false) from every worker runnable.
      htable.setAutoFlush(false);
      // put random id
      for (int i = 0; i < 10; i++) {
         exec.execute(new Runnable() {
            @Override
            public void run() {
               long authId = getAuthId();
               byte[] rowkey = createRowKey(authId, (byte) 0);
               Put put = new Put(rowkey);
               put.add(cfs[0].getBytes(), colId.getBytes(), String.valueOf(authId)
                     .getBytes());
               put.add(cfs[0].getBytes(), colStat.getBytes(), String.valueOf(0)
                     .getBytes());
               try {
                  // Serialize access to the shared, non-thread-safe HTable.
                  synchronized (htable) {
                     htable.put(put);
                     htable.flushCommits();
                  }
               } catch (IOException e) {
                  logger.error("ERROR: insert authId=" + authId, e);
               }
            }
         });
      }
      exec.shutdown();

      // Wait up to ~40s for the workers; bail out rather than hang forever.
      int count = 0;
      while (!exec.awaitTermination(10, TimeUnit.SECONDS)) {
         logger.warn("thread pool is still running");
         if (count++ > 3) {
            logger.warn("force to exit anyway...");
            break;
         }
      }

      htable.flushCommits();
      pool.putTable(htable);

   }

   /** Scans the whole table and asserts rows come back sorted by auth id. */
   @Test
   public void testFullScan() throws Exception {

      HTable htable = (HTable) pool.getTable(table);
      long last = Long.MIN_VALUE;

      ResultScanner rs = htable.getScanner(new Scan());
      long authId = 0;
      byte stat = 0;
      String strAuthId;
      String strStat;
      for (Result r : rs) {

         KeyValue kvId = r.getColumnLatest(cfs[0].getBytes(), colId.getBytes());
         KeyValue kvStat = r.getColumnLatest(cfs[0].getBytes(), colStat.getBytes());
         if (kvId != null && kvStat != null) {
            strAuthId = new String(kvId.getValue());
            strStat = new String(kvStat.getValue());
            authId = getIdByRowKey(kvId.getKey());
            stat = getStatByRowKey(kvId.getKey());
            assertTrue("last=" + last +
                  ", current=" + authId, authId >= last); // incremental sorted
            last = authId;
            logger.info("authId=" + authId + ", stat=" + stat + ", value=[" + strAuthId
                  + ", " + strStat + "]");
         } else {
            // Row lacks the id/stat columns (e.g. cert-only rows): fall back
            // to decoding every cell's key.
            for (KeyValue kv : r.raw()) {
               authId = getIdByRowKey(kv.getKey());
               stat = getStatByRowKey(kv.getKey());
               assertTrue("last=" + last +
                     ", current=" + authId, authId >= last); // incremental sort
               last = authId;
               logger.info("authId=" + authId + ", stat=" + stat);
               logger.info(new String(kv.getValue()));
            }
         }
      }

   }

   /** Exercises point Get, prefix-filtered scan, and start/stop-row scan. */
   @Test
   public void testSpecScan() throws Exception {
      HTable htable = (HTable) pool.getTable(table);
      long specId = getAuthId();
      byte[] rowkey = createRowKey(specId, (byte) 0);

      // PUT
      Put put = new Put(rowkey);
      put.add(cfs[0].getBytes(), colId.getBytes(), String.valueOf(specId)
            .getBytes());
      put.add(cfs[0].getBytes(), colStat.getBytes(), String.valueOf(0)
            .getBytes());
      htable.put(put);

      // Get with rowkey
      Get scan = new Get(rowkey);
      Result r = htable.get(scan);
      assertTrue(!r.isEmpty());
      long id = 0;
      for(KeyValue kv : r.raw()) {
         id = getIdByRowKey(kv.getKey());
         assertEquals(specId, id);
         logger.info("authId=" + id +
               ", cf=" + new String(kv.getFamily()) +
               ", key=" + new String(kv.getQualifier()) +
               ", value=" + new String(kv.getValue()));
      }

      // Put with specId but stat and different column
      rowkey = createRowKey(specId, (byte)1);
      put = new Put(rowkey);
      put.add(cfs[0].getBytes(), colCert.getBytes(), "xyz".getBytes());
      htable.put(put);

      // Get with rowkey prefix: must see both stat=0 and stat=1 rows.
      Scan s = new Scan();
      s.setFilter(new PrefixFilter(createRowKeyPrefix(specId)));
      ResultScanner rs = htable.getScanner(s);
      for(Result ret : rs) {
         String strk = new String(ret.getRow());
         logger.info("ret=" + strk);
         for(KeyValue kv : ret.raw()) {
            id = getIdByRowKey(kv.getKey());
            assertEquals(specId, id);
            logger.info("authId=" + id +
                  ", stat=" + getStatByRowKey(kv.getKey()) +
                  ", cf=" + new String(kv.getFamily()) +
                  ", key=" + new String(kv.getQualifier()) +
                  ", value=" + new String(kv.getValue()));
         }
      }

      // Get with start and end row: [specId, specId+1) covers all stats of specId.
      s = new Scan();
      s.setStartRow(createRowKeyPrefix(specId));
      s.setStopRow(createRowKeyPrefix(specId+1));
      rs = htable.getScanner(s);
      for(Result ret : rs) {
         String strk = new String(ret.getRow());
         logger.info("ret=" + strk);
         for(KeyValue kv : ret.raw()) {
            id = getIdByRowKey(kv.getKey());
            assertEquals(specId, id);
            logger.info("authId=" + id +
                  ", stat=" + getStatByRowKey(kv.getKey()) +
                  ", cf=" + new String(kv.getFamily()) +
                  ", key=" + new String(kv.getQualifier()) +
                  ", value=" + new String(kv.getValue()));
         }
      }
   }

   /** Round-trips a long + byte through the Bytes putters/getters. */
   @Test
   public void testBytesConv() throws Exception {
      long a = 120;
      byte s = 0;
      byte[] data = new byte[9];
      int off = Bytes.putLong(data, 0, a);
      Bytes.putByte(data, off, s);
      long b = Bytes.toLong(data);
      byte t = data[8];
      assertEquals(a, b);
      assertEquals(s, t);
   }

   /** Encodes (authId, stat) into the 9-byte row key. */
   private byte[] createRowKey(long authId, byte stat) {
      byte[] rowkey = new byte[ROW_KEY_LEN];
      int off = Bytes.putLong(rowkey, 0, authId);
      Bytes.putByte(rowkey, off, stat);
      return rowkey;
   }

   /** 8-byte prefix (auth id only) used for prefix scans across all stats. */
   private byte[] createRowKeyPrefix(long authId) {
      byte[] prefix = new byte[Bytes.SIZEOF_LONG];
      Bytes.putLong(prefix, 0, authId);
      return prefix;
   }

   /**
    * Extracts the auth id from a KeyValue key buffer.
    * HACK: skips SIZEOF_SHORT bytes — presumably the 2-byte row-length
    * prefix of KeyValue.getKey(); confirm against the KeyValue wire format.
    */
   private long getIdByRowKey(byte[] rowkey) {
      return Bytes.toLong(rowkey, Bytes.SIZEOF_SHORT);
   }

   /**
    * Extracts the status byte (last byte of the 9-byte row key) from a
    * KeyValue key buffer. Same row-length-prefix HACK as getIdByRowKey.
    */
   private byte getStatByRowKey(byte[] rowkey) {
      return rowkey[Bytes.SIZEOF_SHORT + ROW_KEY_LEN - 1];
   }

   /**
    * Returns a non-negative random auth id.
    * Fix: the original negated negative values, but -Long.MIN_VALUE
    * overflows back to Long.MIN_VALUE; masking the sign bit is always
    * non-negative.
    */
   private long getAuthId() {
      return rnd.nextLong() & Long.MAX_VALUE;
   }

   /** Loads hbase-site.xml from the test classpath. */
   private static InputStream getHbaseConfStream() throws Exception {
      return TestUseHbase.class.getClassLoader().getResourceAsStream("hbase-site.xml");
   }

}
View Code

相关文章:

  • 2022-02-08
  • 2021-08-30
  • 2021-08-01
  • 2022-02-10
  • 2021-10-27
  • 2021-06-02
  • 2021-09-04
猜你喜欢
  • 2021-12-10
  • 2021-12-31
  • 2022-02-24
  • 2021-06-16
  • 2021-03-31
  • 2021-07-19
相关资源
相似解决方案