准备:
确保hadoop2.2.0集群正常运行
1.eclipse中建立maven工程,并编辑pom文件如下
<!-- Dependencies for an HBase 0.96 client talking to a Hadoop 2.2.0 cluster. -->
<dependencies>
<dependency>
<!-- HBase client API, the build variant compiled against Hadoop 2. -->
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>0.96.2-hadoop2</version>
</dependency>
<dependency>
<!-- HDFS client classes; version must match the running cluster (2.2.0). -->
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<!-- System-scoped tools.jar from the local JDK, resolved via JAVA_HOME.
     NOTE(review): presumably needed because Hadoop 2.x classes reference
     com.sun tool classes on JDK 7 — confirm if the build fails without it. -->
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.7</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
</dependencies>
2.在src/main/resources根目录下拷入log4j.properties,通过log4j查看详细日志
log4j.rootLogger=debug, stdout, R log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n log4j.appender.R=org.apache.log4j.RollingFileAppender log4j.appender.R.File=firestorm.log log4j.appender.R.MaxFileSize=100KB log4j.appender.R.MaxBackupIndex=1 log4j.appender.R.layout=org.apache.log4j.PatternLayout log4j.appender.R.layout.ConversionPattern=%p %t %c - %m%n log4j.logger.com.codefutures=DEBUG
3.拷入一个可执行的hadoop程序,我用的是一个HdfsDAO,可以先保证HDFS操作能执行
package com.bigdata.hdfs; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapred.JobConf; public class HdfsDAO { private static final String HDFS = "hdfs://192.168.11.37:9000/"; public HdfsDAO(Configuration conf) { this(HDFS, conf); } public HdfsDAO(String hdfs, Configuration conf) { this.hdfsPath = hdfs; this.conf = conf; } private String hdfsPath; private Configuration conf; public static void main(String[] args) throws IOException { JobConf conf = config(); HdfsDAO hdfs = new HdfsDAO(conf); // hdfs.copyFile("datafile/item.csv", "/tmp/new"); // hdfs.ls("/tmp/new"); hdfs.ls("/"); } public static JobConf config(){ JobConf conf = new JobConf(HdfsDAO.class); conf.setJobName("HdfsDAO"); conf.addResource("classpath:/hadoop/core-site.xml"); conf.addResource("classpath:/hadoop/hdfs-site.xml"); conf.addResource("classpath:/hadoop/mapred-site.xml"); return conf; } public void mkdirs(String folder) throws IOException { Path path = new Path(folder); FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); if (!fs.exists(path)) { fs.mkdirs(path); System.out.println("Create: " + folder); } fs.close(); } public void rmr(String folder) throws IOException { Path path = new Path(folder); FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); fs.deleteOnExit(path); System.out.println("Delete: " + folder); fs.close(); } public void ls(String folder) throws IOException { Path path = new Path(folder); FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); FileStatus[] list = fs.listStatus(path); System.out.println("ls: " + folder); System.out.println("=========================================================="); for (FileStatus f : list) { 
System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDir(), f.getLen()); } System.out.println("=========================================================="); fs.close(); } public void createFile(String file, String content) throws IOException { FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); byte[] buff = content.getBytes(); FSDataOutputStream os = null; try { os = fs.create(new Path(file)); os.write(buff, 0, buff.length); System.out.println("Create: " + file); } finally { if (os != null) os.close(); } fs.close(); } public void copyFile(String local, String remote) throws IOException { FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); fs.copyFromLocalFile(new Path(local), new Path(remote)); System.out.println("copy from: " + local + " to " + remote); fs.close(); } public void download(String remote, String local) throws IOException { Path path = new Path(remote); FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); fs.copyToLocalFile(path, new Path(local)); System.out.println("download: from" + remote + " to " + local); fs.close(); } public void cat(String remoteFile) throws IOException { Path path = new Path(remoteFile); FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); FSDataInputStream fsdis = null; System.out.println("cat: " + remoteFile); try { fsdis =fs.open(path); IOUtils.copyBytes(fsdis, System.out, 4096, false); } finally { IOUtils.closeStream(fsdis); fs.close(); } } public void location() throws IOException { // String folder = hdfsPath + "create/"; // String file = "t2.txt"; // FileSystem fs = FileSystem.get(URI.create(hdfsPath), new // Configuration()); // FileStatus f = fs.getFileStatus(new Path(folder + file)); // BlockLocation[] list = fs.getFileBlockLocations(f, 0, f.getLen()); // // System.out.println("File Location: " + folder + file); // for (BlockLocation bl : list) { // String[] hosts = bl.getHosts(); // for (String host : hosts) { // System.out.println("host:" + host); // } // } // 
fs.close(); } }
4.运行HdfsDAO
报错:
java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:225)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:250)
at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
at org.apache.hadoop.conf.Configuration.getTrimmedStrings(Configuration.java:1546)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:519)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:453)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:136)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
at HdfsDAO.copyFile(HdfsDAO.java:94)
at HdfsDAO.main(HdfsDAO.java:34)
ERROR - Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:278)
at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:300)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:293)
at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
at org.apache.hadoop.conf.Configuration.getTrimmedStrings(Configuration.java:1546)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:519)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:453)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:136)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
at HdfsDAO.copyFile(HdfsDAO.java:94)
at HdfsDAO.main(HdfsDAO.java:34)
解决:
首先,在win7中设置环境变量HADOOP_HOME,指向win7中的hadoop2.2.0根目录。
然后,到 https://github.com/srccodes/hadoop-common-2.2.0-bin 去下载hadoop2.2.0的bin,里面有winutils.exe
将其拷贝到 $HADOOP_HOME/bin 下。
5.重新启动,顺利执行
DEBUG - field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess with annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=, value=[Rate of successful kerberos logins and latency (milliseconds)], always=false, type=DEFAULT, sampleName=Ops) DEBUG - field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure with annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=, value=[Rate of failed kerberos logins and latency (milliseconds)], always=false, type=DEFAULT, sampleName=Ops) DEBUG - UgiMetrics, User and group related metrics DEBUG - Kerberos krb5 configuration not found, setting default realm to empty DEBUG - Creating new Groups object DEBUG - Trying to load the custom-built native-hadoop library... DEBUG - Failed to load native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path DEBUG - java.library.path=D:\Program Files\Java\jre7\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;C:\Program Files (x86)\NVIDIA Corporation\PhysX\Common;C:\Program Files (x86)\Intel\iCLS Client\;C:\Program Files\Intel\iCLS Client\;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;C:\Program Files\Intel\Intel(R) Management Engine Components\DAL;C:\Program Files\Intel\Intel(R) Management Engine Components\IPT;C:\Program Files (x86)\Intel\Intel(R) Management Engine Components\DAL;C:\Program Files (x86)\Intel\Intel(R) Management Engine Components\IPT;C:\Program Files (x86)\Intel\OpenCL SDK\3.0\bin\x86;C:\Program Files (x86)\Intel\OpenCL SDK\3.0\bin\x64;D:\Program Files\Java\jdk1.7.0_40\bin;D:\Program Files\Java\jdk1.7.0_40\jre\bin;D:\Program Files\TortoiseSVN\bin;D:\Program Files (x86)\ant\bin;D:\Program Files\maven3\bin;. WARN - Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable DEBUG - Falling back to shell based DEBUG - Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping DEBUG - Group mapping impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback; cacheTimeout=300000 DEBUG - hadoop login DEBUG - hadoop login commit DEBUG - using local user:NTUserPrincipal: Administrator DEBUG - UGI loginUser:Administrator (auth:SIMPLE) DEBUG - dfs.client.use.legacy.blockreader.local = false DEBUG - dfs.client.read.shortcircuit = false DEBUG - dfs.client.domain.socket.data.traffic = false DEBUG - dfs.domain.socket.path = DEBUG - StartupProgress, NameNode startup progress DEBUG - multipleLinearRandomRetry = null DEBUG - rpcKind=RPC_PROTOCOL_BUFFER, rpcRequestWrapperClass=class org.apache.hadoop.ipc.ProtobufRpcEngine$RpcRequestWrapper, rpcInvoker=org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker@1afde4a3 DEBUG - Both short-circuit local reads and UNIX domain socket are disabled. DEBUG - The ping interval is 60000 ms. 
DEBUG - Connecting to /192.168.0.160:8020 DEBUG - IPC Client (60133785) connection to /192.168.0.160:8020 from Administrator: starting, having connections 1 DEBUG - IPC Client (60133785) connection to /192.168.0.160:8020 from Administrator sending #0 DEBUG - IPC Client (60133785) connection to /192.168.0.160:8020 from Administrator got value #0 DEBUG - Call: getListing took 136ms ls: / ========================================================== name: hdfs://192.168.0.160:8020/data, folder: true, size: 0 name: hdfs://192.168.0.160:8020/fulong, folder: true, size: 0 name: hdfs://192.168.0.160:8020/test, folder: true, size: 0 name: hdfs://192.168.0.160:8020/tmp, folder: true, size: 0 name: hdfs://192.168.0.160:8020/user, folder: true, size: 0 name: hdfs://192.168.0.160:8020/workspace, folder: true, size: 0 ========================================================== DEBUG - Stopping client DEBUG - IPC Client (60133785) connection to /192.168.0.160:8020 from Administrator: closed DEBUG - IPC Client (60133785) connection to /192.168.0.160:8020 from Administrator: stopped, remaining connections 0
6.测试hbase代码
package com.rockontrol.tryhbase;

