Source code for otestpoint.labtools.math

#
# Copyright (c) 2016-2017 - Adjacent Link LLC, Bridgewater, New Jersey
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in
#    the documentation and/or other materials provided with the
#    distribution.
#  * Neither the name of Adjacent Link LLC nor the names of its
#    contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
"""Contains labtools observers for performing various math functions on
variable instances.

"""
from subject import Subject
from observer import Observer
import numpy as np

[docs]class Sum(Subject,Observer): """ Produces summation value of subject data. """ def __init__(self,*subjects,**kwargs): """Creates a Sum instance. Sum instances work on subjects with single value state or list of values state: * val * [val_0,val_1,...val_N] Args: subjects ([subject]): One or more variables to sum. Keywords Args: name (str): Modified variable name. Default: Sum(variable[+variable]...). Raises: KeyError Example:: model = Model(Sum(Delta(varSchedOK1), Delta(varSchedOK2)), Sum(Delta(varSchedErr1), Delta(varSchedErr2), Delta(varSchedErr3), Delta(varSchedErr4)), ...) """ # validate subjects if not subjects: raise ValueError('sum must have one or more subjects') stream = subjects[0].stream() for subject in subjects[1:]: if stream != subject.stream(): raise ValueError('sum subjects must be from the same stream') Subject.__init__(self,subjects[0].stream()) Observer.__init__(self) self._subjects = subjects self._cache = {} self._sums = {} self._store = {} self._index = {y:x for x,y in enumerate(subjects)} self._tags = set() self._updated = set() self._name = kwargs.pop('name',None) if self._name == None: self._name = "Sum(%s)" % "+".join([x.name() for x in self._subjects]) if kwargs: raise KeyError("Unknown key(s): %s" % ", ".join(kwargs.keys())) for subject in self._subjects: subject.attach(self)
[docs] def name(self): """Gets the name of this subject. Returns: Subject name (str). """ return self._name
[docs] def notify(self,subject): """ Processes subject update to produce new state. """ data,updated = subject.state() if not data and not updated: self._cache.clear() self._sums.clear() self._store.clear() self._tags.clear() self._updated.clear() self.notify_observers() return # remove values in cache and previous data no longer available # in subject for purge in set(self._cache.keys()).difference(data.keys()): del self._cache[purge] del self._sums[purge] # update tags and existing cache with nan for any new tag entries for tags in data.values(): for tag in tags: if tag not in self._tags: # update existing cache/previous with np.nan subject values for timestamp in self._cache: self._cache[timestamp][tag] = [np.nan] * len(self._index) # update tags self._tags.update(set(tags.keys())) # update cache to include np.nan data for new timestamps for timestamp in updated: if timestamp not in self._cache: self._cache[timestamp] = {} for tag in self._tags: if tag not in self._cache[timestamp]: self._cache[timestamp][tag] = [np.nan] * len(self._index) for timestamp in updated: for tag in data[timestamp]: self._cache[timestamp][tag][self._index[subject]] = data[timestamp][tag] self._sums[timestamp] = 0 for entries in self._cache[timestamp].values(): if hasattr(entries,'__iter__'): if len(entries): if hasattr(entries[0],'__iter__'): self._sums[timestamp] += sum([sum(entry.values()) for entry in entries]) else: self._sums[timestamp] += sum(entries) else: self._sums[timestamp] = np.nan else: self._sums[timestamp] = np.nan self._updated = updated self.notify_observers() self._updated = set()
[docs] def state(self): """Gets the state. Returns: A dict of with timestamp (int) keys and dict values. Where each dict value is the summation value. """ return self._sums,self._updated
[docs]class Unique(Subject,Observer): """Produces count of unique values subject data. """ def __init__(self,subject,**kwargs): """Creates a Unique instance. Unique instances work on subjects with single value state: * val Args: subject (Subject): Variable to modify. Keyword Args: name (str): Modified variable name. Default: Unique(variable). Example:: model = ModelTimeSeries(Unique(varSomeId)), ...) """ Subject.__init__(self,subject.stream()) Observer.__init__(self) self._subject = subject self._tags = set() self._update = set() self._cache = {} self._uniques = {} self._name = kwargs.pop('name',None) if self._name == None: self._name = "Unique(%s)" % subject.name() if kwargs: raise KeyError("Unknown key(s): %s" % ", ".join(kwargs.keys())) subject.attach(self)
[docs] def name(self): """Gets the name of this subject. Returns: Subject name (str). """ return self._name
[docs] def notify(self,subject): """ Processes subject update to produce new state. """ data,updated = subject.state() if not data and not updated: self._tags.clear() self._update.clear() self._cache.clear() self._uniques.clear() self.notify_observers() return # remove values in cache and previous data no longer available # in subject for purge in set(self._cache.keys()).difference(data.keys()): del self._cache[purge] del self._uniques[purge] # update tags and existing cache with nan for any new tag entries for tags in data.values(): for tag in tags: if tag not in self._tags: # update existing cache/previous with np.nan subject values for timestamp in self._cache: self._cache[timestamp][tag] = np.nan # update tags self._tags.update(set(tags.keys())) # update cache to include np.nan data for new timestamps for timestamp in updated: if timestamp not in self._cache: self._cache[timestamp] = {} for tag in self._tags: if tag not in self._cache[timestamp]: self._cache[timestamp][tag] = np.nan for timestamp in updated: for tag in data[timestamp]: self._cache[timestamp][tag] = data[timestamp][tag] for timestamp in updated: unique = set(self._cache[timestamp].values()) if np.nan in unique: self._uniques[timestamp] = np.nan else: self._uniques[timestamp] = len(unique) self._updated = updated self.notify_observers() self._updated = set()
[docs] def state(self): """Gets the state. Returns: A dict of with timestamp (int) keys and dict values. Where each dict value is the unique count. """ return self._uniques,self._updated