Source code for jnpr.junos.factory.view

import warnings
from contextlib import contextmanager
from copy import deepcopy
from lxml import etree
import json
import sys

from jnpr.junos.factory.viewfields import ViewFields
from jnpr.junos.factory.to_json import TableViewJSONEncoder


[docs]class View(object): """ View is the base-class that makes extracting values from XML data appear as objects with attributes. """ ITEM_NAME_XPATH = 'name' FIELDS = {} GROUPS = None # ------------------------------------------------------------------------- # CONSTRUCTOR # -------------------------------------------------------------------------
[docs] def __init__(self, table, view_xml): """ :table: instance of the RunstatTable :view_xml: this should be an lxml etree Elemenet object. This constructor also accepts a list with a single item/XML """ # if as_xml is passed as a list, make sure it only has # a single item, common response from an xpath search if isinstance(view_xml, list): if 1 == len(view_xml): view_xml = view_xml[0] else: raise ValueError("constructor only accepts a single item") # now ensure that the thing provided is an lxml etree Element if not isinstance(view_xml, etree._Element): raise ValueError("constructor only accecpts lxml.etree._Element") self._table = table self.ITEM_NAME_XPATH = table.ITEM_NAME_XPATH self._init_xml(view_xml)
def _init_xml(self, given_xml): self._xml = given_xml if self.GROUPS is not None: self._groups = {} for xg_name, xg_xpath in self.GROUPS.items(): xg_xml = self._xml.xpath(xg_xpath) # @@@ this is technically an error; need to trap it if not len(xg_xml): continue self._groups[xg_name] = xg_xml[0] # ------------------------------------------------------------------------- # PROPERTIES # ------------------------------------------------------------------------- @property def T(self): """ return the Table instance for the View """ return self._table @property def D(self): """ return the Device instance for this View """ return self.T.D @property def name(self): """ return the name of view item """ if self.ITEM_NAME_XPATH is None: return self._table.D.hostname if isinstance(self.ITEM_NAME_XPATH, str): # xpath union key if ' | ' in self.ITEM_NAME_XPATH: return self._xml.xpath(self.ITEM_NAME_XPATH)[0].text.strip() # simple key return self._xml.findtext(self.ITEM_NAME_XPATH).strip() else: # composite key # keys with missing XPATH nodes are set to None keys = [] for i in self.ITEM_NAME_XPATH: try: keys.append(self.xml.xpath(i)[0].text.strip()) except: keys.append(None) return tuple(keys) # ALIAS key <=> name key = name @property def xml(self): """ returns the XML associated to the item """ return self._xml # ------------------------------------------------------------------------- # METHODS # -------------------------------------------------------------------------
[docs] def keys(self): """ list of view keys, i.e. field names """ return self.FIELDS.keys()
[docs] def values(self): """ list of view values """ return [getattr(self, field) for field in self.keys()]
[docs] def items(self): """ list of tuple(key,value) """ return zip(self.keys(), self.values())
def _updater_instance(self, more): """ called from extend """ if hasattr(more, 'fields'): self.FIELDS = deepcopy(self.__class__.FIELDS) self.FIELDS.update(more.fields.end) if hasattr(more, 'groups'): self.GROUPS = deepcopy(self.__class__.GROUPS) self.GROUPS.update(more.groups) def _updater_class(self, more): """ called from extend """ if hasattr(more, 'fields'): self.FIELDS.update(more.fields.end) if hasattr(more, 'groups'): self.GROUPS.update(more.groups) @contextmanager
[docs] def updater(self, fields=True, groups=False, all=True, **kvargs): """ provide the ability for subclassing objects to extend the definitions of the fields. this is implemented as a context manager with the form called from the subclass constructor: with self.extend() as more: more.fields = <dict> more.groups = <dict> # optional """ # --------------------------------------------------------------------- # create a new object class so we can attach stuff to it arbitrarily. # then pass that object to the caller, yo! # --------------------------------------------------------------------- more = type('RunstatViewMore', (object,), {})() if fields is True: more.fields = ViewFields() # --------------------------------------------------------------------- # callback through context manager # --------------------------------------------------------------------- yield more updater = self._updater_class if all is True else \ self._updater_instance updater(more)
[docs] def asview(self, view_cls): """ create a new View object for this item """ return view_cls(self._table, self._xml)
[docs] def refresh(self): """ ~~~ EXPERIMENTAL ~~~ refresh the data from the Junos device. this only works if the table provides an "args_key", does not update the original table, just this specific view/item """ warnings.warn("Experimental method: refresh") if self._table.can_refresh is not True: raise RuntimeError("table does not support this feature") # create a new table instance that gets only the specific named # value of this view tbl_xml = self._table._rpc_get(self.name) new_xml = tbl_xml.xpath(self._table.ITEM_XPATH)[0] self._init_xml(new_xml) return self
[docs] def to_json(self): """ :returns: JSON encoded string of entire View contents """ return json.dumps(self, cls=TableViewJSONEncoder)
# ------------------------------------------------------------------------- # OVERLOADS # ------------------------------------------------------------------------- def __repr__(self): """ returns the name of the View with the associate item name """ return "%s:%s" % (self.__class__.__name__, self.name) def __getattr__(self, name): """ returns a view item value, called as :obj.name: """ item = self.FIELDS.get(name) if item is None: raise ValueError("Unknown field: '%s'" % name) if 'table' in item: # if this is a sub-table, then return that now return item['table'](self.D, self._xml) # otherwise, not a sub-table, and handle the field astype = item.get('astype', str) if 'group' in item: if item['group'] in self._groups: found = self._groups[item['group']].xpath(item['xpath']) else: return else: found = self._xml.xpath(item['xpath']) len_found = len(found) if astype is bool: # handle the boolean flag case separately return bool(len_found) if not len_found: # even for the case of numbers, do not set the value. we # want to detect "does not exist" vs. defaulting to 0 # -- 2013-nov-19, JLS. return None try: # added exception handler to catch malformed xpath expressesion # -- 2013-nov-19, JLS. # added support to handle multiple xpath values, i.e. a list of # things that have the same xpath expression (common in configs) # -- 2031-dec-06, JLS # added support to use the element tag if the text is empty def _munch(x): if sys.version < '3': as_str = x if isinstance(x, str) else x.text if isinstance(as_str, unicode): as_str = as_str.encode('ascii', 'replace') else: as_str = x if isinstance(x, str) else x.text if as_str is not None: as_str = as_str.strip() if not as_str: as_str = x.tag # use 'not' to test for empty return astype(as_str) if 1 == len_found: return _munch(found[0]) return [_munch(this) for this in found] except: raise RuntimeError("Unable to handle field:'%s'" % name) # and if we are here, then we didn't handle the field. raise RuntimeError("Unable to handle field:'%s'" % name) def __getitem__(self, name): """ allow the caller to extract field values using :obj['name']: the same way they would do :obj.name: """ return getattr(self, name)