labtools Introduction
OpenTestPoint labtools converts measurement streams from one or more publishers into pandas DataFrames.
labtools can be used to monitor real-time experiments, orchestrate smart-scenario execution and perform post-process analysis. labtools has builtin visualization support using Matplotlib animations.
labtools can integrate with any application that can manipulate a DataFrame.
Streams - Subscribing to measurements
The first step to using labtools is to create a measurement Stream:
from otestpoint.labtools import Stream

stream = Stream('localhost:9002',
                'IPRoute.Routing.Tables.IPv4',
                'EMANE.PhysicalLayer.Tables.Events')
A Stream() requires an IPv4 or IPv6 publisher endpoint and one or more measurement names in OpenTestPoint Probe Name Format. A stream instance is used to create the variables which populate entries within a data model.
A typical OpenTestPoint deployment uses one or more brokers to logically group together nodes/platforms that publish measurements. Examples of logical groupings include all nodes in an experiment, nodes grouped by communication technology, mobile nodes grouped by activity/mission, etc.
Variables - Selecting measurement attributes
Once you have established which publisher (or brokered publishers) and measurements to subscribe to using a Stream, you need to create one or more Variable instances in order to process specific measurement attributes:
varTotalRoutes = stream.variable('Measurement_iproute_routing_tables_ipv4',
                                 'routingtable',
                                 apply=total_routes)

varPhyEventReceptionTable = stream.variable('Measurement_emane_physicallayer_tables_events',
                                            'eventreceptiontable')
In the above example, two variables are created to represent the total routes and the EMANE Physical Layer event reception histogram, respectively. The Measurement_iproute_routing_tables_ipv4 measurement is produced using the iproute probe and the Measurement_emane_physicallayer_tables_events measurement is produced using the emane probe.
All variables store measurements per timestamp per tag, which means one variable represents all measurements received from all publishers (tags) for the specified measurement and attribute name. The number of measurements stored per publisher can be configured using the Stream cache keyword argument. OpenTestPoint probes publish measurements on the same time boundary at a configured rate (default 5 seconds), providing a correlated tidy data set. Measurements not received from one or more tags (publishers) are set to np.nan.
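As a rough illustration of that tidy layout, the sketch below aligns per-tag samples on shared timestamps and fills gaps with NaN. It uses plain Python dictionaries rather than labtools itself; the function name `align_by_timestamp` and the sample values are hypothetical.

```python
import math

def align_by_timestamp(samples, tags):
    """Build one row per timestamp with one column per tag.

    samples: iterable of (timestamp, tag, value) tuples.
    tags: ordered list of tags (publishers) expected in every row.
    Missing (timestamp, tag) pairs are filled with NaN.
    """
    rows = {}
    for timestamp, tag, value in samples:
        rows.setdefault(timestamp, {})[tag] = value
    return {
        timestamp: [rows[timestamp].get(tag, math.nan) for tag in tags]
        for timestamp in sorted(rows)
    }

# Hypothetical route counts published on a 5 second boundary.
samples = [
    (1000, 'node-1', 4),
    (1000, 'node-2', 3),
    (1005, 'node-1', 5),  # node-2 did not report at t=1005
]
table = align_by_timestamp(samples, ['node-1', 'node-2'])
```

Every row has the same columns regardless of which publishers reported, which is what lets later steps compare or combine tags without checking for missing keys.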
Note
A tag is the unique name that an OpenTestPoint publisher appends to the end of a probe name. Tags often correspond in some fashion to a node/platform name. For example, the measurement probe name EMANE.PhysicalLayer.Tables.Events.radio-1 has a tag of radio-1.
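To make the naming rule concrete, here is a minimal sketch that separates a tag from a tagged probe name. `split_tag` is a hypothetical helper, not part of labtools, and it assumes the tag itself contains no `.` characters.

```python
def split_tag(probe_name):
    """Split a tagged probe name into (probe, tag).

    Assumes the tag is everything after the final '.', so tags that
    themselves contain a '.' would need different handling.
    """
    probe, _, tag = probe_name.rpartition('.')
    return probe, tag

probe, tag = split_tag('EMANE.PhysicalLayer.Tables.Events.radio-1')
```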
Models - Building tidy data sets
A data model is created using one or more Variable instances that may be combined, modified or reduced using a series of modifiers:
def pathloss_event_count(msg):
    for entry in msg.entries:
        if entry.event == 101:
            return entry.count

def location_event_count(msg):
    for entry in msg.entries:
        if entry.event == 100:
            return entry.count

def antenna_event_count(msg):
    for entry in msg.entries:
        if entry.event == 102:
            return entry.count

model = stream.model(varTotalRoutes,
                     Delta(Transform(varPhyEventReceptionTable,
                                     pathloss_event_count)),
                     Delta(Transform(varPhyEventReceptionTable,
                                     location_event_count)),
                     Delta(Transform(varPhyEventReceptionTable,
                                     antenna_event_count)),
                     labels=['Number Routes',
                             'Pathloss Events',
                             'Location Events',
                             'Antenna Events'],
                     by_tag=True)
otestpoint.labtools.Stream.model() takes one or more modified or unmodified variables and provides a set of keyword arguments to control how the data model manages the variables.
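The three callables in the example above differ only in the event identifier they match, so they could be generated from a single factory. The sketch below is one way to do that. `event_count` is a hypothetical helper, and the `SimpleNamespace` objects merely stand in for the value a Transform would pass to the callable, assumed here to expose `entries` with `event` and `count` fields as the functions above do.

```python
from types import SimpleNamespace

def event_count(event_id):
    """Return a callable that extracts the count for one event id."""
    def extract(msg):
        for entry in msg.entries:
            if entry.event == event_id:
                return entry.count
        # No matching entry: return None explicitly, which is what the
        # original functions do implicitly.
        return None
    return extract

location_event_count = event_count(100)
pathloss_event_count = event_count(101)
antenna_event_count = event_count(102)

# Stand-in for an event reception table (hypothetical values).
msg = SimpleNamespace(entries=[SimpleNamespace(event=100, count=7),
                               SimpleNamespace(event=101, count=12)])
```

Note that a callable written this way returns `None` when the event is absent from the table, so downstream code should be prepared for that case.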
labtools contains the following builtin modifiers:
- Delta: A measurement modifier that produces the delta between successive measurements per tag, or np.nan if no previous measurement is available.
- Sum: A measurement modifier that sums one or more measurements for all tags for the same timestamp. Sum will be np.nan if any tag is missing a measurement for a given timestamp.
- Transform: A measurement modifier that executes a callable per tag per timestamp.
- Unique: A measurement modifier that produces the unique values of one or more measurements for all tags for the same timestamp. Unique will be np.nan if any tag is missing a measurement for a given timestamp.
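The delta and sum behaviors described above, including their np.nan cases, can be illustrated without labtools. The following is a plain Python sketch of the described semantics only, not the library's implementation; `delta` and `sum_over_tags` are hypothetical names.

```python
import math

def delta(values):
    """Difference between successive values for a single tag.

    The first element has no predecessor, so its delta is NaN.
    """
    if not values:
        return []
    result = [math.nan]
    for previous, current in zip(values, values[1:]):
        result.append(current - previous)
    return result

def sum_over_tags(row):
    """Sum one timestamp's values across tags.

    Any NaN in the row (a tag with no measurement) makes the sum NaN.
    """
    if any(math.isnan(v) for v in row):
        return math.nan
    return sum(row)

deltas = delta([10, 15, 15, 22])            # one tag over four timestamps
complete = sum_over_tags([4, 3, 5])         # all tags reported
partial = sum_over_tags([4, math.nan, 5])   # one tag missing
```

Propagating NaN instead of silently skipping missing tags keeps a partial sum from being mistaken for a complete one.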
Additional modifiers can be created by deriving a class from Subject and Observer. Modifiers need to provide an implementation for the following methods:
- name(): Returns the modifier name, usually derived from what the modifier does and the names of the modified subjects (variables or other modifiers).
- state(): Returns the modified measurements.
- notify(): Called to update the modifier with new measurements.
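The shape of such a modifier might look like the sketch below. The `Subject` and `Observer` classes defined here are simplified stand-ins written for this example, not the labtools base classes, and every signature (including the argument passed to `notify()`), the `attach()`/`publish()` helpers, the `Source` class and the `Scale` modifier are assumptions made for illustration.

```python
import math

class Subject:
    """Stand-in for a subject: something observers can attach to."""
    def __init__(self):
        self._observers = []

    def attach(self, observer):
        self._observers.append(observer)

    def publish(self):
        for observer in self._observers:
            observer.notify(self)

class Observer:
    """Stand-in for an observer: receives notify() calls."""
    def notify(self, subject):
        raise NotImplementedError

class Source(Subject):
    """Stand-in variable holding the latest value per tag."""
    def __init__(self, name):
        super().__init__()
        self._name = name
        self._state = {}

    def name(self):
        return self._name

    def state(self):
        return dict(self._state)

    def update(self, values):
        self._state = dict(values)
        self.publish()

class Scale(Subject, Observer):
    """Hypothetical modifier that multiplies each value by a factor."""
    def __init__(self, subject, factor):
        super().__init__()
        self._subject = subject
        self._factor = factor
        self._state = {}
        subject.attach(self)

    def name(self):
        # Derived from what the modifier does and the subject's name.
        return 'Scale(%s,%s)' % (self._subject.name(), self._factor)

    def state(self):
        return dict(self._state)

    def notify(self, subject):
        # Recompute from the subject's current state, then pass it on.
        self._state = {tag: value * self._factor
                       for tag, value in subject.state().items()}
        self.publish()

routes = Source('routes')
doubled = Scale(routes, 2)
routes.update({'node-1': 4, 'node-2': math.nan})
```

Because the modifier is both an observer and a subject, modifiers can be chained: each one reacts to its input and then notifies whatever is attached to it.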
The Model instance's data() method is used to obtain a DataFrame representation of the model state (current measurements). Along with the DataFrame, an event counter is returned to indicate how many measurements were received by the stream instance associated with the model. This can be used as an indication of whether the DataFrame represents changes in the model since the last query. Additionally, two timestamp counters representing the current model time window (min/max) as seconds since the epoch are returned.
Once you have a DataFrame you are free to use it any way you would like:
import re
import signal
import time

import pandas as pd

stream.run()

event_prev = 0
df_prev = pd.DataFrame()
redraw = False

def handler(signum, frame):
    # request a full redraw when the terminal is resized
    global redraw
    redraw = True

signal.signal(signal.SIGWINCH, handler)

pd.set_option('display.max_columns', None)
pd.set_option('display.expand_frame_repr', False)

while True:
    df,event_cur,(_,timestamp) = model.data(ts=False,
                                            index=0,
                                            join='outer')

    # only process when new measurements arrived or a redraw is pending
    if event_cur != event_prev or redraw:
        # determine names of current nodes
        names = set()
        for name in list(df):
            m = re.match(r'([^\d]+-(\d+)):Frequency',name)
            if m:
                names.add(m.group(1))

        nodes = [int(x[5:-10]) for x in list(df)[1:]]
        df.columns = ['Index'] + nodes
        df = df[sorted(nodes)]

        time_str = time.strftime('%a, %d %b %Y %H:%M:%S',time.localtime(timestamp))

        if not df.equals(df_prev) or redraw:
            # clear the screen and move the cursor home
            print("\x1b[2J\x1b[H",time_str)

            def color_2Ghz(val):
                if val == 2400000000:
                    return '\x1b[30;1m%d\x1b[0m' % val
                else:
                    return '\x1b[31;1m%d\x1b[0m' % val

            if not df.empty:
                df.columns = ['\x1b[30;1m%s\x1b[0m' % str(x) for x in list(df)]
                df = df.applymap(color_2Ghz)
                print(df)

            df_prev = df
            redraw = False
        else:
            # table unchanged: only refresh the timestamp line
            print("\x1b[H\x1b[K",time_str)

        event_prev = event_cur

    time.sleep(1)
Note
The above code snippet is taken from a monitor script that displays a table of per node Rx frequency slot assignments. The monitor script is used to highlight a TDMA schedule error encountered during EMANE tutorial demo 8.
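The monitor snippet derives node numbers by slicing fixed offsets (`x[5:-10]`), which only works while every column is named exactly like `node-<N>:Frequency`. A regular expression is a less brittle alternative. The sketch below assumes that column naming, inferred from the slicing offsets and the pattern used in the snippet; `node_ids` and `COLUMN_PATTERN` are hypothetical names.

```python
import re

# Matches names such as 'node-3:Frequency' and captures the node number.
COLUMN_PATTERN = re.compile(r'[^\d]+-(\d+):Frequency$')

def node_ids(columns):
    """Return the sorted node numbers found in the given column names.

    Columns that do not match the pattern (such as an index column)
    are skipped rather than raising an error.
    """
    ids = []
    for column in columns:
        match = COLUMN_PATTERN.match(str(column))
        if match:
            ids.append(int(match.group(1)))
    return sorted(ids)

ids = node_ids(['Index', 'node-10:Frequency', 'node-2:Frequency'])
```

Converting to integers before sorting keeps node 10 after node 2, which a plain string sort would not.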

Views - Visualizing data models
labtools provides builtin views to aid in visualizing DataFrames produced from a Model instance:
- Line: A line view of one or more measurements over time.
- Heatmap: A heatmap view of one measurement over time.
- Bar Chart: A per tag (node/publisher) view of one or more measurements for the latest timestamp.
A View instance is used to create a view containing one or more plots. A View instance is created with a data model; a kind, which is one of line, heat or bar; and any keyword argument accepted by Model.data(), with model_ prepended to its name.
Note
At present it is not possible to mix plot types (line, heatmap or bar) within a single view. To achieve the same effect, you must use a FigureCanvasTkAgg instance and the View.plot() and View.animate() interface.
Line
A Line.Plot instance is used to create a strip-chart-like measurement visualization. Line views support markers and indicators to allow communication of event occurrence and condition satisfaction:
- markers: One or more DataFrame columns to use as markers. A marker is a vertical line that indicates an event occurred. A marker is drawn any time the indicated DataFrame column value is non-zero and non-nan.
- indicators: One or more DataFrame columns to use as indicators. An indicator is a horizontal bar that indicates a condition is met. An indicator is continuously drawn for time stamps where the indicated DataFrame column value is non-zero and non-nan.
view = View(model,
            kind='line',
            title='Network Traffic Load',
            model_apply=combine)

view.show(View.Plot('User Tx Bytes',
                    'Control Tx Bytes',
                    title='Tx Traffic',
                    ylabel='Bytes',
                    yticks=range(0,250001,50000),
                    legend=True),
          View.Plot('User Rx Bytes',
                    'Control Rx Bytes',
                    title='Rx Traffic',
                    ylabel='Bytes',
                    yticks=range(0,500001,50000),
                    legend=True,
                    markers=('Pathloss Events',
                             'Location Events'),
                    indicators='Converged'))
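The "non-zero and non-nan" rule for markers and indicators can be made concrete with a small sketch. This is an illustration of the rule as described, not of how labtools draws its plots; the helper names and sample columns are hypothetical.

```python
import math

def is_set(value):
    """True when a value is both non-NaN and non-zero."""
    return not math.isnan(value) and value != 0

def marker_times(timestamps, column):
    """Timestamps at which a marker (vertical line) would be drawn."""
    return [t for t, v in zip(timestamps, column) if is_set(v)]

def indicator_spans(timestamps, column):
    """Contiguous (start, end) timestamp ranges where the condition holds."""
    spans = []
    start = previous = None
    for t, v in zip(timestamps, column):
        if is_set(v):
            if start is None:
                start = t
            previous = t
        elif start is not None:
            spans.append((start, previous))
            start = None
    if start is not None:
        spans.append((start, previous))
    return spans

timestamps = [0, 5, 10, 15, 20, 25]
pathloss_events = [0, 2, math.nan, 0, 1, 0]
converged = [0, 1, 1, math.nan, 1, 1]

markers = marker_times(timestamps, pathloss_events)
spans = indicator_spans(timestamps, converged)
```

A missing measurement (NaN) breaks an indicator span in the same way a zero does, so a gap in reporting is never shown as a satisfied condition.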

Heatmap
A Heatmap.Plot instance is used to create a colorbar chart of a measurement. Heatmap views support markers and indicators to allow communication of event occurrence and condition satisfaction:
view = View(model,
            kind='heat',
            title='TDMA Slot Error')

view.show(View.Plot(*['node-%s:Rx Slot Error' % x for x in range(1,11)],
                    title='Rx Slot Errors',
                    ylim=(0,10),
                    markers=['node-%s:Pathloss Events' % x for x in range(1,11)]),
          View.Plot(*['node-%s:Tx Slot Error' % x for x in range(1,11)],
                    title='Tx Slot Errors',
                    ylim=(0,10),
                    markers=['node-%s:Pathloss Events' % x for x in range(1,11)]))

Bar
A Bar.Plot instance is used to create a bar chart of the latest values of one or more measurements:
view = View(model,
            kind='bar',
            title='TDMA Slot Activity',
            model_ts=False,
            model_index=None)

view.show(View.Plot('Node',
                    'Rx Slot Error',
                    'Rx Slot Success',
                    title='Rx Slot Status',
                    ylabel='Count',
                    yticks=range(0,101,10),
                    legend=True),
          View.Plot('Node',
                    'Tx Slot Error',
                    'Tx Slot Success',
                    title='Tx Slot Status',
                    ylabel='Count',
                    yticks=range(0,501,100),
                    legend=True))
