#
# Copyright (c) 2016-2017 - Adjacent Link LLC, Bridgewater, New Jersey
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Adjacent Link LLC nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
"""Contains labtools observers for performing various math functions on
variable instances.
"""
from subject import Subject
from observer import Observer
import numpy as np
class Sum(Subject,Observer):
    """Produces summation value of subject data.

    A Sum observes one or more subjects and is itself a subject. For
    every timestamp reported as updated it keeps one slot per observed
    subject for every known tag and publishes the total of all slots.
    Slots that have not been filled yet hold np.nan, so the total is
    np.nan until every subject has supplied a value for every tag.
    """

    def __init__(self,*subjects,**kwargs):
        """Creates a Sum instance.

        Sum instances work on subjects with single value state or list
        of values state:

          * val
          * [val_0,val_1,...val_N]

        A dict of values is also accepted, in which case the dict
        values are summed.

        Args:
          subjects ([subject]): One or more variables to sum.

        Keywords Args:
          name (str): Modified variable name. Default:
            Sum(variable[+variable]...).

        Raises:
          ValueError: No subjects were given or the subjects do not
            all report the same stream.
          KeyError: An unknown keyword argument was given.

        Example::

          model = Model(Sum(Delta(varSchedOK1),
                            Delta(varSchedOK2)),
                        Sum(Delta(varSchedErr1),
                            Delta(varSchedErr2),
                            Delta(varSchedErr3),
                            Delta(varSchedErr4)),
                        ...)
        """
        # validate subjects
        if not subjects:
            raise ValueError('sum must have one or more subjects')

        stream = subjects[0].stream()

        for subject in subjects[1:]:
            if stream != subject.stream():
                raise ValueError('sum subjects must be from the same stream')

        Subject.__init__(self,subjects[0].stream())

        Observer.__init__(self)

        self._subjects = subjects

        # timestamp -> tag -> list holding one slot per subject
        self._cache = {}

        # timestamp -> published summation value
        self._sums = {}

        self._store = {}

        # subject -> position of its slot within each cache list
        self._index = {y:x for x,y in enumerate(subjects)}

        # every tag seen so far in any subject data
        self._tags = set()

        # timestamps changed by the notification being delivered
        self._updated = set()

        self._name = kwargs.pop('name',None)

        if self._name is None:
            self._name = "Sum(%s)" % "+".join([x.name() for x in self._subjects])

        if kwargs:
            raise KeyError("Unknown key(s): %s" % ", ".join(kwargs.keys()))

        for subject in self._subjects:
            subject.attach(self)

    def name(self):
        """Gets the name of this subject.

        Returns:
          Subject name (str).
        """
        return self._name

    @staticmethod
    def _value_total(value):
        """Gets the numeric contribution of a single subject slot.

        A dict contributes the sum of its values, any other iterable
        the sum of its items and anything else is used as is. The
        np.nan placeholder of an unfilled slot is therefore returned
        unchanged.
        """
        if isinstance(value,dict):
            return sum(value.values())

        if hasattr(value,'__iter__'):
            return sum(value)

        return value

    @staticmethod
    def _entries_total(entries):
        """Gets the total of all subject slots stored for one tag.

        Returns np.nan when entries is not iterable or is empty.
        """
        if not hasattr(entries,'__iter__') or not len(entries):
            return np.nan

        # each slot is classified on its own so a mix of np.nan
        # placeholders and container values yields np.nan rather than
        # raising
        return sum([Sum._value_total(entry) for entry in entries])

    def notify(self,subject):
        """
        Processes subject update to produce new state.

        An update where both the subject data and the updated
        timestamps are empty resets this instance. Observers are
        notified in either case.

        Args:
          subject (Subject): The subject reporting new state. Must be
            one of the subjects given at construction.
        """
        data,updated = subject.state()

        if not data and not updated:
            self._cache.clear()
            self._sums.clear()
            self._store.clear()
            self._tags.clear()
            self._updated.clear()
            self.notify_observers()
            return

        # remove values in cache and previous data no longer available
        # in subject
        for purge in set(self._cache.keys()).difference(data.keys()):
            del self._cache[purge]
            del self._sums[purge]

        # update tags and existing cache with nan for any new tag entries
        for tags in data.values():
            for tag in tags:
                if tag not in self._tags:
                    # update existing cache/previous with np.nan subject values
                    for timestamp in self._cache:
                        self._cache[timestamp][tag] = [np.nan] * len(self._index)

            # update tags
            self._tags.update(set(tags.keys()))

        # update cache to include np.nan data for new timestamps
        for timestamp in updated:
            if timestamp not in self._cache:
                self._cache[timestamp] = {}

            for tag in self._tags:
                if tag not in self._cache[timestamp]:
                    self._cache[timestamp][tag] = [np.nan] * len(self._index)

        slot = self._index[subject]

        for timestamp in updated:
            row = self._cache[timestamp]

            # store the reporting subject values in its own slot
            for tag in data[timestamp]:
                row[tag][slot] = data[timestamp][tag]

            total = 0

            for entries in row.values():
                total += self._entries_total(entries)

            self._sums[timestamp] = total

        # expose the updated timestamps only while observers are
        # being notified
        self._updated = updated

        self.notify_observers()

        self._updated = set()

    def state(self):
        """Gets the state.

        Returns:
          A tuple of (sums, updated). sums is a dict with timestamp
          (int) keys where each value is the summation value. updated
          holds the timestamps changed by the notification currently
          being delivered and is empty otherwise.
        """
        return self._sums,self._updated