import static org.junit.Assert.*;
import java.io.IOException;
import java.io.InputStream;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import org.junit.Test;

/**
 * Exercises a live HBase 0.96 cluster through the "Tenant" table:
 * table creation, concurrent puts, full scans, prefix/range scans, and the
 * byte-level rowkey encoding used throughout (8-byte long id + 1 status byte).
 *
 * NOTE(review): requires an hbase-site.xml on the classpath and a reachable
 * cluster; these are integration tests, not unit tests.
 */
public class TestUseHbase {

    // Target table and its single column family "i".
    private String table = "Tenant";
    private String cfs[] = {"i"};

    // Thread pool used by testPuts; sized at 2x available cores.
    private final int availableProcessors =
            Runtime.getRuntime().availableProcessors();
    private ExecutorService exec =
            Executors.newFixedThreadPool(availableProcessors*2);
    private Random rnd = new Random();

    // Rowkey layout: long auth id (8 bytes) followed by one status byte.
    private final int ROW_KEY_LEN = Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE;
    // Column qualifiers within family "i".
    private final String colId = "id";
    private final String colStat = "stat";
    private final String colCert = "cert";

    private Configuration conf;
    private HTablePool pool;

    private static final Logger logger =
            Logger.getLogger(TestUseHbase.class);

    /** Loads hbase-site.xml from the classpath and opens a table pool. */
    public TestUseHbase() throws Exception {
        conf = new Configuration();
        conf.addResource(getHbaseConfStream());
        pool = new HTablePool(conf, 1000);
    }

