Source code for jnpr.junos.factory.view

import warnings
from contextlib import contextmanager
from copy import deepcopy
from lxml import etree
import json
import sys
from jinja2 import Template, meta

from jnpr.junos.factory.viewfields import ViewFields
from jnpr.junos.factory.to_json import TableViewJSONEncoder


[docs]class View(object): """ View is the base-class that makes extracting values from XML data appear as objects with attributes. """ ITEM_NAME_XPATH = "name" FIELDS = {} EVAL = {} GROUPS = None # ------------------------------------------------------------------------- # CONSTRUCTOR # -------------------------------------------------------------------------
[docs] def __init__(self, table, view_xml): """ :table: instance of the RunstatTable :view_xml: this should be an lxml etree Elemenet object. This constructor also accepts a list with a single item/XML """ # if as_xml is passed as a list, make sure it only has # a single item, common response from an xpath search if isinstance(view_xml, list): if 1 == len(view_xml): view_xml = view_xml[0] else: raise ValueError("constructor only accepts a single item") # now ensure that the thing provided is an lxml etree Element if not isinstance(view_xml, etree._Element): raise ValueError("constructor only accecpts lxml.etree._Element") self._table = table self.ITEM_NAME_XPATH = table.ITEM_NAME_XPATH self._init_xml(view_xml)
def _init_xml(self, given_xml): self._xml = given_xml if self.GROUPS is not None: self._groups = {} for xg_name, xg_xpath in self.GROUPS.items(): xg_xml = self._xml.xpath(xg_xpath) # @@@ this is technically an error; need to trap it if not len(xg_xml): continue self._groups[xg_name] = xg_xml[0] # ------------------------------------------------------------------------- # PROPERTIES # ------------------------------------------------------------------------- @property def T(self): """return the Table instance for the View""" return self._table @property def D(self): """return the Device instance for this View""" return self.T.D @property def name(self): """return the name of view item""" if self.ITEM_NAME_XPATH is None: return [] # return self._table.D.hostname if isinstance(self.ITEM_NAME_XPATH, str): # xpath union key if " | " in self.ITEM_NAME_XPATH: if "Null" in self.ITEM_NAME_XPATH: return self._check_key_delimiter_null( self._xml, self.ITEM_NAME_XPATH ) return self._xml.xpath(self.ITEM_NAME_XPATH)[0].text.strip() # simple key return self._xml.findtext(self.ITEM_NAME_XPATH).strip() else: # composite key # keys with missing XPATH nodes are set to None keys = [] for item_name_xpath in self.ITEM_NAME_XPATH: if " | " in item_name_xpath: key_with_null_cleaned = self._check_key_delimiter_null( self._xml, item_name_xpath ) if key_with_null_cleaned: keys.append(key_with_null_cleaned) else: try: keys.append(self.xml.xpath(item_name_xpath)[0].text.strip()) except: keys.append(None) if keys: return tuple(keys) else: return keys # ALIAS key <=> name key = name def _check_key_delimiter_null(self, xml, item_name_xpath): """ Case where key is provided like key: re-name | Null :param xml: xml object retrieved from device :param item_name_xpath: key xpath :return: key if fetched else [] """ if "Null" in item_name_xpath: # Let try get value for valid xpath key xpath_key = [x for x in item_name_xpath.split(" | ") if x != "Null"] if xpath_key: val = xml.xpath(xpath_key[0]) if val: return val[0].text.strip() else: # To handle Null key return [] @property def xml(self): """returns the XML associated to the item""" return self._xml # ------------------------------------------------------------------------- # METHODS # -------------------------------------------------------------------------
[docs] def keys(self): """list of view keys, i.e. field names""" return self.FIELDS.keys()
[docs] def values(self): """list of view values""" return [getattr(self, field) for field in self.keys()]
[docs] def items(self): """list of tuple(key,value)""" return zip(self.keys(), self.values())
def _updater_instance(self, more): """called from extend""" if hasattr(more, "fields"): self.FIELDS = deepcopy(self.__class__.FIELDS) self.FIELDS.update(more.fields.end) if hasattr(more, "groups"): self.GROUPS = deepcopy(self.__class__.GROUPS) self.GROUPS.update(more.groups) if hasattr(more, "eval"): self.EVAL = deepcopy(self.__class__.EVAL) self.EVAL.update(more.eval) def _updater_class(self, more): """called from extend""" if hasattr(more, "fields"): self.FIELDS.update(more.fields.end) if hasattr(more, "groups"): self.GROUPS.update(more.groups) if hasattr(more, "eval"): self.EVAL.update(more.eval)
[docs] @contextmanager def updater(self, fields=True, groups=False, all=True, **kvargs): """ provide the ability for subclassing objects to extend the definitions of the fields. this is implemented as a context manager with the form called from the subclass constructor: with self.extend() as more: more.fields = <dict> more.groups = <dict> # optional """ # --------------------------------------------------------------------- # create a new object class so we can attach stuff to it arbitrarily. # then pass that object to the caller, yo! # --------------------------------------------------------------------- more = type("RunstatViewMore", (object,), {})() if fields is True: more.fields = ViewFields() # --------------------------------------------------------------------- # callback through context manager # --------------------------------------------------------------------- yield more updater = self._updater_class if all is True else self._updater_instance updater(more)
[docs] def asview(self, view_cls): """create a new View object for this item""" return view_cls(self._table, self._xml)
[docs] def refresh(self): """ ~~~ EXPERIMENTAL ~~~ refresh the data from the Junos device. this only works if the table provides an "args_key", does not update the original table, just this specific view/item """ warnings.warn("Experimental method: refresh") if self._table.can_refresh is not True: raise RuntimeError("table does not support this feature") # create a new table instance that gets only the specific named # value of this view tbl_xml = self._table._rpc_get(self.name) new_xml = tbl_xml.xpath(self._table.ITEM_XPATH)[0] self._init_xml(new_xml) return self
[docs] def to_json(self): """ :returns: JSON encoded string of entire View contents """ return json.dumps(self, cls=TableViewJSONEncoder)
# ------------------------------------------------------------------------- # OVERLOADS # ------------------------------------------------------------------------- def __repr__(self): """returns the name of the View with the associate item name""" return "%s:%s" % (self.__class__.__name__, self.name) def __getattr__(self, name): """ returns a view item value, called as :obj.name: """ expression = self.EVAL.get(name) if expression: variables = meta.find_undeclared_variables(expression) t = Template(expression) expression = t.render({k: self.__getitem__(k) for k in variables}) val = eval(expression) setattr(self, name, val) return val item = self.FIELDS.get(name) if item is None: raise ValueError("Unknown field: '%s'" % name) if "table" in item: # if this is a sub-table, then return that now return item["table"](self.D, self._xml) # otherwise, not a sub-table, and handle the field astype = item.get("astype", str) if "group" in item: if item["group"] in self._groups: found = self._groups[item["group"]].xpath(item["xpath"]) else: return else: found = self._xml.xpath(item["xpath"]) len_found = len(found) if astype is bool: # handle the boolean flag case separately return bool(len_found) if not len_found: # even for the case of numbers, do not set the value. we # want to detect "does not exist" vs. defaulting to 0 # -- 2013-nov-19, JLS. return None try: # added exception handler to catch malformed xpath expressesion # -- 2013-nov-19, JLS. # added support to handle multiple xpath values, i.e. a list of # things that have the same xpath expression (common in configs) # -- 2031-dec-06, JLS # added support to use the element tag if the text is empty def _munch(x): if sys.version < "3": as_str = x if isinstance(x, str) else x.text if isinstance(as_str, unicode): as_str = as_str.encode("ascii", "replace") else: as_str = x if isinstance(x, str) else x.text if as_str is not None: as_str = as_str.strip() if not as_str: as_str = x.tag # use 'not' to test for empty return astype(as_str) if 1 == len_found: return _munch(found[0]) # -- 2020-March-26, if string function (like string-before or string-after) is used as xpath (instead of as xpath condition), lxml will return ElementUnicodeResult object, which will be converted wrongly by the next interation, we should return the original UnicodeResult if isinstance(found, etree._ElementUnicodeResult): return found return [_munch(this) for this in found] except: raise RuntimeError("Unable to handle field:'%s'" % name) # and if we are here, then we didn't handle the field. raise RuntimeError("Unable to handle field:'%s'" % name) def __getitem__(self, name): """ allow the caller to extract field values using :obj['name']: the same way they would do :obj.name: """ return getattr(self, name)