class Unique(Subject,Observer):
    """Produces count of unique values subject data.

    A Unique observes a single subject and is itself a subject. For
    every timestamp reported as updated it publishes the number of
    distinct values held across all known tags, or np.nan when any
    known tag has no value at that timestamp.
    """

    def __init__(self,subject,**kwargs):
        """Creates a Unique instance.

        Unique instances work on subjects with single value state:

          * val

        Args:
          subject (Subject): Variable to modify.

        Keyword Args:
          name (str): Modified variable name. Default: Unique(variable).

        Raises:
          KeyError: An unknown keyword argument was given.

        Example::

          model = ModelTimeSeries(Unique(varSomeId),
                                  ...)
        """
        Subject.__init__(self,subject.stream())

        Observer.__init__(self)

        self._subject = subject

        # every tag seen so far in the subject data
        self._tags = set()

        # timestamps changed by the notification being delivered; this
        # is the attribute state() returns so it must exist before the
        # first notification
        self._updated = set()

        # timestamp -> tag -> latest value (np.nan when not reported)
        self._cache = {}

        # timestamp -> published unique count
        self._uniques = {}

        self._name = kwargs.pop('name',None)

        if self._name is None:
            self._name = "Unique(%s)" % subject.name()

        if kwargs:
            raise KeyError("Unknown key(s): %s" % ", ".join(kwargs.keys()))

        subject.attach(self)

    def name(self):
        """Gets the name of this subject.

        Returns:
          Subject name (str).
        """
        return self._name

    def notify(self,subject):
        """
        Processes subject update to produce new state.

        An update where both the subject data and the updated
        timestamps are empty resets this instance. Observers are
        notified in either case.

        Args:
          subject (Subject): The subject reporting new state.
        """
        data,updated = subject.state()

        if not data and not updated:
            self._tags.clear()
            self._updated.clear()
            self._cache.clear()
            self._uniques.clear()
            self.notify_observers()
            return

        # remove values in cache and previous data no longer available
        # in subject
        for purge in set(self._cache.keys()).difference(data.keys()):
            del self._cache[purge]
            del self._uniques[purge]

        # update tags and existing cache with nan for any new tag entries
        for tags in data.values():
            for tag in tags:
                if tag not in self._tags:
                    # update existing cache/previous with np.nan subject values
                    for timestamp in self._cache:
                        self._cache[timestamp][tag] = np.nan

            # update tags
            self._tags.update(set(tags.keys()))

        # update cache to include np.nan data for new timestamps
        for timestamp in updated:
            if timestamp not in self._cache:
                self._cache[timestamp] = {}

            for tag in self._tags:
                if tag not in self._cache[timestamp]:
                    self._cache[timestamp][tag] = np.nan

        for timestamp in updated:
            for tag in data[timestamp]:
                self._cache[timestamp][tag] = data[timestamp][tag]

        for timestamp in updated:
            unique = set(self._cache[timestamp].values())

            # np.nan is the placeholder stored above for a tag without
            # a value at this timestamp; the count is not meaningful
            # until every known tag has reported
            if np.nan in unique:
                self._uniques[timestamp] = np.nan
            else:
                self._uniques[timestamp] = len(unique)

        # expose the updated timestamps only while observers are
        # being notified
        self._updated = updated

        self.notify_observers()

        self._updated = set()

    def state(self):
        """Gets the state.

        Returns:
          A tuple of (uniques, updated). uniques is a dict with
          timestamp (int) keys where each value is the unique count.
          updated holds the timestamps changed by the notification
          currently being delivered and is empty otherwise.
        """
        return self._uniques,self._updated