#
# Copyright (c) 2016-2018 - Adjacent Link LLC, Bridgewater, New Jersey
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Adjacent Link LLC nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
"""Produce delta value for labtools variable instances.
A Delta observer produces delta values of subject data.
"""
from subject import Subject
from observer import Observer
import pandas as pd
import numpy as np
class Delta(Subject, Observer):
    """Produce delta values from subject data.

    Delta modifiers work on subjects with single value state, list of
    value state and list of list of value state:

    * val
    * [val_0,val_1,...val_N]
    * [[val_0,val_1,...val_N],...,[val_0,val_1,...val_N]]

    For every timestamp the delta of a value is the difference between
    that value and the value cached for the closest earlier timestamp.
    The delta is ``np.nan`` when there is no earlier value, when the
    earlier value is null, when the earlier value has a different
    structure, or when the current value is lower than the earlier one.
    """

    def __init__(self, variable, **kwargs):
        """Creates a Delta modifier instance.

        The instance is initialized with ``variable.stream()`` and
        attaches itself to `variable` in order to be notified of updates.

        Args:
          variable (Subject): Variable to modify.

        Keyword Args:
          name (str): Modified variable name. Default: Delta(variable).

        Raises:
          KeyError: If an unknown keyword argument is supplied.

        Example::

          model = Model(Sum(Delta(varSchedOK1),
                            Delta(varSchedOK2)),
                        Sum(Delta(varSchedErr1),
                            Delta(varSchedErr2),
                            Delta(varSchedErr3),
                            Delta(varSchedErr4)),
                        ...)
        """
        Subject.__init__(self, variable.stream())
        Observer.__init__(self)

        # timestamp -> {tag: delta} published through state()
        self._deltas = {}

        # timestamps changed by the notification being processed
        self._updated = set()

        # timestamp -> {tag: raw subject value} used as the "previous"
        # value when computing the delta of the following timestamp
        self._cache = {}

        # every tag seen so far
        self._tags = set()

        self._name = kwargs.pop('name', None)

        if self._name is None:
            self._name = "Delta(%s)" % variable.name()

        if kwargs:
            raise KeyError("Unknown key(s): %s" % ", ".join(kwargs.keys()))

        variable.attach(self)

    def name(self):
        """Gets the name of this observer.

        Returns:
          str: Modified variable name.
        """
        return self._name

    def notify(self, subject):
        """Processes subject update to produce new state.

        Reads ``(data, updated)`` from ``subject.state()``, recomputes
        the deltas of every timestamp in `updated`, drops timestamps
        that are no longer present in `data` and notifies this
        instance's own observers.

        Args:
          subject (Subject): Subject whose state changed.
        """
        data, updated = subject.state()

        if not data and not updated:
            # subject was reset: drop everything and propagate the reset
            self._deltas.clear()
            self._updated.clear()
            self._cache.clear()
            self._tags.clear()
            self.notify_observers()
            return

        # back-fill already known timestamps with nan for any new tag so
        # that every timestamp always carries the full set of tags
        for tags in data.values():
            for tag in set(tags).difference(self._tags):
                for timestamp in self._deltas:
                    self._deltas[timestamp][tag] = np.nan
                    self._cache[timestamp][tag] = None

            self._tags.update(tags)

        # make sure every updated timestamp has an entry for every tag
        for timestamp in updated:
            deltas = self._deltas.setdefault(timestamp, {})
            cache = self._cache.setdefault(timestamp, {})

            for tag in self._tags:
                if tag not in deltas:
                    deltas[tag] = np.nan
                    cache[tag] = None

        sorted_timestamps = sorted(self._cache)

        # timestamp -> position lookup avoids a linear search per update
        positions = dict((timestamp, index)
                         for index, timestamp in enumerate(sorted_timestamps))

        # process in chronological order so that, when consecutive
        # timestamps arrive in the same notification, the earlier one is
        # cached before the later one is computed against it
        for timestamp in sorted(updated):
            position = positions[timestamp]

            if position > 0:
                previous_values = self._cache[sorted_timestamps[position - 1]]
            else:
                # no earlier timestamp: nothing to subtract, so every
                # delta of this timestamp is nan
                previous_values = {}

            for tag, current in data[timestamp].items():
                previous = previous_values.get(tag)

                if hasattr(current, '__iter__'):
                    # list of single values or list of lists of single
                    # values per tag
                    delta = _sequence_delta(current, previous)
                else:
                    # single value per tag
                    delta = _scalar_delta(current, previous)

                # update the previous time data
                self._cache[timestamp][tag] = current
                self._deltas[timestamp][tag] = delta

        # remove values in cache and previous data no longer available
        # in subject - do after loading new cache otherwise you will
        # drop previous value outside the subject cache that may be
        # required to calculate all deltas in the current cache window
        for stale in set(self._deltas).difference(data):
            del self._deltas[stale]
            del self._cache[stale]

        # expose the updated timestamps only while observers are notified
        self._updated = updated

        self.notify_observers()

        self._updated = set()

    def state(self):
        """Gets the state.

        Returns:
          tuple: A ``(deltas, updated)`` pair. `deltas` is a dict with
          timestamp keys and dict values, where each dict value has
          tag name keys and the delta value. Depending on the subject,
          the delta value can be a single value, a list of values or a
          list of list of values. `updated` holds the timestamps
          changed by the notification currently being delivered and
          is an empty set otherwise.
        """
        return self._deltas, self._updated


def _item_at(sequence, index):
    """Returns ``sequence[index]`` or None when there is no such item."""
    if sequence is None:
        return None

    try:
        return sequence[index]
    except (IndexError, KeyError, TypeError):
        # shorter than expected or not indexable at all
        return None


def _scalar_delta(current, previous):
    """Returns ``current - previous`` or nan when no delta can be formed.

    The result is nan when `previous` is missing, is not a single
    value, is null, or is greater than `current`.
    """
    if previous is None or hasattr(previous, '__iter__'):
        return np.nan

    if pd.notnull(previous) and current >= previous:
        return current - previous

    return np.nan


def _sequence_delta(current, previous):
    """Returns the element-wise delta of a list or list of lists.

    Elements of `current` without a usable counterpart in `previous`
    produce nan. `previous` is never modified.
    """
    values = []

    for index, value in enumerate(current):
        previous_value = _item_at(previous, index)

        if hasattr(value, '__iter__'):
            # there is a list of lists of single values per tag
            values.append([_scalar_delta(subvalue,
                                         _item_at(previous_value, subindex))
                           for subindex, subvalue in enumerate(value)])
        else:
            # there is a list of single values per tag
            values.append(_scalar_delta(value, previous_value))

    return values