#
# Copyright (c) 2016-2018 - Adjacent Link LLC, Bridgewater, New Jersey
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Adjacent Link LLC nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import sys
import numpy as np
import pandas as pd
import threading
import collections
from observer import Observer
class ModelError(Exception):
    """Raised when a model is misconfigured, e.g. built from subjects
    belonging to a different stream, or queried in an unsupported way."""
class Model(Observer):
    """Observer that aggregates subject measurements into a pandas DataFrame.

    The model attaches itself to each subject and caches the values the
    subjects report via :meth:`notify`; :meth:`data` renders the cache
    as a dataframe on demand.
    """

    def __init__(self, stream, *subjects, **kwargs):
        """Creates a data model.

        Args:
            stream (obj): Stream associated with the model.
            subjects (Subject): Tuple of variables (with or without
                modifiers) used to populate the model.

        Keyword Args:
            by_tag (bool): Indicate if the model should using tags as
                the index or timestamps. Default: False.
            labels ([str]): Names of the model columns. If empty,
                column default names are made using measurement and
                attribute names. Default: [].
            purge (bool): Indicate if the model should purge values
                that are no longer cached by subjects. Default: True.

        Raises:
            KeyError: If an invalid keyword is found.
            ModelError: If a subject belongs to a different stream.

        Example::

            model = stream.model(Delta(Transform(varRxSlotStatusTable,
                                                 TableColumns(3))),
                                 Delta(Transform(varRxSlotStatusTable,
                                                 TableColumns(4,5,6,7,8))),
                                 Delta(Transform(varTxSlotStatusTable,
                                                 TableColumns(3))),
                                 Delta(Transform(varTxSlotStatusTable,
                                                 TableColumns(4,5))),
                                 labels=['Rx Slot Success',
                                         'Rx Slot Error',
                                         'Tx Slot Success',
                                         'Tx Slot Error'],
                                 by_tag=True)
        """
        Observer.__init__(self)
        self._stream = stream
        self._subjects = subjects
        # map each subject to its column position within cached rows
        self._index = {subject: position for position, subject in enumerate(subjects)}
        self._cache = {}
        self._df = pd.DataFrame()
        self._generate = False
        self._events = 0
        self._lock = threading.Lock()
        self._tags = set()
        self._labels = kwargs.pop('labels', None)
        self._by_tag = kwargs.pop('by_tag', False)
        self._purge = kwargs.pop('purge', True)
        if kwargs:
            raise KeyError("Unknown key(s): %s" % ", ".join(kwargs.keys()))
        # fall back to default labels when the label count does not
        # match the subject count (warn but do not fail)
        if self._labels is not None and len(self._labels) != len(subjects):
            # sys.stderr.write is portable across Python 2 and 3,
            # unlike the original 'print >>sys.stderr' statement
            sys.stderr.write("Invalid labels keyword argument using default labels\n")
            self._labels = None
        for subject in subjects:
            if subject.stream() != self._stream:
                # report the offending subject's name (the message reads
                # 'model subject %s'; the original formatted the stream
                # name here instead)
                raise ModelError('model subject %s from a different stream' % subject.name())
            subject.attach(self)
