# Source code for otestpoint.labtools.dataframebuilder

#
# Copyright (c) 2014-2018 - Adjacent Link LLC, Bridgewater, New Jersey
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in
#    the documentation and/or other materials provided with the
#    distribution.
#  * Neither the name of Adjacent Link LLC nor the names of its
#    contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#

import otestpoint.interface.probereport_pb2
from otestpoint.interface import make_measurement_operator
from variable import Variable
from model import Model
import threading
import traceback
import sqlite3
import re

class DataFrameBuilder:
    """Builds Pandas DataFrames from variables defined for recorded measurement attributes.

    An instance keeps every recorder data file and its SQLite index open
    until :meth:`close` is called; it can also be used as a context
    manager (``with DataFrameBuilder(...) as df_builder:``).
    """

    class _Entry:
        """Per-recorder-file state used while reading probe reports."""
        name = None    # recorder data file name
        fd = None      # binary file object for the recorder data file
        conn = None    # sqlite3 connection to <name> + '.db'
        cursor = None  # cursor reused by load() for per-timestamp queries

    def __init__(self, files, *probes, **kwargs):
        """Creates a DataFrameBuilder instance.

        Opens every data file (binary mode) and its SQLite db, and collects
        the sorted, de-duplicated timestamps of all records whose probe name
        matches one of ``probes``.

        Args:
            files (list(str)): List of otestpoint-recorder output data
              files. The corresponding SQLite otestpoint-recorder db files
              must match data file name + '.db'.

            probes (str): One or more OpenTestPoint probe names. Each name
              is matched against recorded probe names as an unanchored
              regular expression in which '.' is literal. If no names are
              given, no records match.

        Raises:
            KeyError: If an invalid keyword is found.

        Example::

            df_builder = DataFrameBuilder(['node-01-otestpoint-recorder.data',
                                           'node-02-otestpoint-recorder.data',
                                           'node-03-otestpoint-recorder.data'],
                                          'EMANE.PhysicalLayer.Counters.General',
                                          'EMANE.TDMAEventSchedulerRadioModel.Counters.Schedule',
                                          'EMANE.TDMAEventSchedulerRadioModel.Counters.General',
                                          'EMANE.TDMAEventSchedulerRadioModel.Tables.Status.Slot',
                                          'EMANE.VirtualTransport.Tables.Status')
        """
        self._files = files
        self._probes = probes
        self._operators = {}
        self._variables = {}
        self._execpted = None
        self._cancel_event = threading.Event()
        self._entries = []
        self._timestamps = []
        self._model = None

        if kwargs:
            raise KeyError("Unknown key(s): %s" % ", ".join(kwargs.keys()))

        # Only '.' is escaped; any other regex metacharacter in a probe
        # name is passed through to the REGEXP match unchanged.
        self._probe_patterns = [x.replace('.', r'\.') for x in self._probes]

        sql = ("SELECT time,offset FROM probes WHERE %s ORDER BY time ASC;"
               % self._probe_clause())

        timestamps = set()

        try:
            for filename in self._files:
                entry = DataFrameBuilder._Entry()
                entry.name = filename
                # Register the entry before opening anything so that close()
                # releases whatever was already opened if a later step fails.
                self._entries.append(entry)
                # Recorder data is serialized protobuf, so read raw bytes.
                entry.fd = open(filename, 'rb')
                entry.conn = sqlite3.connect(filename + '.db')
                entry.conn.create_function("REGEXP", 2, self._regexp)
                for row in entry.conn.execute(sql, self._probe_patterns):
                    # NOTE(review): load() looks records up with
                    # 'time == <int>'; if the time column ever holds
                    # non-integral values those records would be missed.
                    timestamps.add(int(row[0]))
                entry.cursor = entry.conn.cursor()
        except Exception:
            self.close()
            raise

        self._timestamps = sorted(timestamps)

    def close(self):
        """Closes all recorder data files and SQLite connections.

        Safe to call more than once.
        """
        for entry in self._entries:
            if entry.fd is not None:
                entry.fd.close()
            if entry.conn is not None:
                entry.conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.close()
        return False

    @staticmethod
    def _regexp(expr, item):
        """SQLite REGEXP implementation: True if expr matches anywhere in item."""
        return re.search(expr, item) is not None

    def _probe_clause(self):
        """Returns an SQL condition OR-ing one 'probe REGEXP ?' test per probe.

        With no probes the condition is the SQL false literal '0'.
        """
        return " OR ".join(["probe REGEXP ?"] * len(self._probes)) or "0"

    def variable(self, measurement, attribute, **kwargs):
        """Creates a Variable instance that will receive attribute measurement updates.

        You must be sure to instantiate the DataFrameBuilder with a probe
        that provides the specified measurement in order to receive updates.

        Args:
            measurement (str): Name of the measurement to associate with
              the variable.

            attribute (str): Name of the attribute contained in the
              measurement to associate with the variable.

        Keyword Args:
            apply (callable): Callable applied to a measurement prior to
              storing. Useful to reduce storage to just the relevant
              measurement information of interest. Default: lambda x: x.

            tags ([str]): Names of tags to include measurements from. If
              empty all tags are included. Default: [].

        The ``cache`` keyword is always set to 1, overriding any value
        supplied by the caller.

        Returns:
            :class:`~otestpoint.labtools.variable.Variable`: New variable instance.

        Raises:
            KeyError: If an invalid keyword is found.

        Example::

            v = df_builder.variable('Measurement_emane_physicallayer_counters_general',
                                    'processedevents')
        """
        kwargs['cache'] = 1
        v = Variable(self, measurement, attribute, **kwargs)
        self._variables.setdefault(measurement, []).append(v)
        return v

    def model(self, *subjects, **kwargs):
        """Creates the data model using transforms and variables.

        Only the first call creates a model; later calls leave the existing
        model unchanged. The ``purge`` keyword is always set to False.

        Args:
            subjects (object): Tuple of variables (with or without
              modifiers) used to populate the model.

        Keyword Args:
            by_tag (bool): Indicates whether the model should use tags or
              timestamps as the index. Default: False.

            labels ([str]): Names of the model columns. If empty, default
              column names are made from measurement and attribute names.
              Default: [].

        Returns:
            :class:`~otestpoint.labtools.model.Model`: The builder's model.

        Raises:
            KeyError: If invalid keyword found.

        Example::

            model = df_builder.model(Delta(Transform(varRxSlotStatusTable,
                                                     TableColumns(3))),
                                     Delta(Transform(varRxSlotStatusTable,
                                                     TableColumns(4,5,6,7,8))),
                                     Delta(Transform(varTxSlotStatusTable,
                                                     TableColumns(3))),
                                     Delta(Transform(varTxSlotStatusTable,
                                                     TableColumns(4,5))),
                                     labels=['Rx Slot Success',
                                             'Rx Slot Error',
                                             'Tx Slot Success',
                                             'Tx Slot Error'],
                                     by_tag=True)
        """
        if self._model is None:
            kwargs['purge'] = False
            self._model = Model(self, *subjects, **kwargs)
        return self._model

    def load(self, **kwargs):
        """Loads the data model and builds one or more dataframes from recorded measurements.

        Keyword Args:
            index (int): Numerical index of column to use as the dataframe
              index or None for timestamps. Default None.

            join (str): Type of join to use for non-timeseries
              dataframes. One of: 'left', 'right', 'outer' or 'inner'.
              Default: 'outer'.

            apply (callable): Callable applied to dataframe prior to
              creating or None. Default: None.

            ts (bool): Indicates if dataframe is a timeseries. Default: True.

            timestamps (set(int)): Only valid when ts=False. Timestamps at
              which a dataframe is produced. Empty means every timestamp.
              Default: empty.

            select (callable): Only valid when ts=False. Called as
              ``select(timestamp_start, timestamp_end, df)``; the dataframe
              is kept only when the result equals True. Default: always True.

        Keywords other than ``timestamps`` and ``select`` (including
        ``ts``) are passed to the model's ``data()`` call.

        Returns:
            list: ``tuple(timestamp min, timestamp max, dataframe)`` entries,
            where the `timestamp min` (int) and `timestamp max` (int) values
            indicate the measurement window in seconds since the epoch. With
            ts=True the list holds one entry built after all timestamps are
            processed; with ts=False it holds one entry per selected
            timestamp.

        Raises:
            RuntimeError: If :meth:`model` has not been called.
            KeyError: If ``timestamps`` or ``select`` is given with ts=True.
            ImportError: If unable to import a probe measurement.

        Example::

            dfs = df_builder.load(apply=cbwu)

            if dfs:
                timestamp_start,timestamp_end,df = dfs[0]
                df.to_csv('dataframe.csv');
        """
        if self._model is None:
            raise RuntimeError("No model defined: call model() before load().")

        # reset the model
        class EmptySubject:
            def state(self):
                return ({}, [])

        self._model.notify(EmptySubject())

        dataframes = []

        # 'ts' is read but not removed, so it is also passed to data().
        kw_ts = kwargs.get('ts', True)

        if kw_ts:
            if 'timestamps' in kwargs:
                raise KeyError("Keyword 'timestamps' only valid for non-timeseries dataframes (when ts=False).")

            if 'select' in kwargs:
                raise KeyError("Keyword 'select' only valid for non-timeseries dataframes (when ts=False).")

            kw_timestamps = set()
            kw_select = None
        else:
            kw_timestamps = kwargs.pop('timestamps', set())
            kw_select = kwargs.pop('select', lambda x, y, z: True)

        sql = ("SELECT offset,size,probe FROM probes WHERE time == ? AND (%s);"
               % self._probe_clause())

        variables_to_notify = set()
        timestamp_next = None

        for timestamp in self._timestamps:
            # NOTE(review): consecutive timestamps are expected 5 seconds
            # apart (presumably the probe reporting interval - confirm); any
            # other step is treated as a gap and all variables are reset.
            if timestamp_next is not None and timestamp != timestamp_next:
                self._reset_variables()

            timestamp_next = timestamp + 5

            params = [timestamp] + self._probe_patterns

            for entry in self._entries:
                for offset, size, _probe in entry.cursor.execute(sql, params):
                    variables_to_notify.update(
                        self._process_record(entry, offset, size))

            # done with this timestamp
            for variable in variables_to_notify:
                variable.notify()

            variables_to_notify.clear()

            if not kw_ts and (not kw_timestamps or timestamp in kw_timestamps):
                df, _event, (timestamp_start, timestamp_end) = self._model.data(**kwargs)

                # Only a result equal to True selects the dataframe.
                if kw_select(timestamp_start, timestamp_end, df) == True:
                    dataframes.append((timestamp_start, timestamp_end, df))

        if kw_ts:
            df, _event, (timestamp_start, timestamp_end) = self._model.data(**kwargs)
            dataframes.append((timestamp_start, timestamp_end, df))

        return dataframes

    def _reset_variables(self):
        """Calls reset() on every registered variable."""
        for variables in self._variables.values():
            for variable in variables:
                variable.reset()

    def _process_record(self, entry, offset, size):
        """Decodes one recorded probe report and feeds it to matching variables.

        Returns the variables whose update() reported a change.
        """
        entry.fd.seek(offset)
        msg = entry.fd.read(size)

        report = otestpoint.interface.probereport_pb2.ProbeReport()
        report.ParseFromString(msg)

        operator = self._operator_for(report)
        measurement = operator.create(report.data.blob)

        return [variable
                for variable in self._variables.get(report.data.name, ())
                if variable.update(report.timestamp, report.tag, measurement)]

    def _operator_for(self, report):
        """Returns the cached measurement operator for a report, creating it once.

        Raises:
            ImportError: If no operator exists for the report's module/name.
        """
        name = report.data.name
        operator = self._operators.get(name)

        if operator is None:
            MeasurementOperator = make_measurement_operator(report.data.module, name)

            if MeasurementOperator is None:
                raise ImportError("Module %s with %s not found" % (report.data.module,
                                                                   name))

            operator = MeasurementOperator()
            self._operators[name] = operator

        return operator