Source code for otestpoint.labtools.model

#
# Copyright (c) 2016-2018 - Adjacent Link LLC, Bridgewater, New Jersey
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in
#    the documentation and/or other materials provided with the
#    distribution.
#  * Neither the name of Adjacent Link LLC nor the names of its
#    contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#

import sys
import numpy as np
import pandas as pd
import threading
import collections

from observer import Observer

class ModelError(Exception):
    pass

class Model(Observer):

    def __init__(self, stream, *subjects, **kwargs):
        """Creates a data model.

        Args:
           stream (obj): Stream associated with the model.
           subjects (Subject): Tuple of variables (with or without
              modifiers) used to populate the model.

        Keyword Args:
           by_tag (bool): Indicate whether the model should use tags
              or timestamps as the index. Default: False.
           labels ([str]): Names of the model columns. If empty,
              default column names are made from measurement and
              attribute names. Default: [].
           purge (bool): Indicate whether the model should purge
              values that are no longer cached by subjects.
              Default: True.

        Raises:
           KeyError: If an invalid keyword is found.

        Example::

            model = stream.model(Delta(Transform(varRxSlotStatusTable,
                                                 TableColumns(3))),
                                 Delta(Transform(varRxSlotStatusTable,
                                                 TableColumns(4,5,6,7,8))),
                                 Delta(Transform(varTxSlotStatusTable,
                                                 TableColumns(3))),
                                 Delta(Transform(varTxSlotStatusTable,
                                                 TableColumns(4,5))),
                                 labels=['Rx Slot Success',
                                         'Rx Slot Error',
                                         'Tx Slot Success',
                                         'Tx Slot Error'],
                                 by_tag=True)

        """
        Observer.__init__(self)

        self._stream = stream
        self._subjects = subjects
        self._index = {y: x for x, y in enumerate(subjects)}
        self._cache = {}
        self._df = pd.DataFrame()
        self._generate = False
        self._events = 0
        self._lock = threading.Lock()
        self._tags = set()

        self._labels = kwargs.pop('labels', None)
        self._by_tag = kwargs.pop('by_tag', False)
        self._purge = kwargs.pop('purge', True)

        if kwargs:
            raise KeyError("Unknown key(s): %s" % ", ".join(kwargs.keys()))

        if self._labels is not None and len(self._labels) != len(subjects):
            print >>sys.stderr, "Invalid labels keyword argument, using default labels"
            self._labels = None

        for subject in subjects:
            if subject.stream() != self._stream:
                raise ModelError('model subject %s from a different stream' % subject.name())

            subject.attach(self)
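    # Layout sketch (subject names s0, s1, s2 are hypothetical): self._index
    # maps each subject to its slot in the per-timestamp value lists that
    # notify() maintains, e.g. self._index == {s0: 0, s1: 1, s2: 2}, so each
    # subject always writes to the same position.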
    def stream(self):
        """Gets the stream associated with the model.

        Returns:
           :class:`~otestpoint.labtools.Stream`

        """
        return self._stream
    def by_tag(self):
        """Indicates whether the model is indexed by tags or by timestamps.

        Returns:
           bool: True if indexed by tag, False otherwise.

        """
        return self._by_tag
    def notify(self, subject):
        data, updated = subject.state()

        self._lock.acquire()

        if not data and not updated:
            self._cache.clear()
            self._df = pd.DataFrame()
            self._tags.clear()
            self._events += 1
            self._generate = True
            self._lock.release()
            return

        # remove values in the cache no longer available in the subject
        if self._purge:
            for purge in set(self._cache.keys()).difference(data.keys()):
                del self._cache[purge]

        # update tags and backfill the existing cache with nan for any new tag entries
        if self._by_tag:
            for tags in data.values():
                for tag in tags:
                    if tag not in self._tags:
                        # update existing cache with np.nan subject values
                        for timestamp in self._cache:
                            self._cache[timestamp][tag] = [np.nan] * len(self._index)

                # update tags
                self._tags.update(set(tags.keys()))

        # update cache to include np.nan data for new timestamps
        for timestamp in updated:
            # a by_tag model keeps node data separate
            if self._by_tag:
                if timestamp not in self._cache:
                    self._cache[timestamp] = {}

                for tag in self._tags:
                    if tag not in self._cache[timestamp]:
                        self._cache[timestamp][tag] = [np.nan] * len(self._index)
            else:
                if timestamp not in self._cache:
                    self._cache[timestamp] = [np.nan] * len(self._index)

        # update cache data associated with updated data
        for timestamp in updated:
            if self._by_tag:
                for tag in data[timestamp]:
                    if tag not in self._cache[timestamp]:
                        self._cache[timestamp][tag] = [np.nan] * len(self._index)

                    self._cache[timestamp][tag][self._index[subject]] = data[timestamp][tag]
            else:
                self._cache[timestamp][self._index[subject]] = data[timestamp]

        # increment event indicator to tell observers new measurements
        # have been received
        self._events += 1

        # set the generate flag so a future call to data() will update
        # the cached dataframe.
        self._generate = True

        self._lock.release()
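    # Cache layout sketch (timestamps, tag names and values are illustrative
    # only): notify() keeps one entry per timestamp, each holding one slot
    # per subject, positioned by self._index[subject], with np.nan filling
    # values not yet reported.
    #
    #   by_tag=True:
    #     self._cache = {1514764800: {'node-1': [10, 0.5],
    #                                 'node-2': [np.nan, np.nan]}}
    #
    #   by_tag=False:
    #     self._cache = {1514764800: [10, 0.5]}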
    def data(self, **kwargs):
        """Gets a dataframe of current measurements.

        Keyword Args:
           index (int): Numerical index of the column to use as the
              dataframe index, or None for timestamps. Default: None.
           join (str): Type of join to use for non-timeseries
              dataframes. One of: 'left', 'right', 'outer' or 'inner'.
              Default: 'outer'.
           apply (callable): Callable applied to the dataframe, event
              count and timestamp window before they are returned, or
              None. Default: None.
           ts (bool): Indicates if the dataframe is a timeseries.
              Default: True.

        Returns:
           tuple: ``(dataframe, event count, (timestamp min, timestamp max))``

           Where the `event count` (int) indicates the index (count) of
           the current model that was used to generate the dataframe.
           The `event count` increases with every received measurement.
           This allows for determining if the resulting dataframe is
           potentially different from the previous dataframe. The
           `timestamp min` (int) and `timestamp max` (int) values
           indicate the current measurement window in seconds since
           the epoch.

        Raises:
           KeyError: If an invalid keyword is found.
           ValueError: If a keyword value is not allowed.
           ModelError: If a timeseries dataframe is requested from a
              model not organized by timestamp.

        Example::

            import time
            import pandas as pd

            event_prev = 0

            pd.set_option('display.max_columns', None)
            pd.set_option('display.expand_frame_repr', False)

            while True:
                df,event_cur,(_,timestamp) = model.data(ts=False,
                                                        index=None)

                if event_cur != event_prev:
                    print df

                event_prev = event_cur

                time.sleep(1)

        """
        # process keyword arguments
        kw_index = kwargs.pop('index', None)
        kw_join = kwargs.pop('join', 'outer')
        kw_apply = kwargs.pop('apply', None)

        if 'time_series' in kwargs:
            kw_time_series = kwargs.pop('time_series')
        else:
            kw_time_series = kwargs.pop('ts', True)

        # validate keyword arguments
        if kwargs:
            raise KeyError("Unknown key(s): %s" % ", ".join(kwargs.keys()))

        if kw_join not in ['left', 'right', 'outer', 'inner']:
            raise ValueError("join must be: 'left','right','outer' or 'inner'.")

        if kw_index is not None and (kw_index >= len(self._subjects) or kw_index < 0):
            raise ValueError("For this model instance index must be: None or [0,%d]."
                             % (len(self._subjects) - 1))

        if not kw_time_series and not self._by_tag:
            raise ModelError("Non-time series data must use a 'by_tag' model.")

        events = 0
        timestamps = []

        if self._cache:
            timestamp_min = min(self._cache.keys())
            timestamp_max = max(self._cache.keys())
        else:
            timestamp_min = 0
            timestamp_max = 0

        self._lock.acquire()

        if self._generate:
            self._df = pd.DataFrame()

            if kw_time_series:
                if self._by_tag:
                    measurements = {}

                    for tag in sorted(self._tags):
                        measurements[tag] = {x.name(): [] for x in self._subjects}

                    if self._cache:
                        for timestamp in sorted(self._cache.keys()):
                            timestamps.append(timestamp)

                            for index, subject in enumerate(self._subjects):
                                for tag in sorted(self._tags):
                                    if tag in self._cache[timestamp]:
                                        measurements[tag][subject.name()].append(self._cache[timestamp][tag][index])
                                    else:
                                        measurements[tag][subject.name()].append(np.nan)

                    items = [('Time', timestamps)]

                    for index, subject in enumerate(self._subjects):
                        for tag in sorted(self._tags):
                            label = '%s:%s' % (tag,
                                               self._labels[index] if self._labels is not None else subject.name())

                            items.append((label, measurements[tag][subject.name()]))

                    self._df = pd.DataFrame.from_items(items)

                    self._generate = False
                else:
                    measurements = {x.name(): [] for x in self._subjects}

                    if self._cache:
                        for timestamp in sorted(self._cache.keys()):
                            timestamps.append(timestamp)

                            for index, subject in enumerate(self._subjects):
                                measurements[subject.name()].append(self._cache[timestamp][index])

                    measurements['Time'] = timestamps

                    items = [('Time', timestamps)]

                    for index, subject in enumerate(self._subjects):
                        label = self._labels[index] if self._labels is not None else subject.name()

                        items.append((label, measurements[subject.name()]))

                    self._df = pd.DataFrame.from_items(items)

                    self._generate = False
            else:
                if self._cache:
                    # not time series
                    labels = {}
                    index_data = None

                    for index, subject in enumerate(self._subjects):
                        labels[subject.name()] = self._labels[index] if self._labels is not None else subject.name()

                    if kw_index is not None:
                        dfs = []

                        # build one dataframe per tag from the latest timestamp,
                        # using the kw_index column as the dataframe index
                        for tag in sorted(self._cache[timestamp_max]):
                            items = []

                            for index, columns in enumerate(self._cache[timestamp_max][tag]):
                                if hasattr(columns, '__iter__'):
                                    for column_index, column in enumerate(columns):
                                        subject_name = self._subjects[index].name()

                                        if kw_index == index:
                                            items.insert(0, (labels[subject_name], column))
                                        else:
                                            label = labels[subject_name]

                                            if len(columns) > 1:
                                                if len(labels[subject_name]) == len(columns):
                                                    label = labels[subject_name][column_index]
                                                else:
                                                    label = "%s:%d" % (labels[subject_name], column_index)

                                            label = "%s:%s" % (tag, label)

                                            items.append((label, column))

                            if items:
                                df = pd.DataFrame.from_items(items)

                                df.set_index(labels[self._subjects[kw_index].name()],
                                             inplace=True)

                                dfs.append(df)

                        if dfs:
                            self._df = dfs[0]

                            if len(dfs) > 1:
                                self._df = self._df.join(dfs[1:], how=kw_join)

                            self._df.reset_index(inplace=True)

                            self._df.rename(columns={'index': labels[self._subjects[kw_index].name()]},
                                            inplace=True)
                    else:
                        items = collections.OrderedDict()

                        items['Node'] = []

                        for subject in self._subjects:
                            items[labels[subject.name()]] = []

                        for tag in sorted(self._cache[timestamp_max].keys()):
                            items['Node'].append(tag)

                            for subject in self._subjects:
                                items[labels[subject.name()]].append(self._cache[timestamp_max][tag][self._index[subject]])

                        if items['Node']:
                            self._df = pd.DataFrame.from_dict(items)

        df = self._df.copy()

        events = self._events

        self._lock.release()

        if kw_apply is not None:
            return kw_apply(df, events, (timestamp_min, timestamp_max))
        else:
            return df, events, (timestamp_min, timestamp_max)
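# A minimal sketch of the data() 'apply' keyword, assuming a 'model' built
# as in the Model docstring example above; 'summarize' is a hypothetical
# helper, not part of this module:
#
#   def summarize(df, events, window):
#       # reduce the raw dataframe to summary statistics before returning
#       return df.describe(), events, window
#
#   summary, event, window = model.data(ts=False, apply=summarize)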