from lxml.builder import E
from jnpr.junos.utils.util import Util
from jnpr.junos.utils.start_shell import StartShell
from jnpr.junos.exception import RpcError
[docs]class FS(Util):
"""
Filesystem (FS) utilities:
* :meth:`cat`: show the contents of a file
* :meth:`checksum`: calculate file checksum (md5,sha256,sha1)
* :meth:`cp`: local file copy (not scp)
* :meth:`cwd`: change working directory
* :meth:`ls`: return file/dir listing
* :meth:`mkdir`: create a directory
* :meth:`pwd`: get working directory
* :meth:`mv`: local file rename
* :meth:`rm`: local file delete
* :meth:`rmdir`: remove a directory
* :meth:`stat`: return file/dir information
* :meth:`storage_usage`: return storage usage
* :meth:`directory_usage`: return directory usage
* :meth:`storage_cleanup`: perform storage storage_cleanup
* :meth:`storage_cleanup_check`: returns a list of files which will be
removed at cleanup
* :meth:`symlink`: create a symlink
* :meth:`tgz`: tar+gzip a directory
.. note: The following methods require 'start shell' priveldges:
* mkdir
* rmdir
* symlink
"""
# -------------------------------------------------------------------------
# cat - show file contents
# -------------------------------------------------------------------------
[docs] def cat(self, path):
"""
Returns the contents of the file **path**.
:param str path: File-path
:returns: contents of the file (str) or ``None`` if file does not exist
"""
try:
rsp = self._dev.rpc.file_show(filename=path)
except:
return None
return rsp.text
# -------------------------------------------------------------------------
# cwd - change working directory
# -------------------------------------------------------------------------
[docs] def cwd(self, path):
"""
Change working directory to **path**.
:param str path: path to working directory
"""
rsp = self._dev.rpc.set_cli_working_directory(directory=path)
return rsp.findtext('working-directory')
# -------------------------------------------------------------------------
# pwd - return current working directory
# -------------------------------------------------------------------------
[docs] def pwd(self):
"""
:returns: The current working directory path (str)
"""
rsp = self._dev.rpc(E.command("show cli directory"))
return rsp.findtext('./working-directory')
# -------------------------------------------------------------------------
# checksum - compute file checksum
# -------------------------------------------------------------------------
[docs] def checksum(self, path, calc='md5'):
"""
Performs the checksum command on the given file path using the
required calculation method and returns the string value.
If the **path** is not found on the device, then ``None`` is returned.
:param str path: file-path on local device
:param str calc: checksum calculation method:
* "md5"
* "sha256"
* "sha1"
:returns: checksum value (str) or ``None`` if file not found
"""
cmd_map = {
'md5': self._dev.rpc.get_checksum_information,
'sha256': self._dev.rpc.get_sha256_checksum_information,
'sha1': self._dev.rpc.get_sha1_checksum_information
}
rpc = cmd_map.get(calc)
if rpc is None:
raise ValueError("Unknown calculation method: '%s'" % calc)
try:
rsp = rpc(path=path)
return rsp.findtext('.//checksum').strip()
except:
# the only exception is that the path is not found
return None
@classmethod
def _decode_file(cls, fileinfo):
results = {}
not_file = fileinfo.xpath('file-directory | file-symlink-target')
if len(not_file):
results['type'] = {'file-directory': 'dir',
'file-symlink-target': 'link'}[not_file[0].tag]
if 'link' == results['type']:
results['link'] = not_file[0].text.strip()
else:
results['type'] = 'file'
results['path'] = fileinfo.findtext('file-name').strip()
results['owner'] = fileinfo.findtext('file-owner').strip()
results['size'] = int(fileinfo.findtext('file-size'))
fper = fileinfo.find('file-permissions')
results['permissions'] = int(fper.text.strip())
results['permissions_text'] = fper.get('format')
fdate = fileinfo.find('file-date')
results['ts_date'] = fdate.get('format')
results['ts_epoc'] = fdate.text.strip()
return results
@classmethod
def _decode_dir(cls, dirinfo, files=None):
results = {}
results['type'] = 'dir'
results['path'] = dirinfo.get('name')
if files is None:
files = dirinfo.xpath('file-information')
results['file_count'] = len(files)
results['size'] = sum([int(f.findtext('file-size')) for f in files])
return results
# -------------------------------------------------------------------------
# stat - file information
# -------------------------------------------------------------------------
[docs] def stat(self, path):
"""
Returns a dictionary of status information on the path, or ``None``
if the path does not exist.
:param str path: file-path on local device
:returns: status information on the file
:rtype: dict
"""
rsp = self._dev.rpc.file_list(detail=True, path=path)
# if there is an output tag, then it means that the path
# was not found
if rsp.find('output') is not None:
return None
# ok, so we've either got a directory or a file at
# this point, so decode accordingly
xdir = rsp.find('directory')
if xdir.get('name'): # then this is a directory path
return FS._decode_dir(xdir)
else:
return FS._decode_file(xdir.find('file-information'))
# -------------------------------------------------------------------------
# ls - file/dir listing
# -------------------------------------------------------------------------
[docs] def ls(self, path='.', brief=False, followlink=True):
"""
File listing, returns a dict of file information. If the
path is a symlink, then by default **followlink** will
recursively call this method to obtain the symlink specific
information.
:param str path:
file-path on local device. defaults to current
working directory
:param bool brief:
when ``True`` brief amount of data
:param bool followlink:
when ``True`` (default) this method will recursively
follow the directory symlinks to gather data
:returns: dict collection of file information or ``None``
if **path** is not found
"""
rsp = self._dev.rpc.file_list(detail=True, path=path)
# if there is an output tag, then it means that the path
# was not found, and we return :None:
if rsp.find('output') is not None:
return None
xdir = rsp.find('.//directory')
# check to see if the directory element has a :name:
# attribute, and if it does not, then this is a file, and
# decode accordingly. If the file is a symlink, then we
# want to follow the symlink to get what we want.
if not xdir.get('name'):
results = FS._decode_file(xdir.find('file-information'))
link_path = results.get('link')
if not link_path: # then we are done
return results
else:
return results if followlink is False else self.ls(
path=link_path)
# if we are here, then it's a directory, include information on all
# files
files = xdir.xpath('file-information')
results = FS._decode_dir(xdir, files)
if brief is True:
results['files'] = [f.findtext('file-name').strip() for f in files]
else:
results['files'] = dict(
(f.findtext('file-name').strip(),
FS._decode_file(f)) for f in files)
return results
# -------------------------------------------------------------------------
# storage_usage - filesystem storage usage
# -------------------------------------------------------------------------
[docs] def storage_usage(self):
"""
Returns the storage usage, similar to the unix "df" command.
:returns: dict of storage usage
"""
rsp = self._dev.rpc.get_system_storage()
_name = lambda fs: fs.findtext('filesystem-name').strip()
def _decode(fs):
r = {}
r['mount'] = fs.find('mounted-on').text.strip()
tb = fs.find('total-blocks')
r['total'] = tb.get('format')
r['total_blocks'] = int(tb.text)
ub = fs.find('used-blocks')
r['used'] = ub.get('format')
r['used_blocks'] = int(ub.text)
r['used_pct'] = fs.find('used-percent').text.strip()
ab = fs.find('available-blocks')
r['avail'] = ab.get('format')
r['avail_block'] = int(ab.text)
return r
return dict((_name(fs), _decode(fs)) for fs in rsp.xpath('filesystem'))
# -------------------------------------------------------------------------
# directory_usage - filesystem directory usage
# -------------------------------------------------------------------------
[docs] def directory_usage(self, path=".", depth=0):
"""
Returns the directory usage, similar to the unix "du" command.
:returns: dict of directory usage, including subdirectories
if depth > 0
"""
BLOCK_SIZE = 512
rsp = self._dev.rpc.get_directory_usage_information(
path=path,
depth=str(depth))
result = {}
directories = rsp.findall(".//directory")
if not directories:
raise RpcError(rsp=rsp)
for directory in directories:
dir_name = directory.findtext("directory-name")
if dir_name is not None:
dir_name = dir_name.strip()
else:
raise RpcError(rsp=rsp)
used_space = directory.find('used-space')
if used_space is not None:
dir_size = used_space.text.strip()
dir_blocks = used_space.get('used-blocks')
if dir_blocks is not None:
dir_blocks = int(dir_blocks)
dir_bytes = dir_blocks * BLOCK_SIZE
result[dir_name.strip()] = {
"size": dir_size,
"blocks": dir_blocks,
"bytes": dir_bytes,
}
return result
# -------------------------------------------------------------------------
# storage_cleanup_check, storage_cleanip
# -------------------------------------------------------------------------
@classmethod
def _decode_storage_cleanup(cls, files):
_name = lambda f: f.findtext('file-name').strip()
def _decode(f):
return {
'size': int(f.findtext('size')),
'ts_date': f.findtext('date').strip()
}
# return a dict of name/decode pairs for each file
return dict((_name(f), _decode(f)) for f in files)
[docs] def storage_cleanup_check(self):
"""
Perform the 'request system storage cleanup dry-run' command
to return a ``dict`` of files/info that would be removed if
the cleanup command was executed.
:returns: dict of files that would be removed (dry-run)
"""
rsp = self._dev.rpc.request_system_storage_cleanup(dry_run=True)
files = rsp.xpath('file-list/file')
return FS._decode_storage_cleanup(files)
[docs] def storage_cleanup(self):
"""
Perform the 'request system storage cleanup' command to remove
files from the filesystem. Return a ``dict`` of file name/info
on the files that were removed.
:returns: dict on files that were removed
"""
rsp = self._dev.rpc.request_system_storage_cleanup()
files = rsp.xpath('file-list/file')
return FS._decode_storage_cleanup(files)
# -------------------------------------------------------------------------
# rm - local file delete
# -------------------------------------------------------------------------
[docs] def rm(self, path):
"""
Performs a local file delete action, per Junos CLI command
"file delete".
:returns: ``True`` when successful, ``False`` otherwise.
"""
# the return value from this RPC will return either True if the delete
# was successful, or an XML structure otherwise. So we can do a simple
# test to provide the return result to the caller.
rsp = self._dev.rpc.file_delete(path=path)
if rsp is True:
return True
else:
return False
# -------------------------------------------------------------------------
# cp - local file copy
# -------------------------------------------------------------------------
[docs] def cp(self, from_path, to_path):
"""
Perform a local file copy where **from_path** and **to_path** can be
any valid Junos path argument. Refer to the Junos "file copy" command
documentation for details.
:param str from_path: source file-path
:param str to_path: destination file-path
.. notes: Valid Junos file-path can include URL, such as ``http://``.
this is handy for copying files for webservers.
:returns: ``True`` if OK, ``False`` if file does not exist.
"""
# this RPC returns True if it is OK. If the file does not exist
# this RPC will generate an RpcError exception, so just return False
try:
self._dev.rpc.file_copy(source=from_path, destination=to_path)
except:
return False
return True
# -------------------------------------------------------------------------
# mv - local file rename
# -------------------------------------------------------------------------
[docs] def mv(self, from_path, to_path):
"""
Perform a local file rename function, same as "file rename" Junos CLI.
:returns: ``True`` if OK, ``False`` if file does not exist.
"""
rsp = self._dev.rpc.file_rename(source=from_path, destination=to_path)
if rsp is True:
return True
else:
return False
[docs] def tgz(self, from_path, tgz_path):
"""
Create a file called **tgz_path** that is the tar-gzip of the given
directory specified **from_path**.
:param str from_path: file-path to directory of files
:param str tgz_path: file-path name of tgz file to create
:returns: ``True`` if OK, error-msg (str) otherwise
"""
rsp = self._dev.rpc.file_archive(compress=True,
source=from_path,
destination=tgz_path)
# if the rsp is True, then the command executed OK.
if rsp is True:
return True
# otherwise, return the error string to the caller
return rsp.text
# -------------------------------------------------------------------------
# !!!!! methods that use SSH shell commands, require that the user
# !!!!! has 'start shell' privileges
# -------------------------------------------------------------------------
def _ssh_exec(self, command):
with StartShell(self._dev) as sh:
got = sh.run(command)
return got
[docs] def rmdir(self, path):
"""
Executes the 'rmdir' command on **path**.
.. warning:: REQUIRES SHELL PRIVILEGES
:param str path: file-path to directory
:returns: ``True`` if OK, error-message (str) otherwise
"""
results = self._ssh_exec("rmdir %s" % path)
return True if results[0] is True else ''.join(results[1][2:-1])
[docs] def mkdir(self, path):
"""
Executes the 'mkdir -p' command on **path**.
.. warning:: REQUIRES SHELL PRIVILEGES
:returns: ``True`` if OK, error-message (str) otherwise
"""
results = self._ssh_exec("mkdir -p %s" % path)
return True if results[0] is True else ''.join(results[1][2:-1])
[docs] def symlink(self, from_path, to_path):
"""
Executes the 'ln -sf **from_path** **to_path**' command.
.. warning:: REQUIRES SHELL PRIVILEGES
:returns: ``True`` if OK, or error-message (str) otherwise
"""
results = self._ssh_exec("ln -sf %s %s" % (from_path, to_path))
return True if results[0] is True else ''.join(results[1][2:-1])