Source code for otestpoint.labtools.delta

#
# Copyright (c) 2016-2018 - Adjacent Link LLC, Bridgewater, New Jersey
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in
#    the documentation and/or other materials provided with the
#    distribution.
#  * Neither the name of Adjacent Link LLC nor the names of its
#    contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#

"""Produce delta value for labtools variable instances.

A Delta observer produces delta values of subject data.
"""

from subject import Subject
from observer import Observer
import pandas as pd
import numpy as np

[docs]class Delta(Subject,Observer): """Produce delta values from subject data. Delta modifiers work on subjects with single value state, list of value state and list of list of value state: * val * [val_0,val_1,...val_N] * [[val_0,val_1,...val_N],...,[val_0,val_1,...val_N]] """ def __init__(self,variable,**kwargs): """Creates a Delta modifier instance. Args: variable (Subject): Variable to modify. Keyword Args: name (str): Modified variable name. Default: Delta(variable). Example:: model = Model(Sum(Delta(varSchedOK1), Delta(varSchedOK2)), Sum(Delta(varSchedErr1), Delta(varSchedErr2), Delta(varSchedErr3), Delta(varSchedErr4)), ...) """ Subject.__init__(self,variable.stream()) Observer.__init__(self) self._deltas = {} self._updated = set() self._cache = {} self._tags = set() self._name = kwargs.pop('name',None) if self._name == None: self._name = "Delta(%s)" % variable.name() if kwargs: raise KeyError("Unknown key(s): %s" % ", ".join(kwargs.keys())) variable.attach(self)
[docs] def name(self): """Gets the name of this observer. Returns: str: Modified variable name. """ return self._name
[docs] def notify(self,subject): """ Processes subject update to produce new state. """ data,updated = subject.state() if not data and not updated: self._deltas.clear() self._updated.clear() self._cache.clear() self._tags.clear() self.notify_observers() return # update tags and existing cache with nan for any new tag entries for tags in data.values(): for tag in tags: if tag not in self._tags: # update existing cache/previous with np.nan subject values for timestamp in self._deltas: self._deltas[timestamp][tag] = np.nan self._cache[timestamp][tag] = None # update tags self._tags.update(set(tags.keys())) # update cache to include np.nan data for new timestamps for timestamp in updated: if timestamp not in self._deltas: self._deltas[timestamp] = {} self._cache[timestamp] = {} for tag in self._tags: if tag not in self._deltas[timestamp]: self._deltas[timestamp][tag] = np.nan self._cache[timestamp][tag] = None sorted_timestamps = sorted(self._cache.keys()) for timestamp in updated: previous_timestamp_index = sorted_timestamps.index(timestamp) - 1 if previous_timestamp_index >= 0: previous_timestamp = sorted_timestamps[previous_timestamp_index] else: # looks odd, but it values will be nan and cause a nan delta previous_timestamp = timestamp for tag in data[timestamp]: if hasattr(data[timestamp][tag], '__iter__'): # case where there is either a list of lists of # single values per tag or a list of single values # per tag values = [] for value_index, value in enumerate(data[timestamp][tag]): if hasattr(value, '__iter__'): # there is a list of lists of single values per tag if self._cache[previous_timestamp][tag] == None: self._cache[previous_timestamp][tag] = [[np.nan] * len(data[timestamp][tag][value_index])] * len(data[timestamp][tag]) subvalues = [] for subvalue_index,subvalue in enumerate(value): if hasattr(self._cache[previous_timestamp][tag],'__iter__') and \ subvalue_index < len(self._cache[previous_timestamp][tag][value_index]): subvalue_previous = self._cache[previous_timestamp][tag][value_index][subvalue_index] if pd.notnull(subvalue_previous): if subvalue >= subvalue_previous: subvalues.append(subvalue - subvalue_previous) else: subvalues.append(np.nan) else: subvalues.append(np.nan) else: subvalues.append(np.nan) values.append(subvalues) else: # there is a list of single values per tag if self._cache[previous_timestamp][tag] == None: self._cache[previous_timestamp][tag] = [np.nan] * len(data[timestamp][tag]) value_previous = self._cache[previous_timestamp][tag][value_index] if pd.notnull(value_previous): if value >= value_previous: values.append(value - value_previous) else: values.append(np.nan) else: values.append(np.nan) # update the previous time data self._cache[timestamp][tag] = data[timestamp][tag] self._deltas[timestamp][tag] = values else: # case where there is a single value per tag previous = self._cache[previous_timestamp][tag] # test if we have a previous value and if so does # it match the current value structure # (i.e. single value or list if values) if previous != None and \ hasattr(previous, '__iter__') == hasattr(data[timestamp][tag], '__iter__') and \ pd.notnull(previous): if data[timestamp][tag] >= previous: value = data[timestamp][tag] - previous else: value = np.nan else: value = np.nan self._cache[timestamp][tag] = data[timestamp][tag] self._deltas[timestamp][tag] = value # remove values in cache and previous data no longer available # in subject - do after loading new cache otherwise you will # drop previous value outside the subject cache that may be # required to calculate all deltas in the current cache window for purge in set(self._deltas.keys()).difference(data.keys()): del self._deltas[purge] del self._cache[purge] self._updated = updated self.notify_observers() self._updated = set()
[docs] def state(self): """Gets the state. Returns: dict: A dict of with timestamp (int) keys and dict values. Where each dict value has tag name (str) keys and the delta value. Depending on the subject, that can be a single value, a list of values or a list of list of values. """ return self._deltas,self._updated