1、读取文件:
1、客户端首先创建DistributedFileSystem对象
2、向NameNode发出下载请求
3、NameNode根据举例优先选择合适的节点以及数据块信息响应给客户端
4、客户端根据响应创建输出流下载数据,此时会先读取nn1数据,读取结束如果数据不够再建立下一个通道读取下一个节点数据,直到读取完毕。
案例:从hdfs上截取方式下载数据
@Test
//文件截取,下载块文件(比如查日志的时候只查看最后一块)
//查第一块
public void readFileSeek1() throws URISyntaxException, IOException, InterruptedException {
//获取对象
Configuration conf = new Configuration ();
FileSystem fs = FileSystem.get (new URI ("hdfs://hadoop01:9000"), conf, "root");
//获取输入流
FSDataInputStream fis = fs.open (new Path ("/hadoop-2.9.2.tar.gz"));
//获取输出流
FileOutputStream fos = new FileOutputStream (new File ("D:/hadoop-2.9.2.part1"));
//文件拷贝(此时只拷128M)
byte[] bytes = new byte[1024];
for (int i=0;i<1024*128;i++){
fis.read(bytes);
fos.write (bytes);
}
//关闭资源
IOUtils.closeStream (fos);
IOUtils.closeStream (fis);
fs.close ();
}
//下载最后一块
@Test
public void readFileSeek2() throws URISyntaxException, IOException, InterruptedException {
//获取对象
Configuration conf = new Configuration ();
FileSystem fs = FileSystem.get (new URI ("hdfs://hadoop01:9000"), conf, "root");
//获取输入流
FSDataInputStream fis = fs.open (new Path ("/hadoop-2.9.2.tar.gz"));
//设置指定读取的节点
fis.seek (1024*1024*256);
//获取输出流
FileOutputStream fos = new FileOutputStream (new File ("D:/hadoop-2.9.2.part1"));
//流拷贝
IOUtils.copyBytes (fis,fos,conf);
//关闭资源
IOUtils.closeStream (fos);
IOUtils.closeStream (fis);
fs.close ();
}
}
写入文件:
1、客户端首先会创建一个DistributedFileSystem的实例对象
2、然后向NameNode请求上传文件,NameNode检查路径确认文件路径然后响应
3、客户端接收到可以上传指令后向NameNode请求上传到的具体位置
4、NameNode根据距离和负载情况选择出相应的DataNode节点
5、客户端拿到位置创建流开始建立通道,DataNode进行序列化写入数据
6、DataNode写入数据结束后最终响应给客户端
例:
@Test
public void testCopyFromlFile() throws URISyntaxException, IOException, InterruptedException {
//获取fs对象
Configuration conf = new Configuration ();
FileSystem fs = FileSystem.get (new URI ("hdfs://xmaster:9000"), conf, "root");
//执行上传API\
fs.copyFromLocalFile (new Path ("D:/test.txt"),new Path ("/test.txt"));
//关闭资源
fs.close ();
}
@Test
//将本地的文件上传到HDFS上
public void putFileToHDFS() throws URISyntaxException, IOException, InterruptedException {
//获取对象
Configuration conf = new Configuration ();
FileSystem fs = FileSystem.get (new URI ("hdfs://hadoop01:9000"), conf, "root");
//获取输入流
FileInputStream fis = new FileInputStream ("D:/test.txt");
//获取输出流
FSDataOutputStream fos = fs.create (new Path ("/test.txt"));
//拷贝
IOUtils.copyBytes (fis, fos, conf);
//关闭资源
IOUtils.closeStream (fos);
IOUtils.closeStream (fis);
fs.close ();
}