# Source code for otestpoint.labtools.stream

#
# Copyright (c) 2016-2017 - Adjacent Link LLC, Bridgewater, New Jersey
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in
#    the documentation and/or other materials provided with the
#    distribution.
#  * Neither the name of Adjacent Link LLC nor the names of its
#    contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#

"""Stream OpenTestPoint Probe Reports to labtools variables.

A Stream is a factory for creating labtools variables that receive
measurement data from one or more subscribed OpenTestPoint probes
published by an endpoint.

"""

import otestpoint.interface.probereport_pb2
from otestpoint.interface import make_measurement_operator
from variable import Variable
from model import Model
import threading
import zmq
import traceback

class Stream:
    """Factory for streaming measurements to variables."""

    # Longest time, in milliseconds, the worker blocks in a single poll
    # while no notification is pending. Keeping this finite lets the
    # worker re-check the cancel event, so stop() takes effect even when
    # the endpoint has gone quiet.
    _IDLE_POLL_MS = 1000

    def __init__(self, endpoint, *probes, **kwargs):
        """Creates a Stream instance.

        Args:
          endpoint (str): OpenTestPoint probe publisher in the form
            host:port.

          probes (str): One or more OpenTestPoint probe names.

        Keyword Args:
          cache (int): Number of measurements to cache per reporting
            tag. Default: 120.

          notify_threshold (int): Number of milliseconds to delay
            variable notification after receiving a
            measurement. Default: 500.

          rate (int): Rate in seconds the OpenTestPoint probes are set
            to publish. Default: 5.

        Raises:
          KeyError: If an invalid keyword is found.

        Example::

          s = Stream('localhost:9002',
                     'EMANE.PhysicalLayer.Counters.General',
                     'EMANE.TDMAEventSchedulerRadioModel.Counters.Schedule',
                     'EMANE.TDMAEventSchedulerRadioModel.Counters.General',
                     'EMANE.TDMAEventSchedulerRadioModel.Tables.Status.Slot',
                     'EMANE.VirtualTransport.Tables.Status')

        """
        self._endpoint = endpoint
        self._probes = probes
        # Measurement name -> operator instance, or None when no operator
        # could be made for that measurement.
        self._operators = {}
        # Measurement name -> list of variables fed by that measurement.
        self._variables = {}
        self._notify_threshold = kwargs.pop('notify_threshold', 500)
        self._rate = kwargs.pop('rate', 5)
        self._cache = kwargs.pop('cache', 120)
        # Timestamp of the reporting interval currently being processed;
        # None until the first report arrives. The attribute keeps its
        # original (misspelled) name because collaborating classes receive
        # this instance and may read it.
        self._execpted = None
        self._cancel_event = threading.Event()

        if kwargs:
            raise KeyError("Unknown key(s): %s" % ", ".join(kwargs.keys()))

    def variable(self, measurement, attribute, **kwargs):
        """Creates a Variable instance that will receive attribute
        measurement updates.

        You must be sure to instantiate the Stream with a probe that
        provides the specified measurement in order to receive
        updates.

        Args:
          measurement (str): Name of the measurement to associate with
            the variable.

          attribute (str): Name of the attribute contained in the
            measurement to associate with the variable.

        Keyword Args:
          apply (callable): Callable applied to a measurement prior to
            storing. Useful to reduce storage to just the relevant
            measurement information of interest. Default: lambda x: x.

          cache (int): Number of measurements to cache per reporting
            tag. Default: Value set for stream.

          tags ([str]): Names of tags to include measurements from. If
            empty all tags are included. Default: [].

        Returns:
          :class:`~otestpoint.labtools.variable.Variable`: New
          variable instance.

        Raises:
          KeyError: If an invalid keyword is found.

        Example::

          v = stream.variable('Measurement_emane_physicallayer_counters_general',
                              'processedevents')

        """
        # Variables inherit the stream cache size unless told otherwise.
        if 'cache' not in kwargs:
            kwargs['cache'] = self._cache

        v = Variable(self, measurement, attribute, **kwargs)

        self._variables.setdefault(measurement, []).append(v)

        return v

    def model(self, *subjects, **kwargs):
        """Creates a data model.

        Args:
          subjects (object): Tuple of variables (with or without
            modifiers) used to populate the model.

        Keyword Args:
          by_tag (bool): Indicate if the model should use tags as the
            index instead of timestamps. Default: False.

          labels ([str]): Names of the model columns. If empty, column
            default names are made using measurement and attribute
            names. Default: [].

        Returns:
          :class:`~otestpoint.labtools.model.Model`: New model
          instance.

        Raises:
          KeyError: If invalid keyword found.

        Example::

          model = stream.model(Delta(Transform(varRxSlotStatusTable,
                                               TableColumns(3))),
                               Delta(Transform(varRxSlotStatusTable,
                                               TableColumns(4,5,6,7,8))),
                               Delta(Transform(varTxSlotStatusTable,
                                               TableColumns(3))),
                               Delta(Transform(varTxSlotStatusTable,
                                               TableColumns(4,5))),
                               labels=['Rx Slot Success',
                                       'Rx Slot Error',
                                       'Tx Slot Success',
                                       'Tx Slot Error'],
                               by_tag=True)

        """
        return Model(self, *subjects, **kwargs)

    def cache(self):
        """Gets the stream cache size.

        Returns:
          int: Cache size.

        """
        return self._cache

    def run(self):
        """Starts the processing for OpenTestPoint probe measurements.

        Processing happens on a daemon thread, so this call returns
        immediately.

        """
        thread = threading.Thread(target=self._run)
        thread.daemon = True
        thread.start()

    def stop(self):
        """Stops the processing for OpenTestPoint probe measurements.

        Sets the cancel event; the worker loop exits the next time it
        checks the event.

        """
        self._cancel_event.set()

    @staticmethod
    def _as_topic(probe):
        """Returns the probe name as bytes for use as a subscription."""
        if isinstance(probe, bytes):
            return probe

        return probe.encode('utf-8')

    @staticmethod
    def _notify_pending(pending):
        """Notifies every variable in the pending set, then empties it."""
        for variable in pending:
            variable.notify()

        pending.clear()

    def _track_timestamp(self, timestamp, pending):
        """Advances the current report timestamp.

        When the timestamp moves forward by exactly one reporting rate,
        the pending variables are notified. Any other change resets all
        registered variables.

        """
        if self._execpted is None:
            self._execpted = timestamp

        if timestamp == self._execpted:
            return

        on_schedule = timestamp == self._execpted + self._rate

        self._execpted = timestamp

        if on_schedule:
            # new timestamp: notify any pending variables
            self._notify_pending(pending)
        else:
            # Iterate over snapshots: variable() may register new entries
            # from another thread while this loop runs.
            for variables in list(self._variables.values()):
                for variable in list(variables):
                    variable.reset()

    def _operator_for(self, module, name):
        """Returns the cached operator for a measurement, creating it on
        first use. The result is None when no operator can be made."""
        if name not in self._operators:
            factory = make_measurement_operator(module, name)

            self._operators[name] = factory() if factory is not None else None

        return self._operators[name]

    def _run(self):
        """Worker loop: subscribes to the probes and feeds the variables.

        Each received report is decoded and offered to the variables
        registered for its measurement. Variables that accept the update
        are notified either when the next reporting interval begins or
        once no further report arrives within the notify threshold.

        """
        context = zmq.Context()

        subscriber = context.socket(zmq.SUB)

        try:
            subscriber.setsockopt(zmq.IPV4ONLY, 0)

            subscriber.connect("tcp://" + self._endpoint)

            for probe in self._probes:
                subscriber.setsockopt(zmq.SUBSCRIBE, self._as_topic(probe))

            poller = zmq.Poller()

            poller.register(subscriber, zmq.POLLIN)

            timeout = self._IDLE_POLL_MS

            pending = set()

            while not self._cancel_event.is_set():
                socks = dict(poller.poll(timeout))

                if socks.get(subscriber) == zmq.POLLIN:
                    msgs = subscriber.recv_multipart()

                    report = otestpoint.interface.probereport_pb2.ProbeReport()

                    # The serialized report is parsed from the second
                    # frame of the multipart message.
                    report.ParseFromString(msgs[1])

                    self._track_timestamp(report.timestamp, pending)

                    operator = self._operator_for(report.data.module,
                                                  report.data.name)

                    if operator is not None:
                        measurement = operator.create(report.data.blob)

                        targets = self._variables.get(report.data.name, ())

                        for variable in list(targets):
                            if variable.update(report.timestamp,
                                               report.tag,
                                               measurement):
                                pending.add(variable)

                    # Poll with the notify threshold so that a quiet
                    # period triggers the pending notifications.
                    timeout = self._notify_threshold

                else:
                    # Nothing was received within the timeout: flush any
                    # pending notifications and fall back to idle polling.
                    self._notify_pending(pending)

                    timeout = self._IDLE_POLL_MS

        except KeyboardInterrupt:
            print(traceback.format_exc())

        finally:
            # Release the socket and context when the loop ends.
            subscriber.close(linger=0)
            context.term()