#
# Copyright (c) 2014-2018 - Adjacent Link LLC, Bridgewater, New Jersey
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Adjacent Link LLC nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import otestpoint.interface.probereport_pb2
from otestpoint.interface import make_measurement_operator
from variable import Variable
from model import Model
import threading
import traceback
import sqlite3
import re
class DataFrameBuilder:
    """Builds Pandas DataFrames from variables defined for recorded
    measurement attributes.

    The builder keeps every recorder data file and its SQLite index open
    until :meth:`close` is called. It can also be used as a context
    manager::

        with DataFrameBuilder(files, *probes) as df_builder:
            ...
    """

    # load() expects consecutive timestamps to differ by exactly this
    # amount; any other step is treated as a gap and every variable is
    # reset. Presumably the probe reporting interval in seconds -- TODO
    # confirm against the otestpoint-recorder configuration.
    _REPORT_INTERVAL = 5

    class _Entry(object):
        """Open handles for one recorder data file and its SQLite index."""

        def __init__(self, name):
            self.name = name    # path of the recorder data file
            self.fd = None      # binary file handle on the data file
            self.conn = None    # sqlite3 connection to name + '.db'
            self.cursor = None  # cursor reused by load() for report lookups

    def __init__(self,files,*probes,**kwargs):
        """Creates a DataFrameBuilder instance.

        Opens every data file together with its SQLite index and collects
        the distinct timestamps of records produced by the requested
        probes.

        Args:
            files (list(str)): List of otestpoint-recorder output data
                files. The corresponding SQLite otestpoint-recorder db
                files must match data file name + '.db'.
            probes (str): One or more OpenTestPoint probe names. Each name
                is used as an unanchored regular expression in which '.'
                is matched literally. If no probes are given, no records
                are selected.

        Raises:
            KeyError: If an invalid keyword is found.

        Example::

            df_builder = DataFrameBuilder(['node-01-otestpoint-recorder.data',
                                           'node-02-otestpoint-recorder.data',
                                           'node-03-otestpoint-recorder.data'],
                                          'EMANE.PhysicalLayer.Counters.General',
                                          'EMANE.TDMAEventSchedulerRadioModel.Counters.Schedule',
                                          'EMANE.TDMAEventSchedulerRadioModel.Counters.General',
                                          'EMANE.TDMAEventSchedulerRadioModel.Tables.Status.Slot',
                                          'EMANE.VirtualTransport.Tables.Status')
        """
        if kwargs:
            raise KeyError("Unknown key(s): %s" % ", ".join(kwargs.keys()))

        self._files = files
        self._probes = probes
        self._operators = {}   # measurement name -> measurement operator
        self._variables = {}   # measurement name -> [Variable, ...]
        # _execpted and _cancel_event are not referenced in this class;
        # kept for compatibility with possible external users -- confirm
        # before removing.
        self._execpted = None
        self._cancel_event = threading.Event()
        self._entries = []
        self._timestamps = []
        self._model = None

        probe_clause, probe_params = self._probe_filter()
        sql = "SELECT DISTINCT time FROM probes WHERE %s;" % probe_clause

        timestamps = set()
        try:
            for filename in self._files:
                entry = self._Entry(filename)
                # Register the entry before opening anything so close()
                # can release whatever was opened if a later step fails.
                self._entries.append(entry)
                # Binary mode: load() seeks to byte offsets and parses the
                # raw bytes as protobuf messages.
                entry.fd = open(filename,'rb')
                entry.conn = sqlite3.connect(filename+'.db')
                entry.conn.create_function("REGEXP", 2, self._regexp)
                for (time_value,) in entry.conn.execute(sql, probe_params):
                    timestamps.add(int(time_value))
                entry.cursor = entry.conn.cursor()
        except Exception:
            # Do not leak already-opened files/connections on failure.
            self.close()
            raise

        self._timestamps = sorted(timestamps)

    @staticmethod
    def _regexp(expr, item):
        """SQLite ``REGEXP`` implementation: True when *expr* matches
        anywhere in *item*; False for a NULL *item*."""
        return item is not None and re.search(expr, item) is not None

    def _probe_filter(self):
        """Returns ``(sql_fragment, params)`` restricting ``probes`` rows to
        the configured probe names.

        Dots in each probe name are escaped so they match literally. With
        no probes the fragment is ``0`` (match nothing) instead of an empty,
        syntactically invalid condition.
        """
        if not self._probes:
            return "0", []
        clause = "(" + " OR ".join(["probe REGEXP ?"] * len(self._probes)) + ")"
        return clause, [probe.replace('.', r'\.') for probe in self._probes]

    def _operator_for(self, data):
        """Returns the cached measurement operator for a report's data,
        creating and caching it on first use.

        Raises:
            ImportError: If no operator can be made for the data's
                module/name.
        """
        operator = self._operators.get(data.name)
        if operator is None:
            MeasurementOperator = make_measurement_operator(data.module,
                                                            data.name)
            if MeasurementOperator is None:
                raise ImportError("Module %s with %s not found" %
                                  (data.module, data.name))
            operator = MeasurementOperator()
            self._operators[data.name] = operator
        return operator

    def close(self):
        """Closes every data file and SQLite connection opened by this
        builder. Calling it more than once is harmless."""
        for entry in self._entries:
            if entry.fd is not None:
                entry.fd.close()
            if entry.conn is not None:
                entry.conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.close()
        return False

    def variable(self,measurement,attribute,**kwargs):
        """Creates a Variable instance that will receive attribute measurement
        updates.

        You must be sure to instantiate the DataFrameBuilder with a
        probe that provides the specified measurement in order to
        receive updates.

        Args:
            measurement (str): Name of the measurement to associate
                with the variable.
            attribute (str): Name of the attribute contained in the
                measurement to associate with the variable.

        Keyword Args:
            apply (callable): Callable applied to a measurement prior
                to storing. Useful to reduce storage to just the
                relevant measurement information of interest. Default:
                lambda x: x.
            tags ([str]): Names of tags to include measurements
                from. If empty all tags are included. Default: [].

            Any ``cache`` keyword is overridden: the builder always
            passes ``cache=1`` to the Variable.

        Returns:
            :class:`~otestpoint.labtools.variable.Variable`: New
            variable instance.

        Raises:
            KeyError: If an invalid keyword is found.

        Example::

            v = df_builder.variable('Measurement_emane_physicallayer_counters_general',
                                    'processedevents')
        """
        kwargs['cache'] = 1
        v = Variable(self,measurement,attribute,**kwargs)
        self._variables.setdefault(measurement, []).append(v)
        return v

    def model(self,*subjects,**kwargs):
        """Creates the data model using transforms and variables.

        Only the first call creates a model; later calls leave the
        existing model unchanged. The builder always passes
        ``purge=False`` to the model.

        Args:
            subjects (object): Tuple of variables (with or without
                modifiers) used to populate the model.

        Keyword Args:
            by_tag (bool): Indicates if the model should use tags as
                the index instead of timestamps. Default: False.
            labels ([str]): Names of the model columns. If empty,
                column default names are made using measurement and
                attribute names. Default: [].

        Returns:
            The builder's model instance.

        Raises:
            KeyError: If invalid keyword found.

        Example::

            model = df_builder.model(Delta(Transform(varRxSlotStatusTable,
                                                     TableColumns(3))),
                                     Delta(Transform(varRxSlotStatusTable,
                                                     TableColumns(4,5,6,7,8))),
                                     Delta(Transform(varTxSlotStatusTable,
                                                     TableColumns(3))),
                                     Delta(Transform(varTxSlotStatusTable,
                                                     TableColumns(4,5))),
                                     labels=['Rx Slot Success',
                                             'Rx Slot Error',
                                             'Tx Slot Success',
                                             'Tx Slot Error'],
                                     by_tag=True)
        """
        if self._model is None:
            kwargs['purge'] = False
            self._model = Model(self,*subjects,**kwargs)
        return self._model

    def load(self,**kwargs):
        """Loads the data model and builds one or more dataframes from
        recorded measurements.

        Every recorded timestamp is replayed in order: matching probe
        reports are decoded and fed to the subscribed variables, which
        are then notified. When consecutive timestamps are not exactly
        ``_REPORT_INTERVAL`` apart, all variables are reset first.

        Keyword Args:
            index (int): Numerical index of column to use as the
                dataframe index or None for timestamps. Default None.
            join (str): Type of join to use for non-timeseries
                dataframes. One of: 'left','right','outer' or 'inner.'
                Default: 'outer.'
            apply (callable): Callable applied to dataframe prior to
                creating or None. Default: None.
            ts (bool): Indicates if dataframe is a
                timeseries. Default: True.
            timestamps (iterable(int)): Only valid when ts=False.
                Timestamps at which a dataframe snapshot is taken. If
                empty, a snapshot is taken at every timestamp.
                Default: empty.
            select (callable): Only valid when ts=False. Called as
                ``select(timestamp_start, timestamp_end, df)``; a
                snapshot is kept only when it returns True.
                Default: always True.

            All keywords except ``timestamps`` and ``select`` are
            forwarded to the model's ``data()`` call.

        Returns:
            list: ``tuple(timestamp min, timestamp max, dataframe)``
            entries, where the `timestamp min` (int) and `timestamp
            max` (int) values indicate the measurement window in
            seconds since the epoch. With ts=True the list holds a
            single entry built after all timestamps are processed;
            with ts=False it holds one entry per selected snapshot.

        Raises:
            KeyError: If 'timestamps' or 'select' is passed while ts
                is True, or if the model rejects a keyword.
            ImportError: If unable to import a probe measurement.
            RuntimeError: If :meth:`model` has not been called.

        Example::

            dfs = df_builder.load(apply=cbwu)

            if dfs:
                timestamp_start,timestamp_end,df = dfs[0]
                df.to_csv('dataframe.csv')
        """
        if self._model is None:
            raise RuntimeError("model() must be called before load()")

        # Reset the model by notifying it with an empty state.
        class EmptySubject:
            def state(self):
                return ({},[])

        self._model.notify(EmptySubject())

        kw_ts = kwargs.get('ts',True)

        if kw_ts:
            for key in ('timestamps', 'select'):
                if key in kwargs:
                    raise KeyError("Keyword '%s' only valid for non-timeseries dataframes (when ts=False)." % key)

        # Neither key is present in timeseries mode (checked above), so
        # these pops only matter when ts=False.
        kw_timestamps = set(kwargs.pop('timestamps', ()))
        kw_select = kwargs.pop('select', lambda start, end, df: True)

        probe_clause, probe_params = self._probe_filter()
        sql = "SELECT offset,size FROM probes WHERE time == ? AND %s;" % probe_clause

        dataframes = []
        timestamp_next = None

        for timestamp in self._timestamps:
            # Any step other than the expected interval is a gap: start
            # every variable over.
            if timestamp_next is not None and timestamp != timestamp_next:
                for variables in self._variables.values():
                    for variable in variables:
                        variable.reset()

            timestamp_next = timestamp + self._REPORT_INTERVAL

            variables_to_notify = set()

            for entry in self._entries:
                for offset, size in entry.cursor.execute(sql, [timestamp] + probe_params):
                    entry.fd.seek(offset)
                    report = otestpoint.interface.probereport_pb2.ProbeReport()
                    report.ParseFromString(entry.fd.read(size))

                    # Resolve the operator even when nobody subscribes so
                    # an unknown measurement module still raises ImportError.
                    operator = self._operator_for(report.data)

                    variables = self._variables.get(report.data.name)
                    if not variables:
                        continue

                    measurement = operator.create(report.data.blob)
                    for variable in variables:
                        if variable.update(report.timestamp,report.tag,measurement):
                            variables_to_notify.add(variable)

            # done with this timestamp
            for variable in variables_to_notify:
                variable.notify()

            if not kw_ts and (not kw_timestamps or timestamp in kw_timestamps):
                df,_event,(timestamp_start,timestamp_end) = self._model.data(**kwargs)
                if kw_select(timestamp_start,timestamp_end,df) == True:
                    dataframes.append((timestamp_start,timestamp_end,df))

        if kw_ts:
            df,_event,(timestamp_start,timestamp_end) = self._model.data(**kwargs)
            dataframes.append((timestamp_start,timestamp_end,df))

        return dataframes