40 #include "otaheader.pb.h" 55 eventStatisticPublisher_{
"OTAChannel"},
93 EMANEMessage::Event::Data data;
95 if(!eventSerializations.empty())
101 for(
const auto & entry : eventSerializations)
103 std::tie(targetNEMId,
105 serialization) = entry;
115 auto pSerialization = data.add_serializations();
117 pSerialization->set_nemid(targetNEMId);
119 pSerialization->set_eventid(eventId);
121 pSerialization->set_data(serialization);
126 for(
const auto & pMessage : msgs)
130 const auto pTransmitterControlMessage =
144 if(nemUserMap_.size() > 1)
146 auto now = Clock::now();
149 pktInfo.getDestination(),
150 pktInfo.getPriority(),
156 for(NEMUserMap::const_iterator iter = nemUserMap_.begin(), end = nemUserMap_.end();
160 if(iter->first ==
id)
164 else if(otaTransmitters.count(iter->first) > 0)
178 std::string sEventSerialization{};
180 if(!eventSerializations.empty())
182 if(!data.SerializeToString(&sEventSerialization))
186 "OTAManager sendOTAPacket unable to serialize attached event data src:%hu dst:%hu",
188 pktInfo.getDestination());
196 EMANEMessage::OTAHeader otaheader;
198 otaheader.set_source(pktInfo.getSource());
199 otaheader.set_destination(pktInfo.getDestination());
200 otaheader.set_datalength(pkt.
length());
201 otaheader.set_controllength(controlMessageSerializer.getLength());
202 otaheader.set_eventlength(sEventSerialization.size());
203 otaheader.set_sequencenumber(++u64SequenceNumber_);
204 otaheader.set_uuid(reinterpret_cast<const char *>(uuid_),
sizeof(uuid_));
206 std::string sOTAHeader{};
208 if(otaheader.SerializeToString(&sOTAHeader))
210 std::uint16_t u16HeaderLength =
HTONS(sOTAHeader.size());
212 Utils::VectorIO vectorIO{{
reinterpret_cast<char *
>(&u16HeaderLength),
sizeof(u16HeaderLength)},
213 {
const_cast<char *
>(sOTAHeader.c_str()),sOTAHeader.size()}};
215 if(!sEventSerialization.empty())
217 vectorIO.push_back({
const_cast<char *
>(sEventSerialization.c_str()),sEventSerialization.size()});
220 const auto & controlMessageIO = controlMessageSerializer.getVectorIO();
222 vectorIO.insert(vectorIO.end(),controlMessageIO.begin(),controlMessageIO.end());
226 vectorIO.insert(vectorIO.end(),packetIO.begin(),packetIO.end());
229 if(mcast_.
send(&vectorIO[0],static_cast<int>(vectorIO.size())) == -1)
233 "OTAManager sendOTAPacket unable to send ctrl_len:%zu, payload_len:%zu src:%hu dst:%hu reason:%s\n",
234 controlMessageSerializer.getLength(),
237 pktInfo.getDestination(),
245 pktInfo.getSource());
248 for(
const auto & entry : eventSerializations)
260 "OTAManager sendOTAPacket unable to serialize OTA header src:%hu dst:%hu",
262 pktInfo.getDestination());
267 std::for_each(msgs.begin(),msgs.end(),[](
const ControlMessage * p){
delete p;});
272 std::pair<NEMUserMap::iterator, bool> ret;
274 if(nemUserMap_.insert(std::make_pair(
id,pOTAUser)).second ==
false)
276 std::stringstream ssDescription;
277 ssDescription<<
"attempted to register duplicate user with id "<<
id<<std::ends;
284 if(nemUserMap_.erase(
id) == 0)
286 std::stringstream ssDescription;
287 ssDescription<<
"attempted to unregister unknown user with id "<<
id<<std::ends;
293 const std::string & otaManagerDevice,
298 otaGroupAddress_ = otaGroupAddress;
300 uuid_copy(uuid_,uuid);
304 mcast_.
open(otaGroupAddress,
true,otaManagerDevice,iTTL,bLoopback);
308 std::stringstream sstream;
310 sstream<<
"Platform OTA Manager: Unable to open OTA Manager socket: '" 311 <<otaGroupAddress.
str()
315 <<
"Possible reason(s):" 317 <<
" * No Multicast device specified and routing table nondeterministic" 319 <<
" (no multicast route and no default route)." 321 <<
" * Multicast device " 323 <<
" does not exist or is not up." 331 thread_ = std::thread{&EMANE::OTAManager::processOTAMessage,
this};
336 ERROR_LEVEL,
"OTAManager::open: Unable to set Real Time Priority");
343 void EMANE::OTAManager::processOTAMessage()
345 unsigned char buf[65536];
351 if((len = mcast_.
recv(buf,
sizeof(buf),0)) > 0)
353 auto now = Clock::now();
356 if(static_cast<size_t>(len) >=
sizeof(std::uint16_t))
358 std::uint16_t * pu16OTAHeaderLength{
reinterpret_cast<std::uint16_t *
>(buf)};
360 *pu16OTAHeaderLength =
NTOHS(*pu16OTAHeaderLength);
362 len -=
sizeof(std::uint16_t);
364 EMANEMessage::OTAHeader otaHeader;
366 if(static_cast<size_t>(len) >= *pu16OTAHeaderLength &&
367 otaHeader.ParseFromArray(&buf[2], *pu16OTAHeaderLength))
369 if(static_cast<size_t>(len) ==
370 otaHeader.datalength() +
371 otaHeader.controllength() +
372 otaHeader.eventlength() +
373 *pu16OTAHeaderLength)
375 std::uint16_t u16EventIndex = 2 + *pu16OTAHeaderLength;
376 std::uint16_t u16ControlIndex = u16EventIndex + otaHeader.eventlength();
377 std::uint16_t u16PacketIndex = u16ControlIndex + otaHeader.controllength();
380 uuid_copy(remoteUUID,reinterpret_cast<const unsigned char *>(otaHeader.uuid().data()));
382 if(uuid_compare(uuid_,remoteUUID))
384 if(otaHeader.eventlength())
386 EMANEMessage::Event::Data data;
388 if(data.ParseFromArray(&buf[u16EventIndex],otaHeader.eventlength()))
390 for(
const auto & serialization : data.serializations())
393 serialization.eventid(),
394 serialization.data());
398 serialization.eventid());
404 ERROR_LEVEL,
"OTAManager message events could not be deserialized");
410 otaHeader.destination(),
415 UpstreamPacket pkt(pktInfo,&buf[u16PacketIndex],otaHeader.datalength());
419 if(otaHeader.controllength())
423 otaHeader.controllength());
425 for(ControlMessages::const_iterator iter = msgs.begin(),end = msgs.end();
431 auto pSerializedControlMessage =
434 if(pSerializedControlMessage->getSerializedId() ==
437 std::unique_ptr<Controls::OTATransmitterControlMessage>
439 create(pSerializedControlMessage->getSerialization()));
441 otaTransmitters = pOTATransmitterControlMessage->getOTATransmitters();
453 pktInfo.getSource());
456 for(NEMUserMap::const_iterator iter = nemUserMap_.begin(), end = nemUserMap_.end();
460 if(otaTransmitters.count(iter->first) == 0)
472 "OTAManager Packet received data length incorrect" 473 " len: %zd header:%hu data:%u control: %u event: %u ",
475 *pu16OTAHeaderLength,
476 otaHeader.datalength(),
477 otaHeader.controllength(),
478 otaHeader.eventlength());
485 "OTAManager message header could not be deserialized");
492 "OTAManager message missing header missing prefix length encoding");
499 "OTAManager Packet Received error");
ssize_t recv(void *buf, size_t len, int flags=0)
std::set< NEMId > OTATransmitters
std::string Serialization
A Packet class that allows upstream processing to strip layer headers as the packet travels up the st...
const PacketInfo & getPacketInfo() const
A Serialized Control Message is used to encapsulate Serializable control messages as they traverse pr...
void registerOTAUser(NEMId id, OTAUser *pOTAUser) override
void sendOTAPacket(NEMId id, const DownstreamPacket &pkt, const ControlMessages &msgs) const override
std::list< const ControlMessage * > ControlMessages
Exception thrown when registering or unregistering OTAUsers.
void setRowLimit(size_t rows)
std::vector< iovec > VectorIO
void update(Type type, const uuid_t &uuid, NEMId nemId)
Store source, destination, creation time and priority information for a packet.
static ControlMessages create(const void *pData, size_t length)
ssize_t send(const iovec *iov, int iovcnt, int flags=0) const
const char * what() const
void update(Type type, const uuid_t &uuid, EventId eventId)
Specialized packet that allows downstream processing to add layer specific headers as the packet trave...
std::string str(bool bWithPort=true) const
int cancel(std::thread &thread)
const OTATransmitters & getOTATransmitters() const
The OTA Transmitter Control Message is used by the emulator physical layer to specify the NEM Id of the so...
ControlMessage interface is the base for all control messages.
void setRowLimit(size_t rows)
void open(const INETAddr &address, bool bReuseAddress=false, const std::string &sDevice="", std::uint8_t u8TTL=1, bool bLoop=false)
void open(const INETAddr &otaGroupAddress, const std::string &sDevice, bool bLoopback, int iTTL, const uuid_t &uuid)
constexpr std::uint16_t HTONS(std::uint16_t x)
void unregisterOTAUser(NEMId id) override
const EventSerializations & getEventSerializations() const
void setStatEventCountRowLimit(size_t rows)
OTA user interface that allows access to the OTA provider.
Utils::VectorIO getVectorIO() const
constexpr std::uint16_t NTOHS(std::uint16_t x)
#define LOGGER_STANDARD_LOGGING(logger, level, fmt, args...)
static EventService * instance()
void setStatPacketCountRowLimit(size_t rows)
int elevate(std::thread &thread)
void processEventMessage(NEMId nemId, EventId eventId, const Serialization &serialization, NEMId ignoreNEM={}) const