Source code for jnpr.junos.factory.optable

from copy import deepcopy
import logging

# 3rd-party
from lxml import etree
from lxml.builder import E

# local
from jnpr.junos.factory.table import Table
from jnpr.junos.jxml import remove_namespaces, remove_namespaces_and_spaces
from jnpr.junos.decorators import checkSAXParserDecorator

logger = logging.getLogger("jnpr.junos.factory.optable")


[docs]class OpTable(Table): # ------------------------------------------------------------------------- # PUBLIC METHODS # -------------------------------------------------------------------------
[docs] @checkSAXParserDecorator def get(self, *vargs, **kvargs): r""" Retrieve the XML table data from the Device instance and returns back the Table instance - for call-chaining purposes. If the Table was created with a :path: rather than a Device, then this method will load the XML from that file. In this case, the \*vargs, and \**kvargs are not used. ALIAS: __call__ :vargs: [0] is the table :arg_key: value. This is used so that the caller can retrieve just one item from the table without having to know the Junos RPC argument. :kvargs: these are the name/value pairs relating to the specific Junos XML command attached to the table. For example, if the RPC is 'get-route-information', there are parameters such as 'table' and 'destination'. Any valid RPC argument can be passed to :kvargs: to further filter the results of the :get(): operation. neato! NOTES: If you need to create a 'stub' for unit-testing purposes, you want to create a subclass of your table and overload this methods. """ self._clearkeys() if self._path is not None: # for loading from local file-path self.xml = remove_namespaces(etree.parse(self._path).getroot()) return self if self._lxml is not None: return self argkey = vargs[0] if len(vargs) else None rpc_args = {} if self._use_filter: try: filter_xml = generate_sax_parser_input(self) rpc_args["filter_xml"] = filter_xml except Exception as ex: logger.debug("Not able to create SAX parser input due to " "'%s'" % ex) self.D.transform = lambda: remove_namespaces_and_spaces rpc_args.update(self.GET_ARGS) # copy default args # saltstack get_table pass args as named keyword if "args" in kvargs and isinstance(kvargs["args"], dict): rpc_args.update(kvargs.pop("args")) rpc_args.update(kvargs) # copy caller provided args if hasattr(self, "GET_KEY") and argkey is not None: rpc_args.update({self.GET_KEY: argkey}) # execute the Junos RPC to retrieve the table self.xml = getattr(self.RPC, self.GET_RPC)(**rpc_args) # returning self for call-chaining purposes, yo! return self
[docs]def generate_sax_parser_input(obj): """ Used to generate xml object from Table/view to be used in SAX parsing Args: obj: self object which contains table/view details Returns: lxml etree object to be used as sax parser input """ item_tags = [] if "/" in obj.ITEM_XPATH: item_tags = obj.ITEM_XPATH.split("/") parser_ingest = E(item_tags.pop(-1), E(obj.ITEM_NAME_XPATH)) else: parser_ingest = E(obj.ITEM_XPATH, E(obj.ITEM_NAME_XPATH)) local_field_dict = deepcopy(obj.VIEW.FIELDS) # first make element out of group fields if obj.VIEW.GROUPS: for group, group_xpath in obj.VIEW.GROUPS.items(): # need to pop out group items so that it wont be reused with fields group_field_dict = { k: local_field_dict.pop(k) for k, v in obj.VIEW.FIELDS.items() if v.get("group") == group } group_ele = E(group_xpath) for key, val in group_field_dict.items(): group_ele.append(E(val.get("xpath"))) parser_ingest.append(group_ele) for i, item in enumerate(local_field_dict.items()): # i is the index and item will be taple of field key and value field_dict = item[1] if "table" in field_dict: # handle nested table/view child_table = field_dict.get("table") parser_ingest.insert(i + 1, generate_sax_parser_input(child_table)) else: xpath = field_dict.get("xpath") # xpath can be multi level, for ex traffic-statistics/input-pps # going in reverse order, for fields example. # split xpath in 2 part, search for first part xpath, if exists, append later # else continue, and finally add full xpath to parent # min-delay: probe-test-global-results/probe-test-generic-results/probe-test-rtt/probe-summary-results/min-delay # max-delay: probe-test-global-results/probe-test-generic-results/probe-test-rtt/probe-summary-results/max-delay # avg-delay: probe-test-global-results/probe-test-generic-results/probe-test-rtt/probe-summary-results/avg-delay # positive-rtt-jitter: probe-test-global-results/probe-test-generic-results/probe-test-positive-round-trip-jitter/probe-summary-results/avg-delay # loss-percentage: probe-test-global-results/probe-test-generic-results/loss-percentage # current-min-delay: probe-last-test-results/probe-test-generic-results/probe-test-rtt/probe-summary-results/min-delay # current-max-delay: probe-last-test-results/probe-test-generic-results/probe-test-rtt/probe-summary-results/max-delay # current-avg-delay: probe-last-test-results/probe-test-generic-results/probe-test-rtt/probe-summary-results/avg-delay # current-positive-rtt-jitter: probe-last-test-results/probe-test-generic-results/probe-test-positive-round-trip-jitter/probe-summary-results/avg-delay # current-loss-percentage: probe-last-test-results/probe-test-generic-results/loss-percentage if "/" in xpath: tags = xpath.split("/") tags_len = len(tags) local_elem_to_add = E(tags[-1]) for i in range(tags_len, 0, -1): xpath = "/".join(tags[:i]) local_obj = parser_ingest elem_exists = local_obj.xpath(xpath) if elem_exists: xpath_exists = elem_exists[0] xpath_exists.insert(1, local_elem_to_add) break if local_elem_to_add.tag != tags[i - 1]: local_elem_to_add = E(tags[i - 1], local_elem_to_add) else: parser_ingest.insert(1, local_elem_to_add) else: parser_ingest.insert(i + 1, E(xpath)) # cases where item is something like # item: task-memory-malloc-usage-report/task-malloc-list/task-malloc # created filter from last item task-malloc # Now add all the tags if present for item_tag in item_tags[::-1]: parser_ingest = E(item_tag, parser_ingest) logger.debug("Generated filter XML is: %s" % etree.tostring(parser_ingest)) return parser_ingest