EMANE  1.0.1
otamanager.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013-2016 - Adjacent Link LLC, Bridgewater, New Jersey
3  * Copyright (c) 2008-2012 - DRS CenGen, LLC, Columbia, Maryland
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  *
10  * * Redistributions of source code must retain the above copyright
11  * notice, this list of conditions and the following disclaimer.
12  * * Redistributions in binary form must reproduce the above copyright
13  * notice, this list of conditions and the following disclaimer in
14  * the documentation and/or other materials provided with the
15  * distribution.
16  * * Neither the name of DRS CenGen, LLC nor the names of its
17  * contributors may be used to endorse or promote products derived
18  * from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31  * POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "otamanager.h"
35 #include "otauser.h"
36 #include "logservice.h"
38 #include "otaexception.h"
39 #include "eventservice.h"
40 #include "otaheader.pb.h"
41 #include "event.pb.h"
42 #include "socketexception.h"
43 
44 #include "emane/net.h"
48 
49 #include <sstream>
50 #include <algorithm>
51 #include <uuid.h>
52 
// Constructor member initializers and body.
// NOTE(review): the constructor signature line that precedes this initializer
// list is not present in this listing.
//
// Initial state: the OTA channel is marked closed (bOpen_ == false), the event
// statistic publisher is constructed with the name "OTAChannel", and the
// sequence number is value-initialized.
54  bOpen_(false),
55  eventStatisticPublisher_{"OTAChannel"},
56  u64SequenceNumber_{}
57 {
// Start from a cleared uuid_; open() later copies in the caller-supplied UUID.
58  uuid_clear(uuid_);
59 }
60 
62 {
63  if(bOpen_)
64  {
65  ThreadUtils::cancel(thread_);
66 
67  thread_.join();
68  }
69 }
70 
// Forwards the requested row limit to the OTA packet statistic publisher.
// NOTE(review): the signature line for this body is absent from this listing;
// the Doxygen index at the end of the file places
// setStatPacketCountRowLimit(size_t rows) at this location.
72 {
73  otaStatisticPublisher_.setRowLimit(rows);
74 }
75 
// Forwards the requested row limit to the event statistic publisher.
// NOTE(review): the signature line for this body is absent from this listing;
// the Doxygen index at the end of the file places
// setStatEventCountRowLimit(size_t rows) at this location.
77 {
78  eventStatisticPublisher_.setRowLimit(rows);
79 }
80 
82  const DownstreamPacket & pkt,
83  const ControlMessages & msgs) const
84 {
// Sends one downstream packet over the air:
//  1. every event serialization attached to the packet is handed to the local
//     event service and, when the OTA channel is open, staged for transmission;
//  2. when more than one NEM is registered, the packet is delivered to every
//     other local NEM that is not in the additional transmitter set;
//  3. when the OTA channel is open, the packet is sent on the multicast socket.
// All control messages in msgs are deleted before returning.
//
// NOTE(review): the first signature line (which declares `id`) and several
// statement-opening lines are absent from this listing; the orphaned argument
// lines below belong to those statements.
85  // get the pkt info
86  const PacketInfo & pktInfo{pkt.getPacketInfo()};
87 
88  // set of optional additional transmitters (AT)
89  Controls::OTATransmitters otaTransmitters{};
90 
// NOTE(review): the Doxygen index lists getEventSerializations() as returning
// a const reference, so plain `auto` presumably copies the container here --
// confirm whether `const auto &` would do.
91  auto eventSerializations = pkt.getEventSerializations();
92 
// Protobuf container for the events that will travel with the OTA message.
93  EMANEMessage::Event::Data data;
94 
95  if(!eventSerializations.empty())
96  {
97  NEMId targetNEMId;
98  EventId eventId;
99  Serialization serialization;
100 
101  for(const auto & entry : eventSerializations)
102  {
103  std::tie(targetNEMId,
104  eventId,
105  serialization) = entry;
106 
107  // process any local event
// NOTE(review): the opening line of this call is absent from this listing.
109  eventId,
110  serialization,
111  id);
112 
// Only stage the event for transmission when there is an open OTA channel.
113  if(bOpen_)
114  {
115  auto pSerialization = data.add_serializations();
116 
117  pSerialization->set_nemid(targetNEMId);
118 
119  pSerialization->set_eventid(eventId);
120 
121  pSerialization->set_data(serialization);
122  }
123  }
124  }
125 
// Look for a control message carrying the additional transmitter set.
126  for(const auto & pMessage : msgs)
127  {
// NOTE(review): the `if` condition that guards this block is absent from this
// listing.
129  {
// NOTE(review): reinterpret_cast is used for what looks like a base-to-derived
// pointer conversion; static_cast would normally be expected -- confirm.
130  const auto pTransmitterControlMessage =
131  reinterpret_cast<const Controls::OTATransmitterControlMessage *>(pMessage);
132 
133  otaTransmitters = pTransmitterControlMessage->getOTATransmitters();
134  }
135  }
136 
137  /*
138  * UpstreamPacket data is shared (reference counted). The same
139  * packet can be used in multiple calls to OTAUser::processOTAPacket
140  * since the resulting action is to enqueue a referenced counted
141  * copy on each NEM queue. Each copy will share the same packet data
142  * but have unique index counters used for stripping packet data.
143  */
// With a single registered NEM there is no other local receiver, so the local
// delivery step is skipped entirely.
144  if(nemUserMap_.size() > 1)
145  {
146  auto now = Clock::now();
147 
// Local copy is stamped with the current time and this manager's uuid_.
148  UpstreamPacket upstreamPacket({pktInfo.getSource(),
149  pktInfo.getDestination(),
150  pktInfo.getPriority(),
151  now,
152  uuid_},
153  pkt.getVectorIO());
154 
155  // bounce a copy of the pkt back up to our local NEM stack(s)
156  for(NEMUserMap::const_iterator iter = nemUserMap_.begin(), end = nemUserMap_.end();
157  iter != end;
158  ++iter)
159  {
160  if(iter->first == id)
161  {
162  // skip our own transmission
163  }
164  else if(otaTransmitters.count(iter->first) > 0)
165  {
166  // skip NEM(s) in the additional transmitter set (ATS)
167  }
168  else
169  {
// Local delivery passes an empty control message list.
170  iter->second->processOTAPacket(upstreamPacket,ControlMessages());
171  }
172  }
173  }
174 
175  // send the packet to additional OTAManagers using OTA multicast transport
176  if(bOpen_)
177  {
178  std::string sEventSerialization{};
179 
180  if(!eventSerializations.empty())
181  {
// A serialization failure is only logged; the send below still proceeds.
182  if(!data.SerializeToString(&sEventSerialization))
183  {
// NOTE(review): the opening line of this logging statement is absent from this
// listing.
185  ERROR_LEVEL,
186  "OTAManager sendOTAPacket unable to serialize attached event data src:%hu dst:%hu",
187  pktInfo.getSource(),
188  pktInfo.getDestination());
189  }
190  }
191 
192  ControlMessageSerializer controlMessageSerializer{msgs};
193 
194  // create an ota message to carry the packet_info, and variable ctrl data len only
195  // total message length with data payload is defined below
196  EMANEMessage::OTAHeader otaheader;
197 
198  otaheader.set_source(pktInfo.getSource());
199  otaheader.set_destination(pktInfo.getDestination());
200  otaheader.set_datalength(pkt.length());
201  otaheader.set_controllength(controlMessageSerializer.getLength());
202  otaheader.set_eventlength(sEventSerialization.size());
// NOTE(review): the counter is incremented inside a const member function, so
// u64SequenceNumber_ is presumably declared mutable -- confirm in the header.
203  otaheader.set_sequencenumber(++u64SequenceNumber_);
204  otaheader.set_uuid(reinterpret_cast<const char *>(uuid_),sizeof(uuid_));
205 
206  std::string sOTAHeader{};
207 
208  if(otaheader.SerializeToString(&sOTAHeader))
209  {
// Wire layout, in order: 2-byte header length converted with HTONS, serialized
// OTAHeader, optional event serialization, control message bytes, packet bytes.
210  std::uint16_t u16HeaderLength = HTONS(sOTAHeader.size());
211 
// const_cast only adapts the read-only string buffers to the iovec element
// type; the vector is handed to mcast_.send() below.
212  Utils::VectorIO vectorIO{{reinterpret_cast<char *>(&u16HeaderLength),sizeof(u16HeaderLength)},
213  {const_cast<char *>(sOTAHeader.c_str()),sOTAHeader.size()}};
214 
215  if(!sEventSerialization.empty())
216  {
217  vectorIO.push_back({const_cast<char *>(sEventSerialization.c_str()),sEventSerialization.size()});
218  }
219 
220  const auto & controlMessageIO = controlMessageSerializer.getVectorIO();
221 
222  vectorIO.insert(vectorIO.end(),controlMessageIO.begin(),controlMessageIO.end());
223 
224  const auto & packetIO = pkt.getVectorIO();
225 
226  vectorIO.insert(vectorIO.end(),packetIO.begin(),packetIO.end());
227 
228  // gather and send
229  if(mcast_.send(&vectorIO[0],static_cast<int>(vectorIO.size())) == -1)
230  {
// NOTE(review): the opening line of this logging statement is absent from this
// listing.
232  ERROR_LEVEL,
233  "OTAManager sendOTAPacket unable to send ctrl_len:%zu, payload_len:%zu src:%hu dst:%hu reason:%s\n",
234  controlMessageSerializer.getLength(),
235  pkt.length(),
236  pktInfo.getSource(),
237  pktInfo.getDestination(),
238  strerror(errno));
239 
240  }
241  else
242  {
// NOTE(review): the opening line of this statement is absent from this listing.
244  uuid_,
245  pktInfo.getSource());
246 
247 
// One transmit-side statistic update per attached event, keyed by event id.
248  for(const auto & entry : eventSerializations)
249  {
250  eventStatisticPublisher_.update(EventStatisticPublisher::Type::TYPE_TX,
251  uuid_,
252  std::get<1>(entry));
253  }
254  }
255  }
256  else
257  {
// NOTE(review): the opening line of this logging statement is absent from this
// listing.
259  ERROR_LEVEL,
260  "OTAManager sendOTAPacket unable to serialize OTA header src:%hu dst:%hu",
261  pktInfo.getSource(),
262  pktInfo.getDestination());
263  }
264  }
265 
// The control messages are deleted on every path, including when nothing was
// sent.
266  // clean up control messages
267  std::for_each(msgs.begin(),msgs.end(),[](const ControlMessage * p){delete p;});
268 }
269 
271 {
272  std::pair<NEMUserMap::iterator, bool> ret;
273 
274  if(nemUserMap_.insert(std::make_pair(id,pOTAUser)).second == false)
275  {
276  std::stringstream ssDescription;
277  ssDescription<<"attempted to register duplicate user with id "<<id<<std::ends;
278  throw OTAException(ssDescription.str());
279  }
280 }
281 
283 {
284  if(nemUserMap_.erase(id) == 0)
285  {
286  std::stringstream ssDescription;
287  ssDescription<<"attempted to unregister unknown user with id "<<id<<std::ends;
288  throw OTAException(ssDescription.str());
289  }
290 }
291 
// Opens the OTA multicast channel and starts the receive thread.
//
// otaGroupAddress  multicast group address, stored and passed to mcast_.open()
// otaManagerDevice device name passed to mcast_.open()
// bLoopback        loopback flag passed to mcast_.open()
// iTTL             TTL passed to mcast_.open()
// uuid             copied into uuid_
//
// Throws OTAException when mcast_.open() throws a SocketException. In that
// case no thread is started and bOpen_ stays false.
292 void EMANE::OTAManager::open(const INETAddr & otaGroupAddress,
293  const std::string & otaManagerDevice,
294  bool bLoopback,
295  int iTTL,
296  const uuid_t & uuid)
297 {
298  otaGroupAddress_ = otaGroupAddress;
299 
300  uuid_copy(uuid_,uuid);
301 
302  try
303  {
// NOTE(review): the Doxygen index lists a socket open() overload taking
// std::uint8_t u8TTL; if that is the one called here, the int iTTL is narrowed
// implicitly -- confirm the accepted range.
304  mcast_.open(otaGroupAddress,true,otaManagerDevice,iTTL,bLoopback);
305  }
306  catch(SocketException & exp)
307  {
// Translate the socket failure into an OTAException with troubleshooting hints.
308  std::stringstream sstream;
309 
// NOTE(review): std::ends appends a '\0' character to the stream, so the
// message passed to OTAException ends with an embedded NUL -- consider
// dropping it.
310  sstream<<"Platform OTA Manager: Unable to open OTA Manager socket: '"
311  <<otaGroupAddress.str()
312  <<"'."
313  <<std::endl
314  <<std::endl
315  <<"Possible reason(s):"
316  <<std::endl
317  <<" * No Multicast device specified and routing table nondeterministic"
318  <<std::endl
319  <<" (no multicast route and no default route)."
320  <<std::endl
321  <<" * Multicast device "
322  <<otaManagerDevice
323  <<" does not exist or is not up."
324  <<std::endl
325  <<exp.what()
326  <<std::ends;
327 
328  throw OTAException(sstream.str());
329  }
330 
// Start the receive loop on its own thread.
331  thread_ = std::thread{&EMANE::OTAManager::processOTAMessage,this};
332 
// A non-zero result from elevate() is treated as failure; it is only logged
// and open() still completes.
333  if(ThreadUtils::elevate(thread_))
334  {
// NOTE(review): the opening line of this logging statement is absent from this
// listing.
336  ERROR_LEVEL,"OTAManager::open: Unable to set Real Time Priority");
337  }
338 
// Set last: the cleanup code earlier in this file cancels and joins thread_
// only when bOpen_ is true.
339  bOpen_ = true;
340 }
341 
342 
// Receive loop for the OTA multicast channel; open() starts it on thread_.
//
// Each datagram is expected to be laid out as: 2-byte header length (converted
// with NTOHS), serialized OTAHeader, event bytes, control message bytes,
// packet bytes. Messages whose uuid equals uuid_ are ignored. For all others
// the attached events go to the event service and the packet is delivered to
// every local NEM that is not in the additional transmitter set.
//
// The loop ends when mcast_.recv() returns a value that is not greater than 0.
//
// NOTE(review): several statement-opening lines (logging calls and one
// comparison operand) are absent from this listing.
343 void EMANE::OTAManager::processOTAMessage()
344 {
345  unsigned char buf[65536];
346 
347  ssize_t len = 0;
348 
349  while(1)
350  {
351  if((len = mcast_.recv(buf,sizeof(buf),0)) > 0)
352  {
// Timestamp taken right after receipt; used as the packet's time below.
353  auto now = Clock::now();
354 
355  // ota message len sanity check
356  if(static_cast<size_t>(len) >= sizeof(std::uint16_t))
357  {
// The length prefix is byte-swapped in place inside buf.
358  std::uint16_t * pu16OTAHeaderLength{reinterpret_cast<std::uint16_t *>(buf)};
359 
360  *pu16OTAHeaderLength = NTOHS(*pu16OTAHeaderLength);
361 
// From here on len counts only the bytes after the 2-byte prefix.
362  len -= sizeof(std::uint16_t);
363 
364  EMANEMessage::OTAHeader otaHeader;
365 
366  if(static_cast<size_t>(len) >= *pu16OTAHeaderLength &&
367  otaHeader.ParseFromArray(&buf[2], *pu16OTAHeaderLength))
368  {
// NOTE(review): the three length fields come from the network and are logged
// with %u below, so they are presumably 32-bit; if so this sum could wrap and
// still equal len, and the 16-bit indices computed from it could then point at
// the wrong offsets -- confirm and consider bounding each field by len.
369  if(static_cast<size_t>(len) ==
370  otaHeader.datalength() +
371  otaHeader.controllength() +
372  otaHeader.eventlength() +
373  *pu16OTAHeaderLength)
374  {
// Offsets into buf of the event, control and packet sections.
375  std::uint16_t u16EventIndex = 2 + *pu16OTAHeaderLength;
376  std::uint16_t u16ControlIndex = u16EventIndex + otaHeader.eventlength();
377  std::uint16_t u16PacketIndex = u16ControlIndex + otaHeader.controllength();
378 
// NOTE(review): uuid_copy reads sizeof(uuid_t) bytes, but the size of the
// received uuid field is not checked first -- a shorter field would be
// over-read; confirm and consider validating otaHeader.uuid().size().
379  uuid_t remoteUUID;
380  uuid_copy(remoteUUID,reinterpret_cast<const unsigned char *>(otaHeader.uuid().data()));
381 
// Non-zero means the UUIDs differ; a message carrying our own uuid_ is skipped.
382  if(uuid_compare(uuid_,remoteUUID))
383  {
384  if(otaHeader.eventlength())
385  {
386  EMANEMessage::Event::Data data;
387 
388  if(data.ParseFromArray(&buf[u16EventIndex],otaHeader.eventlength()))
389  {
// Hand each received event to the event service and count it as received.
390  for(const auto & serialization : data.serializations())
391  {
392  EventServiceSingleton::instance()->processEventMessage(serialization.nemid(),
393  serialization.eventid(),
394  serialization.data());
395 
396  eventStatisticPublisher_.update(EventStatisticPublisher::Type::TYPE_RX,
397  remoteUUID,
398  serialization.eventid());
399  }
400  }
401  else
402  {
// NOTE(review): the opening line of this logging statement is absent from this
// listing. Packet delivery below still happens after this failure.
404  ERROR_LEVEL,"OTAManager message events could not be deserialized");
405  }
406  }
407 
408  // create packet info from the ota data message
// The third argument is a literal 0; the header accessors used in this
// function do not read a priority field.
409  PacketInfo pktInfo(otaHeader.source(),
410  otaHeader.destination(),
411  0,
412  now,
413  remoteUUID);
414 
415  UpstreamPacket pkt(pktInfo,&buf[u16PacketIndex],otaHeader.datalength());
416 
// Stays empty unless an OTA transmitter control message is found below.
417  Controls::OTATransmitters otaTransmitters;
418 
419  if(otaHeader.controllength())
420  {
421  ControlMessages msgs =
422  ControlMessageSerializer::create(&buf[u16ControlIndex],
423  otaHeader.controllength());
424 
425  for(ControlMessages::const_iterator iter = msgs.begin(),end = msgs.end();
426  iter != end;
427  ++iter)
428  {
429  if((*iter)->getId() == Controls::SerializedControlMessage::IDENTIFIER)
430  {
431  auto pSerializedControlMessage =
432  static_cast<const Controls::SerializedControlMessage *>(*iter);
433 
// NOTE(review): the right-hand operand of this comparison is absent from this
// listing.
434  if(pSerializedControlMessage->getSerializedId() ==
436  {
// The re-created message is owned by the unique_ptr and released at the end of
// this scope, after the transmitter set has been copied out.
437  std::unique_ptr<Controls::OTATransmitterControlMessage>
438  pOTATransmitterControlMessage(Controls::OTATransmitterControlMessage::
439  create(pSerializedControlMessage->getSerialization()));
440 
441  otaTransmitters = pOTATransmitterControlMessage->getOTATransmitters();
442  }
443 
444  }
445 
446  // delete all control messages
447  delete *iter;
448  }
449  }
450 
451  otaStatisticPublisher_.update(OTAStatisticPublisher::Type::TYPE_UPSTREAM,
452  remoteUUID,
453  pktInfo.getSource());
454 
455  // for each local NEM stack
456  for(NEMUserMap::const_iterator iter = nemUserMap_.begin(), end = nemUserMap_.end();
457  iter != end; ++iter)
458  {
459  // only send pkt up to NEM(s) NOT in the ATS
460  if(otaTransmitters.count(iter->first) == 0)
461  {
// Delivery passes an empty control message list.
462  iter->second->processOTAPacket(pkt,ControlMessages());
463  }
464  }
465 
466  }
467  }
468  else
469  {
// NOTE(review): the opening line of this logging statement is absent from this
// listing.
471  ERROR_LEVEL,
472  "OTAManager Packet received data length incorrect"
473  " len: %zd header:%hu data:%u control: %u event: %u ",
474  len,
475  *pu16OTAHeaderLength,
476  otaHeader.datalength(),
477  otaHeader.controllength(),
478  otaHeader.eventlength());
479  }
480  }
481  else
482  {
// NOTE(review): the opening line of this logging statement is absent from this
// listing.
484  ERROR_LEVEL,
485  "OTAManager message header could not be deserialized");
486  }
487  }
488  else
489  {
// NOTE(review): the opening line of this logging statement is absent from this
// listing.
491  ERROR_LEVEL,
492  "OTAManager message missing header missing prefix length encoding");
493  }
494  }
495  else
496  {
// recv() returned 0 or a negative value: log it and leave the loop, which ends
// this function.
// NOTE(review): the opening line of this logging statement is absent from this
// listing.
498  ERROR_LEVEL,
499  "OTAManager Packet Received error");
500  break;
501  }
502  }
503 }
ssize_t recv(void *buf, size_t len, int flags=0)
std::set< NEMId > OTATransmitters
std::string Serialization
Definition: serializable.h:42
A Packet class that allows upstream processing to strip layer headers as the packet travels up the st...
const PacketInfo & getPacketInfo() const
A Serialized Control Message is used to encapsulate Serializable control messages as they traverse pr...
void registerOTAUser(NEMId id, OTAUser *pOTAUser) override
Definition: otamanager.cc:270
void sendOTAPacket(NEMId id, const DownstreamPacket &pkt, const ControlMessages &msgs) const override
Definition: otamanager.cc:81
std::list< const ControlMessage * > ControlMessages
Exception thrown when registering or unregistering OTAUsers.
Definition: otaexception.h:46
std::vector< iovec > VectorIO
Definition: vectorio.h:43
std::uint16_t EventId
Definition: types.h:53
void update(Type type, const uuid_t &uuid, NEMId nemId)
Store source, destination, creation time and priority information for a packet.
Definition: packetinfo.h:50
static ControlMessages create(const void *pData, size_t length)
ssize_t send(const iovec *iov, int iovcnt, int flags=0) const
const char * what() const
Definition: exception.h:62
void update(Type type, const uuid_t &uuid, EventId eventId)
Specialized packet that allows downstream processing to add layer specific headers as the packet trave...
std::string str(bool bWithPort=true) const
Definition: inetaddr.cc:409
int cancel(std::thread &thread)
Definition: threadutils.h:65
std::uint16_t NEMId
Definition: types.h:52
The OTA Transmitter Control Message is used by the emulator physical layer to specify the NEM Id of the so...
ControlMessage interface is the base for all control messages.
void open(const INETAddr &address, bool bReuseAddress=false, const std::string &sDevice="", std::uint8_t u8TTL=1, bool bLoop=false)
void open(const INETAddr &otaGroupAddress, const std::string &sDevice, bool bLoopback, int iTTL, const uuid_t &uuid)
Definition: otamanager.cc:292
constexpr std::uint16_t HTONS(std::uint16_t x)
Definition: net.h:125
void unregisterOTAUser(NEMId id) override
Definition: otamanager.cc:282
const EventSerializations & getEventSerializations() const
void setStatEventCountRowLimit(size_t rows)
Definition: otamanager.cc:76
OTA user interface that allows access to the OTA provider.
Definition: otauser.h:50
Utils::VectorIO getVectorIO() const
constexpr std::uint16_t NTOHS(std::uint16_t x)
Definition: net.h:136
#define LOGGER_STANDARD_LOGGING(logger, level, fmt, args...)
static EventService * instance()
Definition: singleton.h:56
void setStatPacketCountRowLimit(size_t rows)
Definition: otamanager.cc:71
int elevate(std::thread &thread)
Definition: threadutils.h:44
void processEventMessage(NEMId nemId, EventId eventId, const Serialization &serialization, NEMId ignoreNEM={}) const