Source code for jnpr.junos.exception

import re
from jnpr.junos import jxml
from jnpr.junos import jxml as JXML
from lxml.etree import _Element
from ncclient.operations.rpc import RPCError

[docs]class FactLoopError(RuntimeError): """ Generated when there is a loop in fact gathering. """ pass
[docs]class RpcError(Exception): """ Parent class for all junos-pyez RPC Exceptions """
[docs] def __init__(self, cmd=None, rsp=None, errs=None, dev=None, timeout=None, re=None): """ :cmd: is the rpc command :rsp: is the rpc response (after <rpc-reply>) :errs: is a list of dictionaries of extracted <rpc-error> elements. :dev: is the device rpc was executed on :timeout: is the timeout value of the device :re: is the RE or member exception occured on """ self.cmd = cmd self.rsp = rsp = dev self.timeout = timeout = re self.rpc_error = None self.xml = rsp # To handle errors coming from ncclient, Here errs is list of RPCError if isinstance(errs, RPCError) and hasattr(errs, "errors"): self.errs = [JXML.rpc_error(error.xml) for error in errs.errors] for error in errs.errors: if error.severity == "error": self.rsp = JXML.remove_namespaces(error.xml) break else: if errs.severity == "warning": for error in errs.errors: if error.severity == "warning": self.rsp = JXML.remove_namespaces(error.xml) break self.message = errs.message else: self.errs = errs self.message = ( "\n".join( [ "%s: %s" % (err["severity"].strip(), err["message"].strip()) for err in errs if err["message"] is not None and err["severity"] is not None ] ) if isinstance(errs, list) else "" ) if isinstance(self.rsp, _Element): self.rpc_error = jxml.rpc_error(self.rsp) self.message = self.message or self.rpc_error["message"] if self.errs is None or not isinstance(self.errs, list): self.errs = [self.rpc_error]
def __repr__(self): """ pprints the response XML attribute """ if self.rpc_error is not None: return "{}(severity: {}, bad_element: {}, message: {})".format( self.__class__.__name__, self.rpc_error["severity"], self.rpc_error["bad_element"], self.message, ) else: return self.__class__.__name__ __str__ = __repr__
[docs]class CommitError(RpcError): """ Generated in response to a commit-check or a commit action. """
[docs] def __init__(self, rsp, cmd=None, errs=None): RpcError.__init__(self, cmd, rsp, errs)
def __repr__(self): return "{}(edit_path: {}, bad_element: {}, message: {})".format( self.__class__.__name__, self.rpc_error["edit_path"], self.rpc_error["bad_element"], self.message, ) __str__ = __repr__
[docs]class ConfigLoadError(RpcError): """ Generated in response to a failure when loading a configuration. """
[docs] def __init__(self, rsp, cmd=None, errs=None): RpcError.__init__(self, cmd, rsp, errs)
def __repr__(self): return "{}(severity: {}, bad_element: {}, message: {})".format( self.__class__.__name__, self.rpc_error["severity"], self.rpc_error["bad_element"], self.message, ) __str__ = __repr__
[docs]class LockError(RpcError): """ Generated in response to attempting to take an exclusive lock on the configuration database. """
[docs] def __init__(self, rsp): RpcError.__init__(self, rsp=rsp)
[docs]class UnlockError(RpcError): """ Generated in response to attempting to unlock the configuration database. """
[docs] def __init__(self, rsp): RpcError.__init__(self, rsp=rsp)
[docs]class PermissionError(RpcError): """ Generated in response to invoking an RPC for which the auth user does not have user-class permissions. PermissionError.message gives you the specific RPC that cause the exceptions """
[docs] def __init__(self, rsp, cmd=None, errs=None): RpcError.__init__(self, cmd=cmd, rsp=rsp, errs=errs) self.message = rsp.findtext(".//bad-element")
[docs]class RpcTimeoutError(RpcError): """ Generated in response to a RPC execution timeout. """
[docs] def __init__(self, dev, cmd, timeout): RpcError.__init__(self, dev=dev, cmd=cmd, timeout=timeout)
def __repr__(self): return "{}(host: {}, cmd: {}, timeout: {})".format( self.__class__.__name__,, self.cmd, self.timeout ) __str__ = __repr__
[docs]class SwRollbackError(RpcError): """ Generated in response to a SW rollback error. """
[docs] def __init__(self, rsp, re=None): RpcError.__init__(self, re=re, rsp=rsp)
def __repr__(self): if return "{}(re: {}, output: {})".format( self.__class__.__name__,, self.rsp ) else: return "{}(output: {})".format(self.__class__.__name__, self.rsp) __str__ = __repr__
# ================================================================ # ================================================================ # Connection Exceptions # ================================================================ # ================================================================
[docs]class ConnectError(Exception): """ Parent class for all connection related exceptions """
[docs] def __init__(self, dev, msg=None): = dev self._orig = msg
@property def user(self): """login user-name""" return @property def host(self): """login host name/ipaddr""" return @property def port(self): """login SSH port""" return @property def msg(self): """login SSH port""" return self._orig def __repr__(self): if self._orig: return "{}(host: {}, msg: {})".format( self.__class__.__name__,, self._orig ) else: return "{}({})".format(self.__class__.__name__, __str__ = __repr__
[docs]class ProbeError(ConnectError): """ Generated if auto_probe is enabled and the probe action fails """ pass
[docs]class ConnectAuthError(ConnectError): """ Generated if the user-name, password is invalid """ pass
[docs]class ConnectTimeoutError(ConnectError): """ Generated if the NETCONF session fails to connect, could be due to the fact the device is not ip reachable; bad ipaddr or just due to routing """ pass
[docs]class ConnectUnknownHostError(ConnectError): """ Generated if the specific hostname does not DNS resolve """ pass
[docs]class ConnectRefusedError(ConnectError): """ Generated if the specified host denies the NETCONF; could be that the services is not enabled, or the host has too many connections already. """ pass
[docs]class ConnectNotMasterError(ConnectError): """ Generated if the connection is made to a non-master routing-engine. This could be a backup RE on an MX device, or a virtual-chassis member (linecard), for example """ pass
[docs]class ConnectClosedError(ConnectError): """ Generated if connection unexpectedly closed """
[docs] def __init__(self, dev): ConnectError.__init__(self, dev=dev) dev.connected = False
[docs]class JSONLoadError(Exception): """ Generated if json content of rpc reply fails to load """
[docs] def __init__(self, exception, rpc_content): self.ex_msg = str(exception) self.rpc_content = rpc_content self.offending_line = "" obj ="line (\d+)", self.ex_msg) if obj: line_no = int( rpc_lines = rpc_content.splitlines() for line in range(line_no - 3, line_no + 2): self.offending_line += "%s: %s\n" % (line + 1, rpc_lines[line])
def __repr__(self): if self.offending_line: return ( "{}(reason: {}, \nThe offending config appears " "to be: \n{})".format( self.__class__.__name__, self.ex_msg, self.offending_line ) ) else: return "{}(reason: {})".format(self.__class__.__name__, self.ex_msg) __str__ = __repr__