The hdfscli command line
# hdfscli --help
HdfsCLI: a command line interface for HDFS.

Usage:
  hdfscli [interactive] [-a ALIAS] [-v...]
  hdfscli download [-fsa ALIAS] [-v...] [-t THREADS] HDFS_PATH LOCAL_PATH
  hdfscli upload [-sa ALIAS] [-v...] [-A | -f] [-t THREADS] LOCAL_PATH HDFS_PATH
  hdfscli -L | -V | -h

Commands:
  download                      Download a file or folder from HDFS. If a
                                single file is downloaded, - can be
                                specified as LOCAL_PATH to stream it to
                                standard out.
  interactive                   Start the client and expose it via the python
                                interpreter (using iPython if available).
  upload                        Upload a file or folder to HDFS. - can be
                                specified as LOCAL_PATH to read from standard
                                in.

Arguments:
  HDFS_PATH                     Remote HDFS path.
  LOCAL_PATH                    Path to local file or directory.

Options:
  -A --append                   Append data to an existing file. Only supported
                                if uploading a single file or from standard in.
  -L --log                      Show path to current log file and exit.
  -V --version                  Show version and exit.
  -a ALIAS --alias=ALIAS        Alias of namenode to connect to.
  -f --force                    Allow overwriting any existing files.
  -s --silent                   Don't display progress status.
  -t THREADS --threads=THREADS  Number of threads to use for parallelization.
                                0 allocates a thread per file. [default: 0]
  -v --verbose                  Enable log output. Can be specified up to three
                                times (increasing verbosity each time).

Examples:
  hdfscli -a prod /user/foo
  hdfscli download features.avro dat/
  hdfscli download logs/1987-03-23 - >>logs
  hdfscli upload -f - data/weights.tsv <weights.tsv

HdfsCLI exits with return status 1 if an error occurred and 0 otherwise.
To use hdfscli, you first need to set up its default configuration file:
# cat ~/.hdfscli.cfg
[global]
default.alias = dev

[dev.alias]
url = http://hadoop:50070
user = root
Client classes available in Python:
InsecureClient (the default)
TokenClient
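Both can be imported from the hdfs package. A minimal sketch (the URL and delegation token below are placeholder values, not from the configuration above):

>>> from hdfs import InsecureClient, TokenClient
>>> # InsecureClient authenticates with a plain user name (no security)
>>> client = InsecureClient("http://hadoop:50070", user="root")
>>> # TokenClient authenticates with a Hadoop delegation token instead
>>> token_client = TokenClient("http://hadoop:50070", token="my-token")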
Uploading and downloading files
Upload a file or folder with hdfscli (here, uploading Hadoop's configuration folder to /hdfs on HDFS):
# hdfscli upload --alias=dev -f /hadoop-2.4.1/etc/hadoop/ /hdfs
Download the /logs directory on HDFS into the local /root/test/ directory:
# hdfscli download /logs /root/test/
hdfscli interactive mode
[root@hadoop ~]# hdfscli --alias=dev

Welcome to the interactive HDFS python shell.
The HDFS client is available as `CLIENT`.

>>> CLIENT.list("/")
[u'Demo', u'hdfs', u'logs', u'logss']
>>> CLIENT.status("/Demo")
{u'group': u'supergroup', u'permission': u'755', u'blockSize': 0,
u'accessTime': 0, u'pathSuffix': u'', u'modificationTime': 1495123035501L,
u'replication': 0, u'length': 0, u'childrenNum': 1, u'owner': u'root',
u'type': u'DIRECTORY', u'fileId': 16389}
>>> CLIENT.delete("logs/install.log")
False
>>> CLIENT.delete("/logs/install.log")
True
Bindings for the Python API
Initializing a client
1. Import a client class and call its constructor:
>>> from hdfs import InsecureClient
>>> client = InsecureClient("http://172.10.236.21:50070", user='ann')
>>> client.list("/")
[u'Demo', u'hdfs', u'logs', u'logss']
2. Import the Config class, load an existing configuration file, and create a client from an alias defined in it. By default the configuration is read from ~/.hdfscli.cfg:
>>> from hdfs import Config
>>> client = Config().get_client("dev")
>>> client.list("/")
[u'Demo', u'hdfs', u'logs', u'logss']
Reading files
The read() method reads a file from HDFS. It must be used inside a `with` block to ensure the connection is closed properly each time:
>>> with client.read("/logs/yarn-env.sh", encoding="utf-8") as reader:
...     features = reader.read()
...
>>> print features
Passing a chunk_size argument returns a generator that streams the file's contents:
>>> with client.read("/logs/yarn-env.sh", chunk_size=1024) as reader:
...     for chunk in reader:
...         print chunk
...
The delimiter argument also returns a generator; the file's contents are split on the given delimiter:
>>> import time
>>> with client.read("/logs/yarn-env.sh", encoding="utf-8", delimiter="\n") as reader:
...     for line in reader:
...         time.sleep(1)
...         print line
Writing files
The write() method writes data to a file on HDFS (here, writing lines of the local file kong.txt to /logs/kongtest.txt on HDFS):
>>> with open("/root/test/kong.txt") as reader, client.write("/logs/kongtest.txt") as writer:
...     for line in reader:
...         if line.startswith("-"):
...             writer.write(line)
Installing the hdfs package
The package can be installed with pip: pip install hdfs.
Listing the HDFS directory tree
[root@hadoop hadoop]# hdfs dfs -ls -R /
drwxr-xr-x   - root supergroup          0 2017-05-18 23:57 /Demo
-rw-r--r--   1 root supergroup       3494 2017-05-18 23:57 /Demo/hadoop-env.sh
drwxr-xr-x   - root supergroup          0 2017-05-18 19:01 /logs
-rw-r--r--   1 root supergroup       2223 2017-05-18 19:01 /logs/anaconda-ks.cfg
-rw-r--r--   1 root supergroup      57162 2017-05-18 18:32 /logs/install.log
Creating an HDFS client instance
#!/usr/bin/env python
# -*- coding:utf-8 -*-
__Author__ = 'kongZhaGen'

import hdfs
client = hdfs.Client("http://172.10.236.21:50070")
list: return the names of the files and directories contained in a remote folder; raises an error if the path does not exist.
hdfs_path: path to the remote folder
status: also return each file's status information
def list(self, hdfs_path, status=False):
    """Return names of files contained in a remote folder.

    :param hdfs_path: Remote path to a directory. If `hdfs_path` doesn't exist
      or points to a normal file, an :class:`HdfsError` will be raised.
    :param status: Also return each file's corresponding FileStatus_.
    """
Example:
print client.list("/", status=False)
Result:
[u'Demo', u'logs']
status: get status information for a file or folder on HDFS
hdfs_path: the remote path
strict:
False: return None if the remote path does not exist
True: raise an exception if the remote path does not exist
def status(self, hdfs_path, strict=True):
    """Get FileStatus_ for a file or folder on HDFS.

    :param hdfs_path: Remote path.
    :param strict: If `False`, return `None` rather than raise an exception if
      the path doesn't exist.

    .. _FileStatus: FS_
    .. _FS: http://hadoop.apache.org/docs/r1.0.4/webhdfs.html#FileStatus
    """
Example:
print client.status(hdfs_path="/Demoo", strict=False)
Result:
None
makedirs: create a directory on HDFS, recursively if necessary
hdfs_path: remote directory path
permission: permissions to set on the newly created directory
def makedirs(self, hdfs_path, permission=None):
    """Create a remote directory, recursively if necessary.

    :param hdfs_path: Remote path. Intermediate directories will be created
      appropriately.
    :param permission: Octal permission to set on the newly created directory.
      These permissions will only be set on directories that do not already
      exist.

    This function currently has no return value as WebHDFS doesn't return a
    meaningful flag.
    """
Example:
To create directories on HDFS from a remote client script, you need to modify hdfs-site.xml:
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
Restart HDFS:
stop-dfs.sh
start-dfs.sh
Create a directory tree recursively:
client.makedirs("/data/rar/tmp", permission=755)
rename: move a file or folder
hdfs_src_path: source path
hdfs_dst_path: destination path. If the path already exists and is a directory, the source is moved into it; if the path exists and is a file, an exception is raised.
def rename(self, hdfs_src_path, hdfs_dst_path):
    """Move a file or folder.

    :param hdfs_src_path: Source path.
    :param hdfs_dst_path: Destination path. If the path already exists and is
      a directory, the source will be moved into it. If the path exists and is
      a file, or if a parent destination directory is missing, this method
      will raise an :class:`HdfsError`.
    """
Example:
client.rename("/SRC_DATA", "/dest_data")
delete: remove a file or directory from HDFS
hdfs_path: path on HDFS
recursive: when the directory is non-empty, True deletes it recursively; False raises an exception.
def delete(self, hdfs_path, recursive=False):
    """Remove a file or directory from HDFS.

    :param hdfs_path: HDFS path.
    :param recursive: Recursively delete files and directories. By default,
      this method will raise an :class:`HdfsError` if trying to delete a
      non-empty directory.

    This function returns `True` if the deletion was successful and `False` if
    no file or directory previously existed at `hdfs_path`.
    """
Example:
client.delete("/dest_data", recursive=True)
upload: upload a file or directory to the HDFS file system. If the target directory already exists, the file or directory is uploaded into it; otherwise it is created.
def upload(self, hdfs_path, local_path, overwrite=False, n_threads=1,
           temp_dir=None, chunk_size=2 ** 16, progress=None, cleanup=True,
           **kwargs):
    """Upload a file or directory to HDFS.

    :param hdfs_path: Target HDFS path. If it already exists and is a
      directory, files will be uploaded inside.
    :param local_path: Local path to file or folder. If a folder, all the files
      inside of it will be uploaded (note that this implies that folders empty
      of files will not be created remotely).
    :param overwrite: Overwrite any existing file or directory.
    :param n_threads: Number of threads to use for parallelization. A value of
      `0` (or negative) uses as many threads as there are files.
    :param temp_dir: Directory under which the files will first be uploaded
      when `overwrite=True` and the final remote path already exists. Once the
      upload successfully completes, it will be swapped in.
    :param chunk_size: Interval in bytes by which the files will be uploaded.
    :param progress: Callback function to track progress, called every
      `chunk_size` bytes. It will be passed two arguments, the path to the
      file being uploaded and the number of bytes transferred so far. On
      completion, it will be called once with `-1` as second argument.
    :param cleanup: Delete any uploaded files if an error occurs during the
      upload.
    :param \*\*kwargs: Keyword arguments forwarded to :meth:`write`.

    On success, this method returns the remote upload path.
    """
Example:
>>> import hdfs
>>> client = hdfs.Client("http://172.10.236.21:50070")
>>> client.upload("/logs", "/root/training/jdk-7u75-linux-i586.tar.gz")
'/logs/jdk-7u75-linux-i586.tar.gz'
>>> client.list("/logs")
[u'anaconda-ks.cfg', u'install.log', u'jdk-7u75-linux-i586.tar.gz']
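The progress parameter described in the docstring above can be used to monitor a long upload. A minimal sketch (the report callback is illustrative, not part of the library):

>>> def report(path, nbytes):
...     # called every chunk_size bytes; nbytes is -1 once a file completes
...     print "%s: %s bytes" % (path, nbytes)
...
>>> client.upload("/logs", "/root/training/jdk-7u75-linux-i586.tar.gz",
...               overwrite=True, progress=report)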
content: get summary information for a file or directory on HDFS
print client.content("/logs/install.log")
Result:
{u'spaceConsumed': 57162, u'quota': -1, u'spaceQuota': -1, u'length': 57162,
u'directoryCount': 0, u'fileCount': 1}
write: create a file on HDFS; the data can be a string, a generator, or a file object
def write(self, hdfs_path, data=None, overwrite=False, permission=None,
          blocksize=None, replication=None, buffersize=None, append=False,
          encoding=None):
    """Create a file on HDFS.

    :param hdfs_path: Path where to create file. The necessary directories will
      be created appropriately.
    :param data: Contents of file to write. Can be a string, a generator or a
      file object. The last two options will allow streaming upload (i.e.
      without having to load the entire contents into memory). If `None`, this
      method will return a file-like object and should be called using a
      `with` block (see below for examples).
    :param overwrite: Overwrite any existing file or directory.
    :param permission: Octal permission to set on the newly created file.
      Leading zeros may be omitted.
    :param blocksize: Block size of the file.
    :param replication: Number of replications of the file.
    :param buffersize: Size of upload buffer.
    :param append: Append to a file rather than create a new one.
    :param encoding: Encoding used to serialize data written.
    """
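Two illustrative uses of write() (the remote path and contents below are placeholders): passing data writes it in one call, while omitting data returns a file-like object to be used in a `with` block:

>>> # write a string directly, replacing any existing file
>>> client.write("/logs/kongtest.txt", data=u"hello hdfs\n", overwrite=True,
...              encoding="utf-8")
>>> # append to the same file through a file-like object
>>> with client.write("/logs/kongtest.txt", append=True, encoding="utf-8") as writer:
...     writer.write(u"another line\n")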