#
# Copyright (c) 2016-2017 - Adjacent Link LLC, Bridgewater, New Jersey
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Adjacent Link LLC nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import numpy as np
import pandas as pd
import time
class Bar(object):
    """Animated bar chart showing the latest values reported by a model.

    The model is polled through ``model.data(**model_kwargs)``, which must
    return a 3-tuple whose first item is a DataFrame-like object (it must
    support ``.empty``, iteration over column names and ``df[column]``)
    and whose second item is an event counter.  The chart is only updated
    when the event counter differs from the last one seen.  The third
    item is ignored.

    While no update has been seen, or when no update has been seen for
    longer than the stale timeout, the figure background is tinted red.
    """

    # Figure face color used to flag "no updates yet" and "data is stale".
    _STALE_FACE_COLOR = '#ff8080'

    # Width of a single bar in x-axis units.
    _BAR_WIDTH = 0.8

    class Plot(object):
        def __init__(self, *columns, **kwargs):
            """Creates a Bar Plot.

            A bar plot is a plot of the latest timestamp data for one
            or more measurements.

            Args:
              columns ([str]): DataFrame column names. The first name
                is the column supplying the x-axis tick labels; each
                remaining name is a column plotted as its own series
                of bars.

            Keyword Args:
              title (str): Plot title. Default: ''.

              yticks ([int]): Plot y-axis ticks. Default: None
                (matplotlib's automatic ticks are left in place).

              ylabel (str): Plot y-axis label. Default: ''.

              xlabel (str): Plot x-axis label. Default: ''.

              xtick_rotation (int): Rotation, in degrees, to apply
                to xtick labels. Default: 70.

              legend (bool): Flag indicating whether a plot legend
                should be displayed. Default: False.

            Raises:
              IndexError: If no column name is given.
              KeyError: If an invalid keyword is found.
            """
            self.title = kwargs.pop('title', '')
            self.yticks = kwargs.pop('yticks', None)
            self.ylabel = kwargs.pop('ylabel', '')
            self.xlabel = kwargs.pop('xlabel', '')
            self.legend = kwargs.pop('legend', False)
            self.xtick_rotation = kwargs.pop('xtick_rotation', 70)
            self.xcolumn = columns[0]
            self.columns = columns[1:]

            if kwargs:
                raise KeyError("%s unknown key(s): %s" % (self.__class__.__name__,
                                                          ", ".join(kwargs.keys())))

    class _PlotEntry(object):
        """Per-subplot drawing state kept between animation frames."""

        def __init__(self):
            # value column names, parallel to ``rects``
            self.columns = []
            # one collection of bar artists per value column
            self.rects = []
            # matplotlib axes the bars are drawn on
            self.ax = None
            # the Bar.Plot describing this subplot
            self.attrib = None
            # set once the bars have been created at least once
            self.initialize = False
            # x labels the current bars were built for
            self.last_xcolumn = []

    def __init__(self, model, **kwargs):
        """Creates a Bar chart bound to a model.

        Args:
          model: Object providing ``data(**kwargs)`` (see class
            documentation).

        Keyword Args:
          title (str): Figure window title. Default: ''.

          interval (int): Value passed as the ``interval`` of the
            matplotlib FuncAnimation created by animate().
            Default: 1000.

          model_<name>: Any keyword starting with ``model_`` is
            forwarded to ``model.data()`` as ``<name>``.

        Raises:
          KeyError: If an invalid keyword is found.
        """
        self._model = model
        self._title = kwargs.pop('title', '')
        self._interval = kwargs.pop('interval', 1000)
        self._events = 0
        self._plots = []
        self._model_data_kwargs = {}
        self._last_update_time = None

        prefix = 'model_'

        # Iterate over a snapshot of the keys: popping from a dict while
        # iterating it raises RuntimeError on Python 3.
        for kwarg in list(kwargs):
            if kwarg.startswith(prefix):
                self._model_data_kwargs[kwarg[len(prefix):]] = kwargs.pop(kwarg)

        if kwargs:
            raise KeyError("%s unknown key(s): %s" % (self.__class__.__name__,
                                                      ", ".join(kwargs.keys())))

    @staticmethod
    def _cycle_colors():
        """Returns the list of colors of the active matplotlib color cycle."""
        try:
            return plt.rcParams['axes.prop_cycle'].by_key()['color']
        except KeyError:
            # matplotlib releases predating 'axes.prop_cycle'
            return plt.rcParams['axes.color_cycle']

    def _set_window_title(self):
        """Sets the figure window title, if the figure has a window manager."""
        # FigureCanvasBase.set_window_title() is not available on newer
        # matplotlib releases; going through the manager works on old
        # and new releases alike.
        manager = getattr(self._fig.canvas, 'manager', None)

        if manager is not None:
            manager.set_window_title(self._title)

    def plot(self, *plots, **kwargs):
        """Creates one subplot, stacked vertically, per plot description.

        Args:
          plots: Plot descriptions. Each is either a Bar.Plot, or an
            object with ``columns`` and ``kwargs`` attributes from
            which a Bar.Plot is built.

        Keyword Args:
          figure: matplotlib figure to draw on. Default: None (the
            current pyplot figure is used).

          tight (bool): Flag indicating whether tight_layout() should
            be applied to the figure. Default: False.

          stale_timeout (int): Seconds without a model update after
            which the figure is flagged as stale. Default: 10.

        Raises:
          KeyError: If an invalid keyword is found.
        """
        # A Bar.Plot has no 'kwargs' attribute and its 'columns' no longer
        # holds the x column, so instances are used as they are.
        plots = [p if isinstance(p, Bar.Plot) else Bar.Plot(*p.columns, **p.kwargs)
                 for p in plots]

        self._fig = kwargs.pop('figure', None)
        tight = kwargs.pop('tight', False)
        self._stale_timeout = kwargs.pop('stale_timeout', 10)

        if kwargs:
            raise KeyError("%s unknown key(s): %s" % (self.__class__.__name__,
                                                      ", ".join(kwargs.keys())))

        if self._fig is None:
            self._fig = plt.gcf()

        self._set_window_title()
        self._fig.patch.set_facecolor('white')
        plt.style.use('ggplot')

        # remember the regular face color so it can be restored once
        # updates arrive
        self._fig_face_color = self._fig.patch.get_facecolor()

        # indicate no updates yet
        self._fig.patch.set_facecolor(self._STALE_FACE_COLOR)

        for index, plot in enumerate(plots):
            plot_entry = Bar._PlotEntry()
            plot_entry.ax = self._fig.add_subplot(len(plots), 1, index + 1)

            for column in plot.columns:
                plot_entry.columns.append(column)
                plot_entry.rects.append([])

            plot_entry.ax.set_ylabel(plot.ylabel)
            plot_entry.ax.set_xlabel(plot.xlabel)

            # None means: keep matplotlib's automatic ticks
            if plot.yticks is not None:
                plot_entry.ax.yaxis.set_ticks(plot.yticks)

            plot_entry.ax.set_title(plot.title)
            plot_entry.attrib = plot
            self._plots.append(plot_entry)

        if tight:
            self._fig.tight_layout()

    def _update_staleness(self, updated):
        """Switches the figure face color between regular and stale.

        Args:
          updated (bool): True if the model reported new events for
            the current frame.
        """
        if updated:
            if self._last_update_time is None:
                # first update, or first update after going stale
                self._fig.patch.set_facecolor(self._fig_face_color)
                self._fig.canvas.draw()

            self._last_update_time = time.time()

        elif self._last_update_time is not None and \
                time.time() - self._last_update_time > self._stale_timeout:
            self._fig.patch.set_facecolor(self._STALE_FACE_COLOR)
            self._fig.canvas.draw()
            self._last_update_time = None

    @staticmethod
    def _refresh_legend(ax):
        """Draws the legend of ax without spurious duplicate entries."""
        handles, labels = ax.get_legend_handles_labels()
        seen = set()

        for index, label in enumerate(labels):
            if label in seen:
                # keep only the entries preceding the first repeated label
                ax.legend(handles[:index], labels[:index])
                return

            seen.add(label)

        ax.legend()

    def _rebuild_bars(self, plot_entry, xticks):
        """Replaces the bars of one subplot after its x labels changed.

        Args:
          plot_entry: The _PlotEntry to rebuild.
          xticks ([object]): The new x-axis labels, one per bar group.
        """
        if plot_entry.last_xcolumn:
            for rects in plot_entry.rects:
                for rect in rects:
                    rect.remove()

        width = self._BAR_WIDTH
        series = len(plot_entry.columns)
        colors = self._cycle_colors()
        groups = range(len(xticks))

        # Each group of bars spans width * series and is followed by a gap
        # of 1.0. Bars are anchored at their left edge so that the tick
        # positions computed below fall on the center of each group.
        for index, column in enumerate(plot_entry.columns):
            plot_entry.rects[index] = plot_entry.ax.bar(
                [width * series * x + width * index + x for x in groups],
                [0] * len(xticks),
                width,
                color=colors[index % len(colors)],
                label=column,
                align='edge',
                animated=True)

        plot_entry.ax.set_xticks(
            [width * series * x + (width / 2.0) * series + x for x in groups])

        plot_entry.ax.set_xticklabels(xticks,
                                      rotation=plot_entry.attrib.xtick_rotation)

        # None means: keep matplotlib's automatic ticks
        if plot_entry.attrib.yticks is not None:
            plot_entry.ax.yaxis.set_ticks(plot_entry.attrib.yticks)

        if plot_entry.attrib.legend:
            self._refresh_legend(plot_entry.ax)

        plot_entry.ax.redraw_in_frame()
        plot_entry.last_xcolumn = xticks
        plot_entry.initialize = True

    def animate(self):
        """Creates the animation that keeps the subplots up to date.

        plot() must have been called first.

        Returns:
          The matplotlib FuncAnimation driving the updates. The caller
          must keep a reference to it for as long as the animation
          should run.
        """
        def _init():
            return []

        def _animate(i):
            df, events_cur, _ = self._model.data(**self._model_data_kwargs)

            updated = self._events != events_cur

            self._update_staleness(updated)

            if updated:
                if df.empty:
                    # reset
                    for plot_entry in self._plots:
                        for rects in plot_entry.rects:
                            for rect in rects:
                                rect.set_height(np.nan)
                else:
                    df_columns = list(df)

                    # canvas redraw for label artist required if any
                    # xtick labels change
                    redraw_canvas = False

                    for plot_entry in self._plots:
                        if plot_entry.attrib.xcolumn not in df_columns:
                            continue

                        xticks = list(df[plot_entry.attrib.xcolumn])

                        if plot_entry.last_xcolumn != xticks:
                            self._rebuild_bars(plot_entry, xticks)
                            redraw_canvas = True

                    if redraw_canvas:
                        self._fig.canvas.draw()

                    for plot_entry in self._plots:
                        for column, rects in zip(plot_entry.columns,
                                                 plot_entry.rects):
                            if column in df_columns:
                                for rect, height in zip(rects, list(df[column])):
                                    rect.set_height(height)

                self._events = events_cur

            # every bar artist is handed back for blitting
            return [rect
                    for plot_entry in self._plots
                    for rects in plot_entry.rects
                    for rect in rects]

        return animation.FuncAnimation(self._fig,
                                       _animate,
                                       init_func=_init,
                                       interval=self._interval,
                                       blit=True)

    def show(self, *plots, **kwargs):
        """Creates the subplots, starts the animation and shows the figure.

        Takes the same arguments and keyword arguments as plot().
        """
        self.plot(*plots, **kwargs)

        # the reference must be held until plt.show() returns, otherwise
        # the animation may be garbage collected
        ani = self.animate()

        if len(self._plots) > 1:
            self._fig.tight_layout()

        plt.show()