Source code for jnpr.junos.device

# stdlib
import os
import types
import platform
import warnings

# stdlib, in support of the 'probe' method
import socket
import datetime
import time
import json

# 3rd-party packages
from lxml import etree
from ncclient import manager as netconf_ssh
import ncclient.transport.errors as NcErrors
import ncclient.operations.errors as NcOpErrors
from ncclient.operations import RPCError
import paramiko
import jinja2

# local modules
from jnpr.junos.rpcmeta import _RpcMetaExec
from jnpr.junos import exception as EzErrors
from jnpr.junos.cfg import Resource
from jnpr.junos.facts import *
from jnpr.junos import jxml as JXML
from jnpr.junos.decorators import timeoutDecorator, normalizeDecorator

# Directory that contains this module; _MyTemplateLoader joins it with
# 'templates' to build one of its default template search paths.
_MODULEPATH = os.path.dirname(__file__)


class _MyTemplateLoader(jinja2.BaseLoader):

    """
    Create a jinja2 template loader class that can be used to
    load templates from all over the filesystem, but defaults
    to the CWD and the 'templates' directory of the module
    """

    def __init__(self):
        # search order matters: the current directory wins over the
        # module's own 'templates' directory
        self.paths = ['.', os.path.join(_MODULEPATH, 'templates')]

    def get_source(self, environment, template):
        """
        Locate and read the template source.

        :param environment: the calling jinja2 environment (unused)
        :param str template: template file name/path, tried relative to
          each entry of ``self.paths`` in order

        :returns: tuple of (source, path, uptodate) where **source** is the
          UTF-8 decoded file content, **path** is the file that was read and
          **uptodate** is a callable returning ``True`` while the file's
          modification time is unchanged

        :raises jinja2.TemplateNotFound:
          When **template** does not exist under any search path
        """
        # take the first search path that actually contains the template.
        # (a plain loop rather than filter(): filter() is lazy on Python 3,
        # so it is always truthy and cannot be indexed)
        for directory in self.paths:
            path = os.path.join(directory, template)
            if os.path.exists(path):
                break
        else:
            raise jinja2.TemplateNotFound(template)

        mtime = os.path.getmtime(path)
        # read bytes and decode explicitly so the result is unicode text on
        # both Python 2 and Python 3 (the 'file' builtin is Python 2 only)
        with open(path, 'rb') as f:
            source = f.read().decode('utf-8')

        def _uptodate():
            # a template file that disappeared is reported as stale
            # instead of raising from inside the reload check
            try:
                return mtime == os.path.getmtime(path)
            except OSError:
                return False

        return source, path, _uptodate

# Module-level Jinja2 environment backed by _MyTemplateLoader; Device
# instances keep a reference to it (as ``_j2ldr``) and use it to fetch
# templates.
_Jinja2ldr = jinja2.Environment(loader=_MyTemplateLoader())


