Source code for jnpr.junos.utils.fs

from lxml.builder import E

from jnpr.junos.utils.util import Util
from jnpr.junos.utils.start_shell import StartShell

from jnpr.junos.exception import RpcError


[docs]class FS(Util): """ Filesystem (FS) utilities: * :meth:`cat`: show the contents of a file * :meth:`checksum`: calculate file checksum (md5,sha256,sha1) * :meth:`cp`: local file copy (not scp) * :meth:`cwd`: change working directory * :meth:`ls`: return file/dir listing * :meth:`mkdir`: create a directory * :meth:`pwd`: get working directory * :meth:`mv`: local file rename * :meth:`rm`: local file delete * :meth:`rmdir`: remove a directory * :meth:`stat`: return file/dir information * :meth:`storage_usage`: return storage usage * :meth:`directory_usage`: return directory usage * :meth:`storage_cleanup`: perform storage storage_cleanup * :meth:`storage_cleanup_check`: returns a list of files which will be removed at cleanup * :meth:`symlink`: create a symlink * :meth:`tgz`: tar+gzip a directory .. note: The following methods require 'start shell' priveldges: * mkdir * rmdir * symlink """ # ------------------------------------------------------------------------- # cat - show file contents # -------------------------------------------------------------------------
[docs] def cat(self, path): """ Returns the contents of the file **path**. :param str path: File-path :returns: contents of the file (str) or ``None`` if file does not exist """ try: rsp = self._dev.rpc.file_show(filename=path) except: return None return rsp.text
# ------------------------------------------------------------------------- # cwd - change working directory # -------------------------------------------------------------------------
[docs] def cwd(self, path): """ Change working directory to **path**. :param str path: path to working directory """ rsp = self._dev.rpc.set_cli_working_directory(directory=path) return rsp.findtext("working-directory")
# ------------------------------------------------------------------------- # pwd - return current working directory # -------------------------------------------------------------------------
[docs] def pwd(self): """ :returns: The current working directory path (str) """ rsp = self._dev.rpc(E.command("show cli directory")) return rsp.findtext("./working-directory")
# ------------------------------------------------------------------------- # checksum - compute file checksum # -------------------------------------------------------------------------
[docs] def checksum(self, path, calc="md5"): """ Performs the checksum command on the given file path using the required calculation method and returns the string value. If the **path** is not found on the device, then ``None`` is returned. :param str path: file-path on local device :param str calc: checksum calculation method: * "md5" * "sha256" * "sha1" :returns: checksum value (str) or ``None`` if file not found """ cmd_map = { "md5": self._dev.rpc.get_checksum_information, "sha256": self._dev.rpc.get_sha256_checksum_information, "sha1": self._dev.rpc.get_sha1_checksum_information, } rpc = cmd_map.get(calc) if rpc is None: raise ValueError("Unknown calculation method: '%s'" % calc) try: rsp = rpc(path=path) return rsp.findtext(".//checksum").strip() except: # the only exception is that the path is not found return None
@classmethod def _decode_file(cls, fileinfo): results = {} not_file = fileinfo.xpath("file-directory | file-symlink-target") if len(not_file): results["type"] = {"file-directory": "dir", "file-symlink-target": "link"}[ not_file[0].tag ] if "link" == results["type"]: results["link"] = not_file[0].text.strip() else: results["type"] = "file" results["path"] = fileinfo.findtext("file-name").strip() results["owner"] = fileinfo.findtext("file-owner").strip() results["size"] = int(fileinfo.findtext("file-size")) fper = fileinfo.find("file-permissions") results["permissions"] = int(fper.text.strip()) results["permissions_text"] = fper.get("format") fdate = fileinfo.find("file-date") results["ts_date"] = fdate.get("format") results["ts_epoc"] = fdate.text.strip() return results @classmethod def _decode_dir(cls, dirinfo, files=None): results = {} results["type"] = "dir" results["path"] = dirinfo.get("name") if files is None: files = dirinfo.xpath("file-information") results["file_count"] = len(files) results["size"] = sum([int(f.findtext("file-size")) for f in files]) return results # ------------------------------------------------------------------------- # stat - file information # -------------------------------------------------------------------------
[docs] def stat(self, path): """ Returns a dictionary of status information on the path, or ``None`` if the path does not exist. :param str path: file-path on local device :returns: status information on the file :rtype: dict """ rsp = self._dev.rpc.file_list(detail=True, path=path) # if there is an output tag, then it means that the path # was not found if rsp.find("output") is not None: return None # ok, so we've either got a directory or a file at # this point, so decode accordingly xdir = rsp.find("directory") if xdir.get("name"): # then this is a directory path return FS._decode_dir(xdir) else: return FS._decode_file(xdir.find("file-information"))
# ------------------------------------------------------------------------- # ls - file/dir listing # -------------------------------------------------------------------------
[docs] def ls(self, path=".", brief=False, followlink=True): """ File listing, returns a dict of file information. If the path is a symlink, then by default **followlink** will recursively call this method to obtain the symlink specific information. :param str path: file-path on local device. defaults to current working directory :param bool brief: when ``True`` brief amount of data :param bool followlink: when ``True`` (default) this method will recursively follow the directory symlinks to gather data :returns: dict collection of file information or ``None`` if **path** is not found """ rsp = self._dev.rpc.file_list(detail=True, path=path) # if there is an output tag, then it means that the path # was not found, and we return :None: if rsp.find("output") is not None: return None xdir = rsp.find(".//directory") # check to see if the directory element has a :name: # attribute, and if it does not, then this is a file, and # decode accordingly. If the file is a symlink, then we # want to follow the symlink to get what we want. if not xdir.get("name"): results = FS._decode_file(xdir.find("file-information")) link_path = results.get("link") if not link_path: # then we are done return results else: return results if followlink is False else self.ls(path=link_path) # if we are here, then it's a directory, include information on all # files files = xdir.xpath("file-information") results = FS._decode_dir(xdir, files) if brief is True: results["files"] = [f.findtext("file-name").strip() for f in files] else: results["files"] = dict( (f.findtext("file-name").strip(), FS._decode_file(f)) for f in files ) return results
# ------------------------------------------------------------------------- # storage_usage - filesystem storage usage # -------------------------------------------------------------------------
[docs] def storage_usage(self): """ Returns the storage usage, similar to the unix "df" command. :returns: dict of storage usage """ rsp = self._dev.rpc.get_system_storage() def _name(fs): return fs.findtext("filesystem-name").strip() def _decode(fs): r = {} r["mount"] = fs.find("mounted-on").text.strip() tb = fs.find("total-blocks") r["total"] = tb.get("format") r["total_blocks"] = int(tb.text) ub = fs.find("used-blocks") r["used"] = ub.get("format") r["used_blocks"] = int(ub.text) r["used_pct"] = fs.find("used-percent").text.strip() ab = fs.find("available-blocks") r["avail"] = ab.get("format") r["avail_block"] = int(ab.text) return r re_list = rsp.xpath("multi-routing-engine-item") if re_list: fs_dict = {} for re in re_list: re_name = re.findtext("re-name").strip() re_fs_dict = dict( (_name(fs), _decode(fs)) for fs in re.xpath("system-storage-information/filesystem") ) fs_dict[re_name] = re_fs_dict return fs_dict return dict((_name(fs), _decode(fs)) for fs in rsp.xpath("filesystem"))
# ------------------------------------------------------------------------- # directory_usage - filesystem directory usage # -------------------------------------------------------------------------
[docs] def directory_usage(self, path=".", depth=0): """ Returns the directory usage, similar to the unix "du" command. :returns: dict of directory usage, including subdirectories if depth > 0 """ BLOCK_SIZE = 512 rsp = self._dev.rpc.get_directory_usage_information(path=path, depth=str(depth)) result = {} directories = rsp.findall(".//directory") if not directories: raise RpcError(rsp=rsp) for directory in directories: dir_name = directory.findtext("directory-name") if dir_name is not None: dir_name = dir_name.strip() else: raise RpcError(rsp=rsp) used_space = directory.find("used-space") if used_space is not None: dir_size = used_space.text.strip() dir_blocks = used_space.get("used-blocks") if dir_blocks is not None: dir_blocks = int(dir_blocks) dir_bytes = dir_blocks * BLOCK_SIZE result[dir_name.strip()] = { "size": dir_size, "blocks": dir_blocks, "bytes": dir_bytes, } return result
# ------------------------------------------------------------------------- # storage_cleanup_check, storage_cleanip # ------------------------------------------------------------------------- @classmethod def _decode_storage_cleanup(cls, files): def _name(f): return f.findtext("file-name").strip() def _decode(f): return { "size": int(f.findtext("size")), "ts_date": f.findtext("date").strip(), } # return a dict of name/decode pairs for each file return dict((_name(f), _decode(f)) for f in files)
[docs] def storage_cleanup_check(self): """ Perform the 'request system storage cleanup dry-run' command to return a ``dict`` of files/info that would be removed if the cleanup command was executed. :returns: dict of files that would be removed (dry-run) """ rsp = self._dev.rpc.request_system_storage_cleanup(dry_run=True) files = rsp.xpath("file-list/file") return FS._decode_storage_cleanup(files)
[docs] def storage_cleanup(self): """ Perform the 'request system storage cleanup' command to remove files from the filesystem. Return a ``dict`` of file name/info on the files that were removed. :returns: dict on files that were removed """ rsp = self._dev.rpc.request_system_storage_cleanup() files = rsp.xpath("file-list/file") return FS._decode_storage_cleanup(files)
# ------------------------------------------------------------------------- # rm - local file delete # -------------------------------------------------------------------------
[docs] def rm(self, path): """ Performs a local file delete action, per Junos CLI command "file delete". :returns: ``True`` when successful, ``False`` otherwise. """ # the return value from this RPC will return either True if the delete # was successful, or an XML structure otherwise. So we can do a simple # test to provide the return result to the caller. rsp = self._dev.rpc.file_delete(path=path) if rsp is True: return True else: return False
# ------------------------------------------------------------------------- # cp - local file copy # -------------------------------------------------------------------------
[docs] def cp(self, from_path, to_path, routing_instance=None, **kvargs): """ Perform a local file copy where **from_path** and **to_path** can be any valid Junos path argument. Refer to the Junos "file copy" command documentation for details. :param str from_path: source file-path :param str to_path: destination file-path :param str routing_instance: (optional) routing_instance name :param dict kvargs: Any additional parameters to the 'file copy' command can be passed within **kvargs**, following the RPC syntax methodology (dash-2-underscore,etc.) .. notes: Valid Junos file-path can include URL, such as ``http://``. this is handy for copying files for webservers. :returns: ``True`` if OK, ``False`` if file does not exist. """ # this RPC returns True if it is OK. If the file does not exist # this RPC will generate an RpcError exception, so just return False try: if routing_instance is None: self._dev.rpc.file_copy(source=from_path, destination=to_path, **kvargs) else: self._dev.rpc.file_copy( source=from_path, destination=to_path, routing_instance=routing_instance, **kvargs ) except: return False return True
# ------------------------------------------------------------------------- # mv - local file rename # -------------------------------------------------------------------------
[docs] def mv(self, from_path, to_path): """ Perform a local file rename function, same as "file rename" Junos CLI. :returns: ``True`` if OK, ``False`` if file does not exist. """ rsp = self._dev.rpc.file_rename(source=from_path, destination=to_path) if rsp is True: return True else: return False
[docs] def tgz(self, from_path, tgz_path): """ Create a file called **tgz_path** that is the tar-gzip of the given directory specified **from_path**. :param str from_path: file-path to directory of files :param str tgz_path: file-path name of tgz file to create :returns: ``True`` if OK, error-msg (str) otherwise """ rsp = self._dev.rpc.file_archive( compress=True, source=from_path, destination=tgz_path ) # if the rsp is True, then the command executed OK. if rsp is True: return True # otherwise, return the error string to the caller return rsp.text
# ------------------------------------------------------------------------- # !!!!! methods that use SSH shell commands, require that the user # !!!!! has 'start shell' privileges # ------------------------------------------------------------------------- def _ssh_exec(self, command): with StartShell(self._dev) as sh: got = sh.run(command) return got
[docs] def rmdir(self, path): """ Executes the 'rmdir' command on **path**. .. warning:: REQUIRES SHELL PRIVILEGES :param str path: file-path to directory :returns: ``True`` if OK, error-message (str) otherwise """ results = self._ssh_exec("rmdir %s" % path) return True if results[0] is True else "".join(results[1][2:-1])
[docs] def mkdir(self, path): """ Executes the 'mkdir -p' command on **path**. .. warning:: REQUIRES SHELL PRIVILEGES :returns: ``True`` if OK, error-message (str) otherwise """ results = self._ssh_exec("mkdir -p %s" % path) return True if results[0] is True else "".join(results[1][2:-1])