    /** Creates the table with its column families unless it already exists. */
    @Test
    public void testSetupTable() throws Exception {
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            if (admin.tableExists(table)) {
                logger.info("table already exists!");
            } else {
                HTableDescriptor tableDesc =new HTableDescriptor(table);
                for(String cf : cfs) {
                    tableDesc.addFamily(new HColumnDescriptor(cf));
                }
                admin.createTable(tableDesc);
                logger.info("table created!");
            }
        } finally {
            admin.close();
        }
    }

    /**
     * Inserts 10 rows with random ids from a thread pool, then waits for the
     * pool to drain (up to ~40s) before returning the table to the pool.
     * NOTE(review): HTable is not thread-safe; the shared instance is guarded
     * by synchronized(htable) around put/flush, but setAutoFlush(false) is
     * called concurrently from every task — verify this is benign.
     */
    @Test
    public void testPuts() throws Exception {
        final HTable htable = (HTable) pool.getTable(table);
        // put random id
        for (int i = 0; i < 10; i++) {
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    long authId = getAuthId();
                    // stat byte fixed at 0 for these rows
                    byte[] rowkey = createRowKey(authId, (byte) 0);
                    htable.setAutoFlush(false);
                    Put put = new Put(rowkey);
                    put.add(cfs[0].getBytes(), colId.getBytes(), String.valueOf(authId)
                            .getBytes());
                    put.add(cfs[0].getBytes(), colStat.getBytes(), String.valueOf(0)
                            .getBytes());
                    try {
                        synchronized (htable) {
                            htable.put(put);
                            htable.flushCommits();
                        }
                    } catch (IOException e) {
                        logger.error("ERROR: insert authId=" + authId, e);
                    }
                }
            });
        }
        exec.shutdown();
        int count = 0;
        while (!exec.awaitTermination(10, TimeUnit.SECONDS)) {
            logger.warn("thread pool is still running");
            if (count++ > 3) {
                logger.warn("force to exit anyway...");
                break;
            }
        }
        htable.flushCommits();
        pool.putTable(htable);
    }

    /**
     * Scans the whole table and asserts rows come back in ascending rowkey
     * (auth id) order, logging each row's decoded id/stat and cell values.
     */
    @Test
    public void testFullScan() throws Exception {
        HTable htable = (HTable) pool.getTable(table);
        long last = Long.MIN_VALUE;
        ResultScanner rs = htable.getScanner(new Scan());
        long authId = 0;
        byte stat = 0;
        String strAuthId;
        String strStat;
        for (Result r : rs) {
            KeyValue kvId = r.getColumnLatest(cfs[0].getBytes(), colId.getBytes());
            KeyValue kvStat = r.getColumnLatest(cfs[0].getBytes(), colStat.getBytes());
            if (kvId != null && kvStat != null) {
                strAuthId = new String(kvId.getValue());
                strStat = new String(kvStat.getValue());
                authId = getIdByRowKey(kvId.getKey());
                stat = getStatByRowKey(kvId.getKey());
                assertTrue("last=" + last +
                        ", current=" + authId, authId >= last); // incremental sorted
                last = authId;
                logger.info("authId=" + authId + ", stat=" + stat + ", value=[" + strAuthId +
                        ", " + strStat + "]");
            } else {
                // Row lacks id/stat columns (e.g. cert-only rows) — decode
                // from whatever cells are present.
                for (KeyValue kv : r.raw()) {
                    authId = getIdByRowKey(kv.getKey());
                    stat = getStatByRowKey(kv.getKey());
                    assertTrue("last=" + last +
                            ", current=" + authId, authId >= last); // incremental sort
                    last = authId;
                    logger.info("authId=" + authId + ", stat=" + stat);
                    logger.info(new String(kv.getValue()));
                }
            }
        }
    }

    /**
     * End-to-end rowkey addressing: put one row, Get it by exact rowkey,
     * add a second row sharing the 8-byte id prefix but a different stat
     * byte, then read both back via a PrefixFilter scan and a
     * start/stop-row range scan on the id prefix.
     */
    @Test
    public void testSpecScan() throws Exception {
        HTable htable = (HTable) pool.getTable(table);
        long specId = getAuthId();
        byte[] rowkey = createRowKey(specId, (byte) 0);

        // PUT
        Put put = new Put(rowkey);
        put.add(cfs[0].getBytes(), colId.getBytes(), String.valueOf(specId)
                .getBytes());
        put.add(cfs[0].getBytes(), colStat.getBytes(), String.valueOf(0)
                .getBytes());
        htable.put(put);

        // Get with rowkey
        Get scan = new Get(rowkey);
        Result r = htable.get(scan);
        assertTrue(!r.isEmpty());
        long id = 0;
        for(KeyValue kv : r.raw()) {
            id = getIdByRowKey(kv.getKey());
            assertEquals(specId, id);
            logger.info("authId=" + id +
                    ", cf=" + new String(kv.getFamily()) +
                    ", key=" + new String(kv.getQualifier()) +
                    ", value=" + new String(kv.getValue()));
        }

        // Put with specId but stat and different column
        rowkey = createRowKey(specId, (byte)1);
        put = new Put(rowkey);
        put.add(cfs[0].getBytes(), colCert.getBytes(), "xyz".getBytes());
        htable.put(put);

        // Get with rowkey prefix
        Scan s = new Scan();
        s.setFilter(new PrefixFilter(createRowKeyPrefix(specId)));
        ResultScanner rs = htable.getScanner(s);
        for(Result ret : rs) {
            String strk = new String(ret.getRow());
            logger.info("ret=" + strk);
            for(KeyValue kv : ret.raw()) {
                id = getIdByRowKey(kv.getKey());
                assertEquals(specId, id);
                logger.info("authId=" + id +
                        ", stat=" + getStatByRowKey(kv.getKey()) +
                        ", cf=" + new String(kv.getFamily()) +
                        ", key=" + new String(kv.getQualifier()) +
                        ", value=" + new String(kv.getValue()));
            }
        }

        // Get with start and end row
        s = new Scan();
        s.setStartRow(createRowKeyPrefix(specId));
        s.setStopRow(createRowKeyPrefix(specId+1));
        rs = htable.getScanner(s);
        for(Result ret : rs) {
            String strk = new String(ret.getRow());
            logger.info("ret=" + strk);
            for(KeyValue kv : ret.raw()) {
                id = getIdByRowKey(kv.getKey());
                assertEquals(specId, id);
                logger.info("authId=" + id +
                        ", stat=" + getStatByRowKey(kv.getKey()) +
                        ", cf=" + new String(kv.getFamily()) +
                        ", key=" + new String(kv.getQualifier()) +
                        ", value=" + new String(kv.getValue()));
            }
        }
    }

    /** Sanity-checks Bytes.putLong/putByte/toLong round-tripping the rowkey layout. */
    @Test
    public void testBytesConv() throws Exception {
        long a = 120;
        byte s = 0;
        byte[] data = new byte[9];
        int off = Bytes.putLong(data, 0, a);
        Bytes.putByte(data, off, s);
        long b = Bytes.toLong(data);
        byte t = data[8];
        assertEquals(a, b);
        assertEquals(s, t);
    }

    /** Encodes [8-byte big-endian authId][1 stat byte] into a fresh rowkey. */
    private byte[] createRowKey(long authId, byte stat) {
        byte[] rowkey = new byte[ROW_KEY_LEN];
        int off = Bytes.putLong(rowkey, 0, authId);
        Bytes.putByte(rowkey, off, stat);
        return rowkey;
    }

    /** 8-byte id-only prefix, used for PrefixFilter and start/stop-row scans. */
    private byte[] createRowKeyPrefix(long authId) {
        byte[] prefix = new byte[Bytes.SIZEOF_LONG];
        Bytes.putLong(prefix, 0, authId);
        return prefix;
    }

    /**
     * Decodes the auth id from a KeyValue.getKey() buffer.
     * HACK: skips SIZEOF_SHORT bytes first — presumably the 2-byte
     * row-length prefix of the serialized KeyValue key, so the long row id
     * starts at offset 2. TODO(review): confirm against the KeyValue layout.
     */
    private long getIdByRowKey(byte[] rowkey) { // HACK
        return Bytes.toLong(rowkey, Bytes.SIZEOF_SHORT);
    }

    /**
     * Decodes the stat byte from a KeyValue.getKey() buffer.
     * HACK: same 2-byte row-length-prefix assumption as getIdByRowKey —
     * the stat byte sits at offset 2 + 8. TODO(review): confirm.
     */
    private byte getStatByRowKey(byte[] rowkey) { // HACK
        return rowkey[Bytes.SIZEOF_SHORT + ROW_KEY_LEN - 1];
    }

    /** Random non-negative id. NOTE(review): negates Long.MIN_VALUE to itself. */
    private long getAuthId() {
        long authId = rnd.nextLong();
        authId = authId > 0 ? authId : -authId;
        return authId;
    }

    /** hbase-site.xml as a classpath resource stream (null if absent). */
    private static InputStream getHbaseConfStream() throws Exception {
        return TestUseHbase.class.getClassLoader().getResourceAsStream("hbase-site.xml");
    }
}