[docs] def stream(self):
"""Gets the stream associated with the model.
Returns:
:class:`~otestpoint.labtools.Stream`
"""
return self._stream
[docs] def by_tag(self):
"""Indicates if the model is using tags as the index or timestamps
Returns:
bool: True for indexed by tag, False otherwise.
"""
return self._by_tag
def notify(self,subject):
data,updated = subject.state()
self._lock.acquire()
if not data and not updated:
self._cache.clear()
self._df = pd.DataFrame()
self._tags.clear()
self._events += 1
self._generate = True
self._lock.release()
return
# remove values in cache no longer available in subject
if self._purge:
for purge in set(self._cache.keys()).difference(data.keys()):
del self._cache[purge]
# update tags and existing cache with nan for any new tag entries
if self._by_tag:
for tags in data.values():
for tag in tags:
if tag not in self._tags:
# update existing cache with np.nan subject values
for timestamp in self._cache:
self._cache[timestamp][tag] = [np.nan] * len(self._index)
# update tags
self._tags.update(set(tags.keys()))
# update cache to include np.nan data for new timestamps
for timestamp in updated:
# by_tag model keeps node data seperate
if self._by_tag:
if timestamp not in self._cache:
self._cache[timestamp] = {}
for tag in self._tags:
if tag not in self._cache[timestamp]:
self._cache[timestamp][tag] = [np.nan] * len(self._index)
else:
if timestamp not in self._cache:
self._cache[timestamp] = [np.nan] * len(self._index)
# update cache data associated with updated data
for timestamp in updated:
if self._by_tag:
for tag in data[timestamp]:
if tag not in self._cache[timestamp]:
self._cache[timestamp][tag] = [np.nan] * len(self._index)
self._cache[timestamp][tag][self._index[subject]] = data[timestamp][tag]
else:
self._cache[timestamp][self._index[subject]] = data[timestamp]
# increment event indicator to tell observers new measurements
# have been received
self._events += 1
# set the generate flag so a future call to data() will update
# the cached dataframe.
self._generate = True
self._lock.release()
[docs] def data(self,**kwargs):
"""Gets a dataframe of current measurements.
Keyword Args:
index (int): Numerical index of column to use as the
dataframe index or None for timestamps. Default None.
join (str): Type of join to use for non-timeseries
dataframes. One of: 'left','right','outer' or 'inner.'
Default: 'outer.'
apply (str): Callable applied to dataframe prior to
creating or None. Default: None.
ts (bool): Indicates if dataframe is a
timeseries. Default: True.
Returns:
tuple: ``(dataframe,event count, (timestamp min, timestamp max))``
Where the `event count` (int) indicates the index
(count) of the current model that was used to generate
the dataframe. The `event count` increases with every
received measurement. This allows for determining if
the resulting dataframe is potentially different from
the previous dataframe. The `timestamp min` (int) and
`timestamp max` (int) values indicate the current
measurement window in seconds since the epoch.
Raises:
KeyError: If an invalid keyword is found.
ValueError: If a keyword value is unallowed.
ModelError: If a timeseries dataframe is requested from a
model not organized by timestamp.
Example::
import time
import pandas as pd
event_prev = 0
pd.set_option('display.max_columns', None)
pd.set_option('display.expand_frame_repr', False)
while True:
df,event_cur,(_,timestamp) = model.data(ts=False,
index=None)
if event_cur != event_prev:
print df
event_prev = event_cur
time.sleep(1)
"""
# process keyword arguments
kw_index = kwargs.pop('index',None)
kw_join = kwargs.pop('join','outer')
kw_apply = kwargs.pop('apply',None)
if 'time_series' in kwargs:
kw_time_series = kwargs.pop('time_series')
else:
kw_time_series = kwargs.pop('ts',True)
# validate keyword arguments
if kwargs:
raise KeyError("Unknown key(s): %s" % ", ".join(kwargs.keys()))
if kw_join not in ['left','right','outer','inner']:
raise ValueError("join must be: 'left','right','outer' or 'inner'.")
if kw_index != None and (kw_index > len(self._subjects) or kw_index < 0):
raise ValueError("For this model instance index must be: None or [0,%d]." % (len(self._subjects) - 1))
if not kw_time_series and not self._by_tag:
raise ModelError("Non-Time Series data must use 'by_tag' model.")
events = 0
timestamps = []
if self._cache:
timestamp_min = min(self._cache.keys())
timestamp_max = max(self._cache.keys())
else:
timestamp_min = 0
timestamp_max = 0
self._lock.acquire()
if self._generate:
self._df = pd.DataFrame()
if kw_time_series:
if self._by_tag:
measurements = {}
for tag in sorted(self._tags):
measurements[tag] = {x.name() : [] for x in self._subjects}
if self._cache:
for timestamp in sorted(self._cache.keys()):
timestamps.append(timestamp)
for index,subject in enumerate(self._subjects):
for tag in sorted(self._tags):
if tag in self._cache[timestamp]:
measurements[tag][subject.name()].append(self._cache[timestamp][tag][index])
else:
measurements[tag][subject.name()].append(np.nan)
items = [('Time',timestamps)]
for index,subject in enumerate(self._subjects):
for tag in sorted(self._tags):
label = '%s:%s' % (tag,self._labels[index] if self._labels != None else subject.name())
items.append((label,measurements[tag][subject.name()]))
self._df = pd.DataFrame.from_items(items)
self._generate = False
else:
measurements = {x.name() : [] for x in self._subjects}
if self._cache:
for timestamp in sorted(self._cache.keys()):
timestamps.append(timestamp)
for index,subject in enumerate(self._subjects):
measurements[subject.name()].append(self._cache[timestamp][index])
measurements['Time'] = timestamps
items = [('Time',timestamps)]
for index,subject in enumerate(self._subjects):
label = self._labels[index] if self._labels != None else subject.name()
items.append((label,measurements[subject.name()]))
self._df = pd.DataFrame.from_items(items)
self._generate = False
else:
if self._cache:
# not time series
labels = {}
index_data = None
for index,subject in enumerate(self._subjects):
labels[subject.name()] = self._labels[index] if self._labels != None else subject.name()
if kw_index != None:
dfs = []
# take one column a
for tag in sorted(self._cache[timestamp_max]):
items = []
for index,columns in enumerate(self._cache[timestamp_max][tag]):
if hasattr(columns,'__iter__'):
for column_index,column in enumerate(columns):
subject_name = self._subjects[index].name()
if kw_index == index:
items.insert(0,(labels[subject_name],column))
else:
label = labels[subject_name]
if len(columns) > 1:
if len(labels[subject_name]) == len(columns):
label = labels[subject_name][column_index]
else:
label = "%s:%d" % (labels[subject_name],column_index)
label = "%s:%s" % (tag,label)
items.append((label,column))
if items:
df = pd.DataFrame.from_items(items)
df.set_index(labels[self._subjects[kw_index].name()],
inplace=True)
dfs.append(df)
if dfs:
self._df = dfs[0]
if len(dfs) > 1:
self._df = self._df.join(dfs[1:],how=kw_join)
self._df.reset_index(inplace=True)
self._df.rename(columns={'index': labels[self._subjects[kw_index].name()]}, inplace=True)
else:
items = collections.OrderedDict()
items['Node'] = []
for subject in self._subjects:
items[labels[subject.name()]] = []
for tag in sorted(self._cache[timestamp_max].keys()):
items['Node'].append(tag)
for subject in self._subjects:
items[labels[subject.name()]].append(self._cache[timestamp_max][tag][self._index[subject]])
if items['Node']:
self._df = pd.DataFrame.from_dict(items)
df = self._df.copy()
events = self._events
self._lock.release()
if kw_apply != None:
return kw_apply(df,events,(timestamp_min,timestamp_max))
else:
return df,events,(timestamp_min,timestamp_max)