class Device(object):

    """
    Junos Device class.

    :attr:`ON_JUNOS`:
        **READ-ONLY** -
        Auto-set to ``True`` when this code is running on a Junos device,
        vs. running on a local-server remotely connecting to a device.

    :attr:`auto_probe`:
        When non-zero the call to :meth:`open` will probe for NETCONF
        reachability before proceeding with the NETCONF session establishment.
        If you want to enable this behavior by default, you could do the
        following in your code::

            from jnpr.junos import Device

            # set all device open to auto-probe with timeout of 10 sec
            Device.auto_probe = 10

            dev = Device( ... )
            dev.open()   # this will probe before attempting NETCONF connect

    """
    ON_JUNOS = platform.system().upper() == 'JUNOS'
    auto_probe = 0          # default is no auto-probe

    # -------------------------------------------------------------------------
    # PROPERTIES
    # -------------------------------------------------------------------------

    # ------------------------------------------------------------------------
    # property: hostname
    # ------------------------------------------------------------------------

    @property
    def hostname(self):
        """
        :returns: the host-name of the Junos device.
        """
        # when connected locally the configured name is just 'localhost',
        # so report the host-name found in the gathered facts instead
        return self._hostname if (
            self._hostname != 'localhost') else self.facts.get('hostname')

    # ------------------------------------------------------------------------
    # property: user
    # ------------------------------------------------------------------------

    @property
    def user(self):
        """
        :returns: the login user (str) accessing the Junos device
        """
        return self._auth_user

    # ------------------------------------------------------------------------
    # property: password
    # ------------------------------------------------------------------------

    @property
    def password(self):
        """
        :returns: ``None`` - do not provide the password
        """
        return None  # read-only

    @password.setter
    def password(self, value):
        """
        Change the authentication password value.  This is handy in case
        the calling program needs to attempt different passwords.
        """
        self._auth_password = value

    # ------------------------------------------------------------------------
    # property: logfile
    # ------------------------------------------------------------------------

    @property
    def logfile(self):
        """
        :returns: existing logfile ``file`` object, or a false value
          (``None``/``False``) when no logfile is assigned.
        """
        return self._logfile

    @logfile.setter
    def logfile(self, value):
        """
        Assigns an opened file object to the device for logging.

        If **value** is ``None`` or ``False`` then any existing logfile is
        closed; doing so when no logfile is open is a no-op.

        :param file value: An open ``file`` object.

        :returns: the new logfile ``file`` object

        :raises ValueError:
            When **value** is not a ``file``-like object
        """
        if not value:
            # request to drop the logfile: close it only if one is open,
            # so that clearing twice (or before any assignment) is harmless
            rc = None
            if self._logfile:
                rc = self._logfile.close()
            self._logfile = False
            return rc

        # duck-type the check: the 'file' builtin only exists on Python 2
        if not (hasattr(value, 'write') and hasattr(value, 'close')):
            raise ValueError("value must be a file object")

        self._logfile = value
        return self._logfile

    # ------------------------------------------------------------------------
    # property: timeout
    # ------------------------------------------------------------------------

    @property
    def timeout(self):
        """
        :returns: the current RPC timeout value (int) in seconds.
        """
        return self._conn.timeout

    @timeout.setter
    def timeout(self, value):
        """
        Used to change the RPC timeout value (default=30 sec).

        :param int value:
            New timeout value in seconds
        """
        self._conn.timeout = value

    # ------------------------------------------------------------------------
    # property: facts
    # ------------------------------------------------------------------------

    @property
    def facts(self):
        """
        :returns: Device fact dictionary
        """
        return self._facts

    @facts.setter
    def facts(self, value):
        """ read-only property """
        raise RuntimeError("facts is read-only!")

    # ------------------------------------------------------------------------
    # property: manages
    # ------------------------------------------------------------------------

    @property
    def manages(self):
        """
        :returns:
            ``list`` of Resource Managers/Utilities attached to this
            instance using the :meth:`bind` method.
        """
        return self._manages

    # ------------------------------------------------------------------------
    # property: transform
    # ------------------------------------------------------------------------

    @property
    def transform(self):
        """
        :returns: the current RPC XML Transformation.
        """
        return self._conn._device_handler.transform_reply

    @transform.setter
    def transform(self, func):
        """
        Used to change the RPC XML Transformation.

        :param lambda value:
            New transform lambda
        """
        self._conn._device_handler.transform_reply = func

    # -----------------------------------------------------------------------
    # OVERLOADS
    # -----------------------------------------------------------------------

    def __repr__(self):
        return "Device(%s)" % self.hostname

    # -----------------------------------------------------------------------
    # CONSTRUCTOR
    # -----------------------------------------------------------------------

    def _sshconf_lkup(self):
        """
        Look up this device's host in an SSH configuration file and update
        the instance connection settings (host-name, port, user and
        identity file) from the matching entry.

        The file used is the ``ssh_config`` path given to the constructor,
        or ``$HOME/.ssh/config`` when none was given.

        :returns:
            the path of the SSH configuration file that was parsed, or
            ``None`` when there is no such file to use
        """
        if self._ssh_config:
            sshconf_path = os.path.expanduser(self._ssh_config)
        else:
            home = os.getenv('HOME')
            if not home:
                return None
            sshconf_path = os.path.join(home, '.ssh/config')

        if not os.path.exists(sshconf_path):
            return None

        sshconf = paramiko.SSHConfig()
        # use a context manager so the config file handle is not leaked
        with open(sshconf_path, 'r') as sshconf_file:
            sshconf.parse(sshconf_file)

        found = sshconf.lookup(self._hostname)
        self._hostname = found.get('hostname', self._hostname)
        # ssh_config values are parsed from text; coerce the port to an
        # int so that it is usable for the socket connect done by probe()
        conf_port = found.get('port')
        if conf_port is not None:
            self._port = int(conf_port)
        self._conf_auth_user = found.get('user')
        self._conf_ssh_private_key_file = found.get('identityfile')
        return sshconf_path

    def __init__(self, *vargs, **kvargs):
        """
        Device object constructor.

        :param str vargs[0]: host-name or ipaddress.  This is an
                             alternative for **host**

        :param str host:
            **REQUIRED** host-name or ipaddress of target device

        :param str user:
            *OPTIONAL* login user-name, uses $USER if not provided

        :param str passwd:
            *OPTIONAL* if not provided, assumed ssh-keys are enforced

        :param int port:
            *OPTIONAL* NETCONF port (defaults to 830)

        :param bool gather_facts:
            *OPTIONAL* default is ``True``.  If ``False`` then the
            facts are not gathered on call to :meth:`open`

        :param bool auto_probe:
            *OPTIONAL*  if non-zero then this enables auto_probe at time of
            :meth:`open` and defines the amount of time(sec) for the
            probe timeout

        :param str ssh_private_key_file:
            *OPTIONAL* The path to the SSH private key file.
            This can be used if you need to provide a private key rather than
            loading the key into the ssh-key-ring/environment.  if your
            ssh-key requires a password, then you must provide it via
            **passwd**

        :param str ssh_config:
            *OPTIONAL* The path to the SSH configuration file.
            This can be used to load SSH information from a configuration
            file. By default ~/.ssh/config is queried.

        :param bool normalize:
            *OPTIONAL* default is ``False``.  If ``True`` then the
            XML returned by :meth:`execute` will have whitespace normalized

        :raises ValueError:
            When no host is given and the code is not running on a
            Junos device
        """

        # ----------------------------------------
        # setup instance connection/open variables
        # ----------------------------------------

        hostname = vargs[0] if len(vargs) else kvargs.get('host')

        self._port = kvargs.get('port', 830)
        self._gather_facts = kvargs.get('gather_facts', True)
        self._normalize = kvargs.get('normalize', False)
        self._auto_probe = kvargs.get('auto_probe', self.__class__.auto_probe)

        if self.__class__.ON_JUNOS is True and hostname is None:
            # ---------------------------------
            # running on a Junos device locally
            # ---------------------------------
            self._auth_user = None
            self._auth_password = None
            self._hostname = 'localhost'
            self._ssh_private_key_file = None
            self._ssh_config = None
        else:
            # --------------------------
            # making a remote connection
            # --------------------------
            if hostname is None:
                raise ValueError("You must provide the 'host' value")
            self._hostname = hostname
            # user will default to $USER
            self._auth_user = os.getenv('USER')
            self._conf_auth_user = None
            self._conf_ssh_private_key_file = None
            # user can get updated by ssh_config
            self._ssh_config = kvargs.get('ssh_config')
            self._sshconf_path = self._sshconf_lkup()
            # but if user or private key is explicit from call, then use it.
            self._auth_user = (kvargs.get('user') or
                               self._conf_auth_user or
                               self._auth_user)
            self._ssh_private_key_file = (
                kvargs.get('ssh_private_key_file') or
                self._conf_ssh_private_key_file)
            self._auth_password = (kvargs.get('password') or
                                   kvargs.get('passwd'))

        # -----------------------------
        # initialize instance variables
        # ------------------------------

        self._conn = None
        self._j2ldr = _Jinja2ldr
        self._manages = []
        self._facts = {}
        # no logfile until one is assigned through the 'logfile' property
        self._logfile = None

        # public attributes

        self.connected = False
        self.rpc = _RpcMetaExec(self)

    # -----------------------------------------------------------------------
    # Basic device methods
    # -----------------------------------------------------------------------

    def open(self, *vargs, **kvargs):
        """
        Opens a connection to the device using existing login/auth
        information.

        :param bool gather_facts:
            If set to ``True``/``False`` will override the device
            instance value for only this open process

        :param bool auto_probe:
            If non-zero then this enables auto_probe and defines the amount
            of time/seconds for the probe timeout

        :param bool normalize:
            If set to ``True``/``False`` will override the device
            instance value for only this open process

        :returns Device: Device instance (*self*).

        :raises ProbeError:
            When **auto_probe** is ``True`` and the probe activity
            exceeds the timeout

        :raises ConnectAuthError:
            When provided authentication credentials fail to login

        :raises ConnectRefusedError:
            When the device does not have NETCONF enabled

        :raises ConnectTimeoutError:
            When the the :meth:`Device.timeout` value is exceeded
            during the attempt to connect to the remote device

        :raises ConnectError:
            When an error, other than the above, occurs.  The
            originating ``Exception`` is assigned as ``err._orig`` and
            re-raised to the caller.
        """

        auto_probe = kvargs.get('auto_probe', self._auto_probe)
        # plain truth test: an identity test against the literal 0 would
        # also let None/False through as "probe requested"
        if auto_probe:
            if not self.probe(auto_probe):
                raise EzErrors.ProbeError(self)

        try:
            ts_start = datetime.datetime.now()

            # we want to enable the ssh-agent if-and-only-if we are
            # not given a password or an ssh key file.
            # in this condition it means we want to query the agent
            # for available ssh keys

            allow_agent = bool((self._auth_password is None) and
                               (self._ssh_private_key_file is None))

            # open connection using ncclient transport
            self._conn = netconf_ssh.connect(
                host=self._hostname,
                port=self._port,
                username=self._auth_user,
                password=self._auth_password,
                hostkey_verify=False,
                key_filename=self._ssh_private_key_file,
                allow_agent=allow_agent,
                ssh_config=self._sshconf_lkup(),
                device_params={'name': 'junos'})

        except NcErrors.AuthenticationError:
            # bad authentication credentials
            raise EzErrors.ConnectAuthError(self)

        except NcErrors.SSHError as err:
            # this is a bit of a hack for now, since we want to
            # know if the connection was refused or we simply could
            # not open a connection due to reachability.  so using
            # a timestamp to differentiate the two conditions for now
            # if the diff is < 3 sec, then assume the host is
            # reachable, but NETCONF connection is refused.

            ts_err = datetime.datetime.now()
            diff_ts = ts_err - ts_start
            # total_seconds() covers the whole interval; '.seconds' is
            # only the seconds component of the timedelta
            if diff_ts.total_seconds() < 3:
                raise EzErrors.ConnectRefusedError(self)

            # at this point, we assume that the connection
            # has timed out due to ip-reachability issues

            # substring test, so a message that starts with 'not open'
            # (index 0) is recognised as well
            if 'not open' in str(err):
                raise EzErrors.ConnectTimeoutError(self)
            else:
                # otherwise raise a generic connection
                # error for now.  tag the new exception
                # with the original for debug
                cnx = EzErrors.ConnectError(self)
                cnx._orig = err
                raise cnx

        except socket.gaierror:
            # invalid DNS name, so unreachable
            raise EzErrors.ConnectUnknownHostError(self)

        except Exception as err:
            # anything else, we will re-raise as a
            # generic ConnectError
            cnx_err = EzErrors.ConnectError(self)
            cnx_err._orig = err
            raise cnx_err

        self.connected = True

        # remember the transport's own transform alongside the
        # whitespace-normalizing alternative
        self._nc_transform = self.transform
        self._norm_transform = lambda: JXML.normalize_xslt

        normalize = kvargs.get('normalize', self._normalize)
        if normalize is True:
            self.transform = self._norm_transform

        gather_facts = kvargs.get('gather_facts', self._gather_facts)
        if gather_facts is True:
            self.facts_refresh()

        return self

    def close(self):
        """
        Closes the connection to the device.
        """
        self._conn.close_session()
        self.connected = False

    @normalizeDecorator
    @timeoutDecorator
    def execute(self, rpc_cmd, **kvargs):
        """
        Executes an XML RPC and returns results as either XML or native python

        :param rpc_cmd:
          can either be an XML Element or xml-as-string.  In either case
          the command starts with the specific command element, i.e., not the
          <rpc> element itself

        :param func to_py':
          Is a caller provided function that takes the response and
          will convert the results to native python types.  all kvargs
          will be passed to this function as well in the form::

            to_py( self, rpc_rsp, **kvargs )

        :raises ValueError:
            When the **rpc_cmd** is of unknown origin

        :raises ConnectClosedError:
            When the device is not connected, or the transport fails
            while the RPC is being executed

        :raises RpcTimeoutError:
            When the RPC does not complete within :attr:`timeout`

        :raises PermissionError:
            When the requested RPC command is not allowed due to
            user-auth class privilege controls on Junos

        :raises RpcError:
            When an ``rpc-error`` element is contained in the RPC-reply

        :returns:
            RPC-reply as XML object.  If **to_py** is provided, then
            that function is called, and return of that function is
            provided back to the caller; presumably to convert the XML to
            native python data-types (e.g. ``dict``).  ``True`` is
            returned when the reply has no child elements.
        """

        if self.connected is not True:
            raise EzErrors.ConnectClosedError(self)

        if isinstance(rpc_cmd, str):
            rpc_cmd_e = etree.XML(rpc_cmd)
        elif isinstance(rpc_cmd, etree._Element):
            rpc_cmd_e = rpc_cmd
        else:
            raise ValueError(
                "Dont know what to do with rpc of type %s" %
                rpc_cmd.__class__.__name__)

        # invoking a bad RPC will cause a connection object exception
        # will will be raised directly to the caller ... for now ...
        # @@@ need to trap this and re-raise accordingly.

        try:
            rpc_rsp_e = self._conn.rpc(rpc_cmd_e)._NCElement__doc
        except NcOpErrors.TimeoutExpiredError:
            # err is a TimeoutExpiredError from ncclient,
            # which has no such attribute as xml.
            raise EzErrors.RpcTimeoutError(self, rpc_cmd_e.tag, self.timeout)
        except NcErrors.TransportError:
            raise EzErrors.ConnectClosedError(self)
        except RPCError as err:
            # err is an NCError from ncclient
            rsp = JXML.remove_namespaces(err.xml)
            # see if this is a permission error
            if rsp.findtext('error-message') == 'permission denied':
                e = EzErrors.PermissionError
            else:
                e = EzErrors.RpcError
            raise e(cmd=rpc_cmd_e, rsp=rsp)
        # Something unexpected happened - raise it up
        except Exception:
            warnings.warn("An unknown exception occured - please report.",
                          RuntimeWarning)
            raise

        # From 14.2 onward, junos supports JSON, so now code can be written as
        # dev.rpc.get_route_engine_information({'format': 'json'})

        if rpc_cmd_e.attrib.get('format') in ['json', 'JSON']:
            # the Junos version is needed to decide, so make sure the
            # facts have been gathered
            if self._facts == {}:
                self.facts_refresh()
            ver_info = self._facts['version_info']
            if ver_info.major[0] >= 15 or \
                    (ver_info.major[0] == 14 and ver_info.major[1] >= 2):
                return json.loads(rpc_rsp_e.text)
            else:
                # older release: warn and fall through to the XML handling
                warnings.warn("Native JSON support is only from 14.2 onwards",
                              RuntimeWarning)

        # This section is here for the possible use of something other than
        # ncclient for RPCs that have embedded rpc-errors, need to check for
        # those now

        # rpc_errs = rpc_rsp_e.xpath('.//rpc-error')
        # if len(rpc_errs):
        #   raise EzErrors.RpcError(cmd=rpc_cmd_e, rsp=rpc_errs[0])

        # skip the <rpc-reply> element and pass the caller first child element
        # generally speaking this is what they really want. If they want to
        # uplevel they can always call the getparent() method on it.

        try:
            ret_rpc_rsp = rpc_rsp_e[0]
        except IndexError:
            # no children, so assume it means we are OK
            return True

        # if the caller provided a "to Python" conversion function, then invoke
        # that now and return the results of that function.  otherwise just
        # return the RPC results as XML

        if kvargs.get('to_py'):
            return kvargs['to_py'](self, ret_rpc_rsp, **kvargs)
        else:
            return ret_rpc_rsp

    # ------------------------------------------------------------------------
    # cli - for cheating commands :-)
    # ------------------------------------------------------------------------

    def cli(self, command, format='text', warning=True):
        """
        Executes the CLI command and returns the CLI text output by default.

        :param str command:
          The CLI command to execute, e.g. "show version"

        :param str format:
          The return format, by default is text.  You can optionally select
          "xml" to return the XML structure.

        :param bool warning:
          When ``True`` (the default) a ``RuntimeWarning`` is issued to
          remind the caller that this method is for debug use only.

        :returns:
          The command output; the string ``"invalid command: <command>"``
          is returned when the command could not be executed or its
          reply could not be processed.

        .. note::
            You can also use this method to obtain the XML RPC command for a
            given CLI command by using the pipe filter ``| display xml rpc``.
            When you do this, the return value is the XML RPC command. For
            example if you provide as the command
            ``show version | display xml rpc``, you will get back the XML
            Element ``<get-software-information>``.

        .. warning::
            This function is provided for **DEBUG** purposes only!
            **DO NOT** use this method for general automation purposes as
            that puts you in the realm of "screen-scraping the CLI".
            The purpose of the PyEZ framework is to migrate away from that
            tooling pattern. Interaction with the device should be done via
            the RPC function.

        .. warning::
            You cannot use "pipe" filters with **command** such as ``| match``
            or ``| count``, etc.  The only value use of the "pipe" is for the
            ``| display xml rpc`` as noted above.
        """
        if 'display xml rpc' not in command and warning is True:
            # force the warning to be shown every time, but restore the
            # caller's warning filters afterwards instead of wiping them
            # globally with resetwarnings()
            with warnings.catch_warnings():
                warnings.simplefilter("always")
                warnings.warn("CLI command is for debug use only!",
                              RuntimeWarning)

        try:
            rsp = self.rpc.cli(command, format)
            if rsp.tag == 'output':
                return rsp.text
            if rsp.tag == 'configuration-information':
                return rsp.findtext('configuration-output')
            if rsp.tag == 'rpc':
                return rsp[0]
            return rsp
        except Exception:
            # best-effort debug helper: report the failure as text, but
            # do not swallow KeyboardInterrupt/SystemExit
            return "invalid command: " + command

    def display_xml_rpc(self, command, format='xml'):
        """
        Returns the XML RPC that corresponds to the given CLI command, by
        running the command with the ``| display xml rpc`` filter.

        :param str command:
          The CLI command to retrieve XML RPC for, e.g. "show version"

        :param str format:
          The return format, by default is XML.  You can optionally select
          "text" to return the XML structure as a string.

        :returns:
          The RPC as an XML element (or as a string when **format** is
          "text"); the string ``"invalid command: <command>"`` is returned
          when the lookup fails.
        """
        try:
            command = command + '| display xml rpc'
            rsp = self.rpc.cli(command)
            if format == 'text':
                return etree.tostring(rsp[0])
            return rsp[0]
        except Exception:
            # best-effort debug helper: report the failure as text, but
            # do not swallow KeyboardInterrupt/SystemExit
            return "invalid command: " + command

    # ------------------------------------------------------------------------
    # Template: retrieves a Jinja2 template
    # ------------------------------------------------------------------------

    def Template(self, filename, parent=None, gvars=None):
        """
        Used to return a Jinja2 :class:`Template`.

        :param str filename:
            file-path to Jinja2 template file on local device

        :param parent:
            passed through to the Jinja2 environment's ``get_template``

        :param gvars:
            passed through to the Jinja2 environment's ``get_template``

        :returns: Jinja2 :class:`Template` give **filename**.
        """
        return self._j2ldr.get_template(filename, parent, gvars)

    # ------------------------------------------------------------------------
    # dealing with bind aspects
    # ------------------------------------------------------------------------

    def bind(self, *vargs, **kvargs):
        """
        Used to attach things to this Device instance and make them a
        property of the :class:Device instance.  The most common use
        for bind is attaching Utility instances to a :class:Device.
        For example::

            from jnpr.junos.utils.config import Config

            dev.bind( cu=Config )
            dev.cu.lock()
            # ... load some changes
            dev.cu.commit()
            dev.cu.unlock()

        :param list vargs:
            A list of functions that will get bound as instance methods to
            this Device instance.  When any are given, the name/class
            pairs in **kvargs** are not processed.

            .. warning:: Experimental.

        :param new_property:
            name/class pairs that will create resource-managers bound as
            instance attributes to this Device instance.  See code example
            above

        :raises ValueError:
            When a requested name already exists as an attribute of this
            instance; nothing is bound in that case
        """
        if len(vargs):
            for fn in vargs:
                # check for name clashes before binding
                if hasattr(self, fn.__name__):
                    raise ValueError(
                        "request attribute name %s already exists" %
                        fn.__name__)
            for fn in vargs:
                # bind as instance method, majik.  the two-argument form
                # of MethodType is accepted by both Python 2 and Python 3
                self.__dict__[fn.__name__] = types.MethodType(fn, self)
            return

        # first verify that the names do not conflict with
        # existing object attribute names

        for name in kvargs.keys():
            # check for name-clashes before binding
            if hasattr(self, name):
                raise ValueError(
                    "requested attribute name %s already exists" %
                    name)

        # now instantiate items and bind to this :Device:
        for name, thing in kvargs.items():
            new_inst = thing(self)
            self.__dict__[name] = new_inst
            self._manages.append(name)

    # ------------------------------------------------------------------------
    # facts
    # ------------------------------------------------------------------------

    def facts_refresh(self):
        """
        Reload the facts from the Junos device into :attr:`facts` property.
        """
        # each gatherer is called with the device and the facts dictionary
        for gather in FACT_LIST:
            gather(self, self._facts)

    # ------------------------------------------------------------------------
    # probe
    # ------------------------------------------------------------------------

    def probe(self, timeout=5, intvtimeout=1):
        """
        Probe the device to determine if the Device can accept a remote
        connection.
        This method is meant to be called *prior* to :open():
        This method will not work with ssh-jumphost environments.

        :param int timeout:
          The probe will report ``True``/``False`` if the device report
          connectivity within this timeout (seconds)

        :param int intvtimeout:
          Timeout interval on the socket connection. Generally you should not
          change this value, but you can if you want to twiddle the frequency
          of the socket attempts on the connection

        :returns: ``True`` if probe is successful, ``False`` otherwise
        """
        start = datetime.datetime.now()
        end = start + datetime.timedelta(seconds=timeout)
        probe_ok = True

        while datetime.datetime.now() < end:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.settimeout(intvtimeout)
            try:
                s.connect((self.hostname, self._port))
                s.shutdown(socket.SHUT_RDWR)
                break
            except Exception:
                # attempt failed; pause before trying again.  (not a bare
                # 'except' so that Ctrl-C can still interrupt the probe)
                time.sleep(1)
            finally:
                # always release the socket, including on failed attempts
                s.close()
        else:
            # the deadline passed without a single successful connect
            probe_ok = False

        return probe_ok

    # -----------------------------------------------------------------------
    # Context Manager
    # -----------------------------------------------------------------------

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # only close a live session, and not when leaving the block
        # because of a connection error
        if self._conn.connected and \
                not isinstance(exc_val, EzErrors.ConnectError):
            self.close()