EMANE  1.2.1
otamanager.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013-2017 - Adjacent Link LLC, Bridgewater, New Jersey
3  * Copyright (c) 2008-2012 - DRS CenGen, LLC, Columbia, Maryland
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  *
10  * * Redistributions of source code must retain the above copyright
11  * notice, this list of conditions and the following disclaimer.
12  * * Redistributions in binary form must reproduce the above copyright
13  * notice, this list of conditions and the following disclaimer in
14  * the documentation and/or other materials provided with the
15  * distribution.
16  * * Neither the name of DRS CenGen, LLC nor the names of its
17  * contributors may be used to endorse or promote products derived
18  * from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31  * POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "otamanager.h"
35 #include "otauser.h"
36 #include "logservice.h"
38 #include "otaexception.h"
39 #include "eventservice.h"
40 #include "otaheader.pb.h"
41 #include "event.pb.h"
42 #include "socketexception.h"
43 
44 #include "emane/net.h"
48 
49 #include <sstream>
50 #include <algorithm>
51 #include <uuid.h>
52 
53 namespace
54 {
55  struct PartInfo
56  {
57  std::uint8_t u8More_;
58  std::uint32_t u32Offset_;
59  std::uint32_t u32Size_;
60  } __attribute__((packed));
61 
62  std::vector<uint8_t> bufferFromVectorIO(size_t size,
63  size_t & index,
64  size_t & offset,
65  const EMANE::Utils::VectorIO & vectorIO)
66  {
67  std::vector<uint8_t> buf{};
68 
69  size_t targetBytes{size};
70 
71  while(targetBytes)
72  {
73  size_t available{vectorIO[index].iov_len - offset};
74 
75  if(available)
76  {
77  if(available >= targetBytes)
78  {
79  buf.insert(buf.end(),
80  &reinterpret_cast<uint8_t *>(vectorIO[index].iov_base)[offset],
81  &reinterpret_cast<uint8_t *>(vectorIO[index].iov_base)[offset+targetBytes]);
82 
83  offset += targetBytes;
84 
85  targetBytes = 0;
86  }
87  else
88  {
89  buf.insert(buf.end(),
90  &reinterpret_cast<uint8_t *>(vectorIO[index].iov_base)[offset],
91  &reinterpret_cast<uint8_t *>(vectorIO[index].iov_base)[offset] + available);
92 
93  targetBytes -= available;
94 
95  ++index;
96 
97  offset = 0;
98  }
99  }
100  }
101 
102  return buf;
103  }
104 }
105 
107  bOpen_(false),
108  otaMTU_{},
109  eventStatisticPublisher_{"OTAChannel"},
110  u64SequenceNumber_{},
111  lastPartCheckTime_{}
112 {
113  uuid_clear(uuid_);
114 }
115 
117 {
118  if(bOpen_)
119  {
120  ThreadUtils::cancel(thread_);
121 
122  thread_.join();
123  }
124 }
125 
127 {
128  otaStatisticPublisher_.setRowLimit(rows);
129 }
130 
132 {
133  eventStatisticPublisher_.setRowLimit(rows);
134 }
135 
137  const DownstreamPacket & pkt,
138  const ControlMessages & msgs) const
139 {
140  // get the pkt info
141  const PacketInfo & pktInfo{pkt.getPacketInfo()};
142 
143  // set of optional additional transmitters (AT)
144  Controls::OTATransmitters otaTransmitters{};
145 
146  auto eventSerializations = pkt.getEventSerializations();
147 
148  EMANEMessage::Event::Data data;
149 
150  if(!eventSerializations.empty())
151  {
152  NEMId targetNEMId;
153  EventId eventId;
154  Serialization serialization;
155 
156  for(const auto & entry : eventSerializations)
157  {
158  std::tie(targetNEMId,
159  eventId,
160  serialization) = entry;
161 
162  // process any local event
164  eventId,
165  serialization,
166  id);
167 
168  if(bOpen_)
169  {
170  auto pSerialization = data.add_serializations();
171 
172  pSerialization->set_nemid(targetNEMId);
173 
174  pSerialization->set_eventid(eventId);
175 
176  pSerialization->set_data(serialization);
177  }
178  }
179  }
180 
181  for(const auto & pMessage : msgs)
182  {
184  {
185  const auto pTransmitterControlMessage =
186  reinterpret_cast<const Controls::OTATransmitterControlMessage *>(pMessage);
187 
188  otaTransmitters = pTransmitterControlMessage->getOTATransmitters();
189  }
190  }
191 
192  /*
193  * UpstreamPacket data is shared (reference counted). The same
194  * packet can be used in multiple calls to OTAUser::processOTAPacket
195  * since the resulting action is to enqueue a referenced counted
196  * copy on each NEM queue. Each copy will share the same packet data
197  * but have unique index counters used for stripping packet data.
198  */
199  if(nemUserMap_.size() > 1)
200  {
201  auto now = Clock::now();
202 
203  UpstreamPacket upstreamPacket({pktInfo.getSource(),
204  pktInfo.getDestination(),
205  pktInfo.getPriority(),
206  now,
207  uuid_},
208  pkt.getVectorIO());
209 
210  // bounce a copy of the pkt back up to our local NEM stack(s)
211  for(NEMUserMap::const_iterator iter = nemUserMap_.begin(), end = nemUserMap_.end();
212  iter != end;
213  ++iter)
214  {
215  if(iter->first == id)
216  {
217  // skip our own transmisstion
218  }
219  else if(otaTransmitters.count(iter->first) > 0)
220  {
221  // skip NEM(s) in the additional transmitter set (ATS)
222  }
223  else
224  {
225  iter->second->processOTAPacket(upstreamPacket,ControlMessages());
226  }
227  }
228  }
229 
230  // send the packet to additional OTAManagers using OTA multicast transport
231  if(bOpen_)
232  {
233  std::string sEventSerialization{};
234 
235  if(!eventSerializations.empty())
236  {
237  if(!data.SerializeToString(&sEventSerialization))
238  {
240  ERROR_LEVEL,
241  "OTAManager sendOTAPacket unable to serialize attached event data src:%hu dst:%hu",
242  pktInfo.getSource(),
243  pktInfo.getDestination());
244  }
245  }
246 
247  ControlMessageSerializer controlMessageSerializer{msgs};
248 
249  // create an ota message to carry the packet_info, and variable ctrl data len only
250  // total message length with data payload is defined below
251  EMANEMessage::OTAHeader otaheader;
252 
253  size_t totalSizeBytes = pkt.length() +
254  controlMessageSerializer.getLength() +
255  sEventSerialization.size();
256 
257  otaheader.set_source(pktInfo.getSource());
258  otaheader.set_destination(pktInfo.getDestination());
259  otaheader.set_sequence(++u64SequenceNumber_);
260  otaheader.set_uuid(reinterpret_cast<const char *>(uuid_),sizeof(uuid_));
261 
262  auto pPayloadInfo = otaheader.mutable_payloadinfo();
263 
264  pPayloadInfo->set_datalength(pkt.length());
265  pPayloadInfo->set_controllength(controlMessageSerializer.getLength());
266  pPayloadInfo->set_eventlength(sEventSerialization.size());
267 
268  // vector hold everything to be transmitted except the OTAHeader
269  Utils::VectorIO stagingVectorIO{};
270  size_t stagingIndex{};
271  size_t stagingOffset{};
272 
273  if(!sEventSerialization.empty())
274  {
275  stagingVectorIO.push_back({const_cast<char *>(sEventSerialization.c_str()),sEventSerialization.size()});
276  }
277 
278  const auto & controlMessageIO = controlMessageSerializer.getVectorIO();
279 
280  stagingVectorIO.insert(stagingVectorIO.end(),controlMessageIO.begin(),controlMessageIO.end());
281 
282  const auto & packetIO = pkt.getVectorIO();
283 
284  stagingVectorIO.insert(stagingVectorIO.end(),packetIO.begin(),packetIO.end());
285 
286  ++u64SequenceNumber_;
287 
288  size_t sentBytes{};
289  PartInfo partInfo{false,0,0};
290 
291  while(sentBytes != totalSizeBytes)
292  {
293  EMANEMessage::OTAHeader otaheader;
294  otaheader.set_source(pktInfo.getSource());
295  otaheader.set_destination(pktInfo.getDestination());
296  otaheader.set_sequence(u64SequenceNumber_);
297  otaheader.set_uuid(reinterpret_cast<const char *>(uuid_),sizeof(uuid_));
298 
299  if(sentBytes==0)
300  {
301  auto pPayloadInfo = otaheader.mutable_payloadinfo();
302  pPayloadInfo->set_datalength(pkt.length());
303  pPayloadInfo->set_controllength(controlMessageSerializer.getLength());
304  pPayloadInfo->set_eventlength(sEventSerialization.size());
305  }
306 
307  std::string sOTAHeader{};
308 
309  if(!otaheader.SerializeToString(&sOTAHeader))
310  {
312  ERROR_LEVEL,
313  "OTAManager sendOTAPacket unable to serialize OTA header src:%hu dst:%hu",
314  pktInfo.getSource(),
315  pktInfo.getDestination());
316  break;
317  }
318 
319  // total wire size includes 16 bit length prefix framing of header
320  size_t totalWireSize = totalSizeBytes - sentBytes + (sOTAHeader.size() + 2) + sizeof(PartInfo);
321 
322  std::uint16_t u16HeaderLength = HTONS(sOTAHeader.size());
323 
324  Utils::VectorIO vectorIO{{reinterpret_cast<char *>(&u16HeaderLength),sizeof(u16HeaderLength)},
325  {const_cast<char *>(sOTAHeader.c_str()),sOTAHeader.size()},
326  {reinterpret_cast<char *>(&partInfo),sizeof(partInfo)}};
327 
328  size_t payloadSize{};
329 
330  if(otaMTU_ != 0 and totalWireSize > otaMTU_)
331  {
332  partInfo.u8More_ = 1;
333  // size of payload only (event + control + packet data)
334  // adjusted for MTU and overhead (OTAHeader +
335  // PartInfo)
336  payloadSize = otaMTU_ - (sOTAHeader.size() + 2 + sizeof(partInfo));
337  partInfo.u32Size_ = HTONL(payloadSize);
338  }
339  else
340  {
341  partInfo.u8More_ = 0;
342  // size of payload only (event + control + packet data)
343  payloadSize = totalSizeBytes - sentBytes;
344  partInfo.u32Size_ = HTONL(payloadSize);
345  }
346 
347  partInfo.u32Offset_ = HTONL(totalSizeBytes - (totalSizeBytes - sentBytes));
348 
349  sentBytes += payloadSize;
350 
351  while(payloadSize)
352  {
353  size_t avaiableInEntrySize = stagingVectorIO[stagingIndex].iov_len - stagingOffset;
354 
355  if(avaiableInEntrySize > payloadSize)
356  {
357  vectorIO.push_back({reinterpret_cast<char *>(stagingVectorIO[stagingIndex].iov_base) + stagingOffset,
358  payloadSize});
359 
360  stagingOffset += payloadSize;
361  payloadSize = 0;
362  }
363  else
364  {
365  vectorIO.push_back({reinterpret_cast<char *>(stagingVectorIO[stagingIndex].iov_base) + stagingOffset,
366  avaiableInEntrySize});
367 
368  payloadSize -= avaiableInEntrySize;
369  stagingOffset = 0;
370  ++stagingIndex;
371  }
372  }
373 
374  // gather and send
375  if(mcast_.send(&vectorIO[0],static_cast<int>(vectorIO.size())) == -1)
376  {
378  ERROR_LEVEL,
379  "OTAManager sendOTAPacket unable to send ctrl_len:%zu,"
380  " payload_len:%zu src:%hu dst:%hu reason:%s\n",
381  controlMessageSerializer.getLength(),
382  pkt.length(),
383  pktInfo.getSource(),
384  pktInfo.getDestination(),
385  strerror(errno));
386 
387  }
388  else
389  {
391  uuid_,
392  pktInfo.getSource());
393 
394 
395  for(const auto & entry : eventSerializations)
396  {
397  eventStatisticPublisher_.update(EventStatisticPublisher::Type::TYPE_TX,
398  uuid_,
399  std::get<1>(entry));
400  }
401  }
402  }
403  }
404 
405  // clean up control messages
406  std::for_each(msgs.begin(),msgs.end(),[](const ControlMessage * p){delete p;});
407 }
408 
410 {
411  std::pair<NEMUserMap::iterator, bool> ret;
412 
413  if(nemUserMap_.insert(std::make_pair(id,pOTAUser)).second == false)
414  {
415  std::stringstream ssDescription;
416  ssDescription<<"attempted to register duplicate user with id "<<id<<std::ends;
417  throw OTAException(ssDescription.str());
418  }
419 }
420 
422 {
423  if(nemUserMap_.erase(id) == 0)
424  {
425  std::stringstream ssDescription;
426  ssDescription<<"attempted to unregister unknown user with id "<<id<<std::ends;
427  throw OTAException(ssDescription.str());
428  }
429 }
430 
431 void EMANE::OTAManager::open(const INETAddr & otaGroupAddress,
432  const std::string & otaManagerDevice,
433  bool bLoopback,
434  int iTTL,
435  const uuid_t & uuid,
436  size_t otaMTU,
437  Seconds partCheckThreshold,
438  Seconds partTimeoutThreshold)
439 {
440  otaGroupAddress_ = otaGroupAddress;
441  otaMTU_ = otaMTU;
442  partCheckThreshold_ = partCheckThreshold;
443  partTimeoutThreshold_ = partTimeoutThreshold;
444  uuid_copy(uuid_,uuid);
445 
446  try
447  {
448  mcast_.open(otaGroupAddress,true,otaManagerDevice,iTTL,bLoopback);
449  }
450  catch(SocketException & exp)
451  {
452  std::stringstream sstream;
453 
454  sstream<<"Platform OTA Manager: Unable to open OTA Manager socket: '"
455  <<otaGroupAddress.str()
456  <<"'."
457  <<std::endl
458  <<std::endl
459  <<"Possible reason(s):"
460  <<std::endl
461  <<" * No Multicast device specified and routing table nondeterministic"
462  <<std::endl
463  <<" (no multicast route and no default route)."
464  <<std::endl
465  <<" * Multicast device "
466  <<otaManagerDevice
467  <<" does not exist or is not up."
468  <<std::endl
469  <<exp.what()
470  <<std::ends;
471 
472  throw OTAException(sstream.str());
473  }
474 
475  thread_ = std::thread{&EMANE::OTAManager::processOTAMessage,this};
476 
477  if(ThreadUtils::elevate(thread_))
478  {
480  ERROR_LEVEL,"OTAManager::open: Unable to set Real Time Priority");
481  }
482 
483  bOpen_ = true;
484 }
485 
486 
487 void EMANE::OTAManager::processOTAMessage()
488 {
489  unsigned char buf[65536];
490 
491  ssize_t len = 0;
492 
493  while(1)
494  {
495  if((len = mcast_.recv(buf,sizeof(buf),0)) > 0)
496  {
497  auto now = Clock::now();
498 
499  // ota message len sanity check
500  if(static_cast<size_t>(len) >= sizeof(std::uint16_t))
501  {
502  std::uint16_t * pu16OTAHeaderLength{reinterpret_cast<std::uint16_t *>(buf)};
503 
504  *pu16OTAHeaderLength = NTOHS(*pu16OTAHeaderLength);
505 
506  len -= sizeof(std::uint16_t);
507 
508  EMANEMessage::OTAHeader otaHeader;
509 
510  size_t payloadIndex{2 + *pu16OTAHeaderLength + sizeof(PartInfo)};
511 
512  if(static_cast<size_t>(len) >= *pu16OTAHeaderLength + sizeof(PartInfo) &&
513  otaHeader.ParseFromArray(&buf[2], *pu16OTAHeaderLength))
514  {
515  PartInfo * pPartInfo{reinterpret_cast<PartInfo *>(&buf[2+*pu16OTAHeaderLength])};
516  pPartInfo->u32Offset_ = NTOHL(pPartInfo->u32Offset_);
517  pPartInfo->u32Size_ = NTOHL(pPartInfo->u32Size_);
518 
519  uuid_t remoteUUID;
520  uuid_copy(remoteUUID,reinterpret_cast<const unsigned char *>(otaHeader.uuid().data()));
521 
522  // only process messages that were not sent by this instance
523  if(uuid_compare(uuid_,remoteUUID))
524  {
525  // verify we have the advertized part length
526  if(static_cast<size_t>(len) ==
527  *pu16OTAHeaderLength +
528  sizeof(PartInfo) +
529  pPartInfo->u32Size_)
530  {
531  // message contained in a single part
532  if(!pPartInfo->u8More_ && !pPartInfo->u32Offset_)
533  {
534  auto & payloadInfo = otaHeader.payloadinfo();
535  handleOTAMessage(otaHeader.source(),
536  otaHeader.destination(),
537  remoteUUID,
538  now,
539  payloadInfo.eventlength(),
540  payloadInfo.controllength(),
541  payloadInfo.datalength(),
542  {{&buf[payloadIndex],pPartInfo->u32Size_}});
543  }
544  else
545  {
546  PartKey partKey = PartKey{otaHeader.source(),otaHeader.sequence()};
547 
548  auto iter = partStore_.find(partKey);
549 
550  if(iter != partStore_.end())
551  {
552  size_t & totalReceivedPartsBytes{std::get<0>(iter->second)};
553  size_t & totalEventBytes{std::get<1>(iter->second)};
554  size_t & totalControlBytes{std::get<2>(iter->second)};
555  size_t & totalDataBytes{std::get<3>(iter->second)};
556  auto & parts = std::get<4>(iter->second);
557  auto & lastPartTime = std::get<5>(iter->second);
558 
559  // check to see if first part has been received
560  if(otaHeader.has_payloadinfo())
561  {
562  auto & payloadInfo = otaHeader.payloadinfo();
563  totalEventBytes = payloadInfo.eventlength();
564  totalControlBytes = payloadInfo.controllength();
565  totalDataBytes = payloadInfo.datalength();
566  }
567 
568  // update last part receive time
569  lastPartTime = now;
570 
571  // add this part to parts and update receive count
572  totalReceivedPartsBytes += pPartInfo->u32Size_;
573 
574  parts.insert(std::make_pair(static_cast<size_t>(pPartInfo->u32Offset_),
575  std::vector<uint8_t>(&buf[payloadIndex],
576  &buf[payloadIndex + pPartInfo->u32Size_])));
577 
578  // determine if all parts are accounted for
579  size_t totalExpectedPartsBytes = totalDataBytes + totalEventBytes + totalControlBytes;
580 
581  if(totalReceivedPartsBytes == totalExpectedPartsBytes)
582  {
583  Utils::VectorIO vectorIO{};
584 
585  // get the parts sorted by offset and build an iovec
586  for(const auto & part : parts)
587  {
588  vectorIO.push_back({const_cast<uint8_t *>(&part.second[0]),
589  part.second.size()});
590  }
591 
592  handleOTAMessage(otaHeader.source(),
593  otaHeader.destination(),
594  remoteUUID,
595  now,
596  totalEventBytes,
597  totalControlBytes,
598  totalDataBytes,
599  vectorIO);
600 
601  // remove part cache and part time store
602  partStore_.erase(iter);
603  }
604  }
605  else
606  {
607  PartKey partKey = PartKey{otaHeader.source(),otaHeader.sequence()};
608 
609  Parts parts{};
610 
611  parts.insert(std::make_pair(static_cast<size_t>(pPartInfo->u32Offset_),
612  std::vector<uint8_t>(&buf[payloadIndex],
613  &buf[payloadIndex + pPartInfo->u32Size_])));
614 
615  std::array<uint8_t,sizeof(uuid_t)> uuid;
616  uuid_copy(uuid.data(),remoteUUID);
617 
618  // first part of message
619  // check to see if first part has been received
620  if(otaHeader.has_payloadinfo())
621  {
622  auto & payloadInfo = otaHeader.payloadinfo();
623 
624  partStore_.insert({partKey,
625  std::make_tuple(static_cast<size_t>(pPartInfo->u32Size_),
626  payloadInfo.eventlength(),
627  payloadInfo.controllength(),
628  payloadInfo.datalength(),
629  parts,
630  now,
631  uuid)});
632  }
633  else
634  {
635  partStore_.insert({partKey,
636  std::make_tuple(static_cast<size_t>(pPartInfo->u32Size_),
637  0, // event length
638  0, // control length
639  0, // data length
640  parts,
641  now,
642  uuid)});
643  }
644  }
645  }
646  }
647  else
648  {
650  ERROR_LEVEL,
651  "OTAManager message part size mismatch");
652  }
653  }
654  }
655  else
656  {
658  ERROR_LEVEL,
659  "OTAManager message header could not be deserialized");
660  }
661  }
662  else
663  {
665  ERROR_LEVEL,
666  "OTAManager message missing header missing prefix length encoding");
667  }
668 
669  // check to see if there are part assemblies to abandon
670  if(lastPartCheckTime_ + partCheckThreshold_ <= now)
671  {
672  for(auto iter = partStore_.begin(); iter != partStore_.end();)
673  {
674  auto & lastPartTime = std::get<5>(iter->second);
675 
676  if(lastPartTime + partTimeoutThreshold_ <= now)
677  {
678  auto & srcNEM = std::get<0>(iter->first);
679  uuid_t uuid;
680  uuid_copy(uuid,std::get<6>(iter->second).data());
681 
683  uuid,
684  srcNEM);
685 
687  ERROR_LEVEL,
688  "OTAManager missing one or more packet parts src:"
689  " %hu sequence: %ju, dropping.",
690  srcNEM,
691  std::get<1>(iter->first));
692 
693  partStore_.erase(iter++);
694  }
695  else
696  {
697  ++iter;
698  }
699  }
700 
701  lastPartCheckTime_ = now;
702  }
703  }
704  else
705  {
707  ERROR_LEVEL,
708  "OTAManager Packet Received error");
709  break;
710  }
711 
712  }
713 }
714 
715 
716 void EMANE::OTAManager::handleOTAMessage(NEMId source,
717  NEMId destination,
718  const uuid_t & remoteUUID,
719  const TimePoint & now,
720  size_t eventsSize,
721  size_t controlsSize,
722  size_t dataSize,
723  const Utils::VectorIO & vectorIO)
724 {
725  size_t index{};
726  size_t offset{};
727 
728  if(eventsSize)
729  {
730  std::vector<uint8_t> buf{bufferFromVectorIO(eventsSize,
731  index,
732  offset,
733  vectorIO)};
734  EMANEMessage::Event::Data data;
735 
736  if(data.ParseFromArray(&buf[0],eventsSize))
737  {
738  for(const auto & serialization : data.serializations())
739  {
740  EventServiceSingleton::instance()->processEventMessage(serialization.nemid(),
741  serialization.eventid(),
742  serialization.data());
743 
744  eventStatisticPublisher_.update(EventStatisticPublisher::Type::TYPE_RX,
745  remoteUUID,
746  serialization.eventid());
747  }
748  }
749  else
750  {
752  ERROR_LEVEL,
753  "OTAManager message events could not be deserialized");
754  }
755  }
756 
757  Controls::OTATransmitters otaTransmitters{};
758 
759  if(controlsSize)
760  {
761  std::vector<uint8_t> buf{bufferFromVectorIO(controlsSize,
762  index,
763  offset,
764  vectorIO)};
765 
766  ControlMessages msgs =
768  controlsSize);
769 
770  for(ControlMessages::const_iterator iter = msgs.begin(),end = msgs.end();
771  iter != end;
772  ++iter)
773  {
774  if((*iter)->getId() == Controls::SerializedControlMessage::IDENTIFIER)
775  {
776  auto pSerializedControlMessage =
777  static_cast<const Controls::SerializedControlMessage *>(*iter);
778 
779  if(pSerializedControlMessage->getSerializedId() ==
781  {
782  std::unique_ptr<Controls::OTATransmitterControlMessage>
783  pOTATransmitterControlMessage(Controls::OTATransmitterControlMessage::
784  create(pSerializedControlMessage->getSerialization()));
785 
786  otaTransmitters = pOTATransmitterControlMessage->getOTATransmitters();
787  }
788 
789  }
790 
791  // delete all control messages
792  delete *iter;
793  }
794  }
795 
796  // create packet info from the ota data message
797  PacketInfo pktInfo(source,
798  destination,
799  0,
800  now,
801  remoteUUID);
802 
803  Utils::VectorIO packetVectorIO{};
804 
805  for(; index < vectorIO.size(); ++index, offset=0)
806  {
807  packetVectorIO.push_back({reinterpret_cast<uint8_t *>(vectorIO[index].iov_base) + offset,
808  vectorIO[index].iov_len - offset});
809  }
810 
811  UpstreamPacket pkt(pktInfo,packetVectorIO);
812 
813  if(pkt.length() == dataSize)
814  {
816  remoteUUID,
817  pktInfo.getSource());
818 
819  // for each local NEM stack
820  for(NEMUserMap::const_iterator iter = nemUserMap_.begin(), end = nemUserMap_.end();
821  iter != end; ++iter)
822  {
823  // only send pkt up to NEM(s) NOT in the ATS
824  if(otaTransmitters.count(iter->first) == 0)
825  {
826  iter->second->processOTAPacket(pkt,ControlMessages());
827  }
828  }
829  }
830  else
831  {
833  ERROR_LEVEL,
834  "OTAManager packet size does not match reported size in OTA header");
835  }
836 }
ssize_t recv(void *buf, size_t len, int flags=0)
std::set< NEMId > OTATransmitters
std::string Serialization
Definition: serializable.h:42
A Packet class that allows upstream processing to strip layer headers as the packet travels up the st...
const PacketInfo & getPacketInfo() const
void registerOTAUser(NEMId id, OTAUser *pOTAUser) override
Definition: otamanager.cc:409
void sendOTAPacket(NEMId id, const DownstreamPacket &pkt, const ControlMessages &msgs) const override
Definition: otamanager.cc:136
std::list< const ControlMessage * > ControlMessages
Exception thrown when registering or unregistering OTAUsers.
Definition: otaexception.h:46
constexpr std::uint32_t NTOHL(std::uint32_t x)
Definition: net.h:114
std::vector< iovec > VectorIO
Definition: vectorio.h:43
std::chrono::seconds Seconds
Definition: types.h:43
std::uint16_t EventId
Definition: types.h:53
void update(Type type, const uuid_t &uuid, NEMId nemId)
struct sockaddr_in_t __attribute__((__may_alias__))
Store source, destination, creation time and priority information for a packet.
Definition: packetinfo.h:50
static ControlMessages create(const void *pData, size_t length)
std::uint32_t u32Offset_
Definition: otamanager.cc:718
ssize_t send(const iovec *iov, int iovcnt, int flags=0) const
std::uint32_t u32Size_
Definition: otamanager.cc:719
const char * what() const
Definition: exception.h:62
void update(Type type, const uuid_t &uuid, EventId eventId)
Specialized packet the allows downstream processing to add layer specific headers as the packet trave...
std::string str(bool bWithPort=true) const
Definition: inetaddr.cc:409
int cancel(std::thread &thread)
Definition: threadutils.h:65
std::uint16_t NEMId
Definition: types.h:52
The OTA Transmitter Control Message is by the emulator physical layer to specify the NEM Id of the so...
std::uint8_t u8More_
Definition: otamanager.cc:717
ControlMessage interface is the base for all control messages.
void open(const INETAddr &address, bool bReuseAddress=false, const std::string &sDevice="", std::uint8_t u8TTL=1, bool bLoop=false)
constexpr std::uint16_t HTONS(std::uint16_t x)
Definition: net.h:125
void unregisterOTAUser(NEMId id) override
Definition: otamanager.cc:421
const EventSerializations & getEventSerializations() const
void setStatEventCountRowLimit(size_t rows)
Definition: otamanager.cc:131
OTA user interface that allows access to the OTA provider.
Definition: otauser.h:50
Utils::VectorIO getVectorIO() const
Clock::time_point TimePoint
Definition: types.h:50
constexpr std::uint16_t NTOHS(std::uint16_t x)
Definition: net.h:136
#define LOGGER_STANDARD_LOGGING(logger, level, fmt, args...)
constexpr std::uint32_t HTONL(std::uint32_t x)
Definition: net.h:103
static EventService * instance()
Definition: singleton.h:56
void setStatPacketCountRowLimit(size_t rows)
Definition: otamanager.cc:126
void open(const INETAddr &otaGroupAddress, const std::string &sDevice, bool bLoopback, int iTTL, const uuid_t &uuid, size_t otaMTU, Seconds partCheckThreshold, Seconds partTimeoutThreshold)
Definition: otamanager.cc:431
int elevate(std::thread &thread)
Definition: threadutils.h:44
void processEventMessage(NEMId nemId, EventId eventId, const Serialization &serialization, NEMId ignoreNEM={}) const