#
# Copyright (c) 2016-2017 - Adjacent Link LLC, Bridgewater, New Jersey
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Adjacent Link LLC nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
"""Stream OpenTestPoint Probe Reports to labtools variables.
A Stream is a factory for creating labtools variables that receive
measurement data from one or more subscribed OpenTestPoint probes
published by an endpoint.
"""
import otestpoint.interface.probereport_pb2
from otestpoint.interface import make_measurement_operator
from variable import Variable
from model import Model
import threading
import zmq
import traceback
class Stream:
    """Factory for streaming measurements to variables."""

    # Upper bound, in milliseconds, on how long the receive loop blocks
    # in poll() while no notification is pending. Polling with no
    # timeout would block until the next message and stop() would never
    # be noticed on a quiet endpoint.
    _IDLE_POLL_MSEC = 1000

    def __init__(self, endpoint, *probes, **kwargs):
        """Creates a Stream instance.

        Args:
          endpoint (str): OpenTestPoint probe publisher in the form
            host:port.

          probes (str): One or more OpenTestPoint probe names.

        Keyword Args:
          cache (int): Number of measurements to cache per reporting
            tag. Default: 120.

          notify_threshold (int): Number of milliseconds to delay
            variable notification after receiving a
            measurement. Default: 500.

          rate (int): Rate in seconds the OpenTestPoint probes are
            set to publish. Default: 5.

        Raises:
          KeyError: If an invalid keyword is found.

        Example::

          s = Stream('localhost:9002',
                     'EMANE.PhysicalLayer.Counters.General',
                     'EMANE.TDMAEventSchedulerRadioModel.Counters.Schedule',
                     'EMANE.TDMAEventSchedulerRadioModel.Counters.General',
                     'EMANE.TDMAEventSchedulerRadioModel.Tables.Status.Slot',
                     'EMANE.VirtualTransport.Tables.Status')
        """
        self._endpoint = endpoint
        self._probes = probes
        # measurement name -> operator instance, or None when no
        # operator could be made for that measurement
        self._operators = {}
        # measurement name -> list of variables fed by that measurement
        self._variables = {}
        self._notify_threshold = kwargs.pop('notify_threshold', 500)
        self._rate = kwargs.pop('rate', 5)
        self._cache = kwargs.pop('cache', 120)
        # Timestamp of the report interval currently being collected.
        # NOTE: the attribute name is misspelled ("expected"); it is
        # kept as-is in case code outside this class reads it.
        self._execpted = None
        self._cancel_event = threading.Event()

        # every recognized keyword was popped above, anything left is
        # unknown
        if kwargs:
            raise KeyError("Unknown key(s): %s" % ", ".join(kwargs.keys()))

    def variable(self, measurement, attribute, **kwargs):
        """Creates a Variable instance that will receive attribute measurement
        updates.

        You must be sure to instantiate the Stream with a probe that
        provides the specified measurement in order to receive
        updates.

        Args:
          measurement (str): Name of the measurement to associate
            with the variable.

          attribute (str): Name of the attribute contained in the
            measurement to associate with the variable.

        Keyword Args:
          apply (callable): Callable applied to a measurement prior
            to storing. Useful to reduce storage to just the
            relevant measurement information of interest. Default:
            lambda x: x.

          cache (int): Number of measurements to cache per reporting
            tag. Default: Value set for stream.

          tags ([str]): Names of tags to include measurements
            from. If empty all tags are included. Default: [].

        Returns:
          :class:`~otestpoint.labtools.variable.Variable`: New
          variable instance.

        Raises:
          KeyError: If an invalid keyword is found.

        Example::

          v = stream.variable('Measurement_emane_physicallayer_counters_general',
                              'processedevents')
        """
        # fall back to the stream wide cache size unless overridden
        kwargs.setdefault('cache', self._cache)

        v = Variable(self, measurement, attribute, **kwargs)

        self._variables.setdefault(measurement, []).append(v)

        return v

    def model(self, *subjects, **kwargs):
        """Creates a data model.

        Args:
          subjects (object): Tuple of variables (with or without
            modifiers) used to populate the model.

        Keyword Args:
          by_tag (bool): Indicate if the model should use tags as
            the index or timestamps. Default: False.

          labels ([str]): Names of the model columns. If empty,
            column default names are made using measurement and
            attribute names. Default: [].

        Returns:
          :class:`~otestpoint.labtools.model.Model`: New model
          instance.

        Raises:
          KeyError: If invalid keyword found.

        Example::

          model = stream.model(Delta(Transform(varRxSlotStatusTable,
                                               TableColumns(3))),
                               Delta(Transform(varRxSlotStatusTable,
                                               TableColumns(4,5,6,7,8))),
                               Delta(Transform(varTxSlotStatusTable,
                                               TableColumns(3))),
                               Delta(Transform(varTxSlotStatusTable,
                                               TableColumns(4,5))),
                               labels=['Rx Slot Success',
                                       'Rx Slot Error',
                                       'Tx Slot Success',
                                       'Tx Slot Error'],
                               by_tag=True)
        """
        return Model(self, *subjects, **kwargs)

    def cache(self):
        """Gets the stream cache size

        Returns:
          int: Cache size.
        """
        return self._cache

    def run(self):
        """Starts the processing for OpenTestPoint probe measurements.

        Processing happens on a daemon thread, so this call returns
        immediately.
        """
        thread = threading.Thread(target=self._run)
        thread.daemon = True
        thread.start()

    def stop(self):
        """Stops the processing for OpenTestPoint probe measurements.

        Sets the cancel event; the receive loop checks it each time its
        poll returns, which happens at least every ``_IDLE_POLL_MSEC``
        milliseconds.
        """
        self._cancel_event.set()

    def _run(self):
        """Receive loop: subscribes to the probes and feeds the variables.

        Runs until the cancel event is set. The socket and context are
        always released on exit.
        """
        context = zmq.Context()
        subscriber = context.socket(zmq.SUB)

        # variables updated since the last notification
        pending = set()

        try:
            subscriber.setsockopt(zmq.IPV4ONLY, 0)

            subscriber.connect("tcp://" + self._endpoint)

            for probe in self._probes:
                # subscription topics must be bytes; encode text names
                if not isinstance(probe, bytes):
                    probe = probe.encode('utf-8')

                subscriber.setsockopt(zmq.SUBSCRIBE, probe)

            poller = zmq.Poller()

            poller.register(subscriber, zmq.POLLIN)

            timeout = self._IDLE_POLL_MSEC

            while not self._cancel_event.is_set():
                socks = dict(poller.poll(timeout))

                if socks.get(subscriber) != zmq.POLLIN:
                    # poll timed out: flush whatever is pending and go
                    # back to the idle interval
                    self._notify_pending(pending)
                    timeout = self._IDLE_POLL_MSEC
                    continue

                msgs = subscriber.recv_multipart()

                # the serialized report is read from the second frame;
                # skip anything shorter instead of killing the thread
                if len(msgs) < 2:
                    continue

                report = otestpoint.interface.probereport_pb2.ProbeReport()

                report.ParseFromString(msgs[1])

                self._track_timestamp(report.timestamp, pending)

                if self._update_variables(report, pending):
                    # hold notification back for notify_threshold
                    # milliseconds in case more reports follow
                    timeout = self._notify_threshold

        except KeyboardInterrupt:
            print(traceback.format_exc())

        finally:
            # close the socket first so terminating the context does
            # not wait on it
            subscriber.close(linger=0)
            context.term()

    def _notify_pending(self, pending):
        """Notifies every variable in pending, then empties the set."""
        for variable in pending:
            variable.notify()

        pending.clear()

    def _track_timestamp(self, timestamp, pending):
        """Advances the current report timestamp.

        A timestamp exactly one rate period after the current one
        notifies the pending variables. Any other change resets every
        registered variable.
        """
        if self._execpted is None:
            self._execpted = timestamp

        if timestamp == self._execpted:
            return

        is_next_interval = timestamp == self._execpted + self._rate

        self._execpted = timestamp

        if is_next_interval:
            # new timestamp notify any pending variables
            self._notify_pending(pending)
        else:
            # snapshot the lists: variable() may add a key from
            # another thread while this loop runs
            for variables in list(self._variables.values()):
                for variable in variables:
                    variable.reset()

    def _operator_for(self, data):
        """Returns the cached operator for data.name, creating it on first use.

        None is cached as well when no operator can be made, so the
        lookup is attempted only once per measurement name.
        """
        try:
            return self._operators[data.name]
        except KeyError:
            pass

        MeasurementOperator = make_measurement_operator(data.module,
                                                        data.name)

        if MeasurementOperator is not None:
            operator = MeasurementOperator()
        else:
            operator = None

        self._operators[data.name] = operator

        return operator

    def _update_variables(self, report, pending):
        """Feeds a report to the variables registered for its measurement.

        Variables whose update() returns a true value are added to
        pending.

        Returns:
          bool: True if at least one variable was added to pending.
        """
        operator = self._operator_for(report.data)

        if operator is None:
            return False

        measurement = operator.create(report.data.blob)

        updated = False

        for variable in self._variables.get(report.data.name, ()):
            if variable.update(report.timestamp, report.tag, measurement):
                pending.add(variable)
                updated = True

        return updated