40 #include "otaheader.pb.h" 62 std::vector<uint8_t> bufferFromVectorIO(
size_t size,
67 std::vector<uint8_t> buf{};
69 size_t targetBytes{size};
73 size_t available{vectorIO[index].iov_len - offset};
77 if(available >= targetBytes)
80 &
reinterpret_cast<uint8_t *
>(vectorIO[index].iov_base)[offset],
81 &reinterpret_cast<uint8_t *>(vectorIO[index].iov_base)[offset+targetBytes]);
83 offset += targetBytes;
90 &
reinterpret_cast<uint8_t *
>(vectorIO[index].iov_base)[offset],
91 &reinterpret_cast<uint8_t *>(vectorIO[index].iov_base)[offset] + available);
93 targetBytes -= available;
109 eventStatisticPublisher_{
"OTAChannel"},
110 u64SequenceNumber_{},
148 EMANEMessage::Event::Data data;
150 if(!eventSerializations.empty())
156 for(
const auto & entry : eventSerializations)
158 std::tie(targetNEMId,
160 serialization) = entry;
170 auto pSerialization = data.add_serializations();
172 pSerialization->set_nemid(targetNEMId);
174 pSerialization->set_eventid(eventId);
176 pSerialization->set_data(serialization);
181 for(
const auto & pMessage : msgs)
185 const auto pTransmitterControlMessage =
199 if(nemUserMap_.size() > 1)
201 auto now = Clock::now();
204 pktInfo.getDestination(),
205 pktInfo.getPriority(),
211 for(NEMUserMap::const_iterator iter = nemUserMap_.begin(), end = nemUserMap_.end();
215 if(iter->first ==
id)
219 else if(otaTransmitters.count(iter->first) > 0)
233 std::string sEventSerialization{};
235 if(!eventSerializations.empty())
237 if(!data.SerializeToString(&sEventSerialization))
241 "OTAManager sendOTAPacket unable to serialize attached event data src:%hu dst:%hu",
243 pktInfo.getDestination());
253 size_t totalSizeBytes = pkt.
length() +
254 controlMessageSerializer.getLength() +
255 sEventSerialization.size();
257 otaheader.set_source(pktInfo.getSource());
258 otaheader.set_destination(pktInfo.getDestination());
259 otaheader.set_sequence(++u64SequenceNumber_);
260 otaheader.set_uuid(reinterpret_cast<const char *>(uuid_),
sizeof(uuid_));
262 auto pPayloadInfo = otaheader.mutable_payloadinfo();
264 pPayloadInfo->set_datalength(pkt.
length());
265 pPayloadInfo->set_controllength(controlMessageSerializer.getLength());
266 pPayloadInfo->set_eventlength(sEventSerialization.size());
270 size_t stagingIndex{};
271 size_t stagingOffset{};
273 if(!sEventSerialization.empty())
275 stagingVectorIO.push_back({
const_cast<char *
>(sEventSerialization.c_str()),sEventSerialization.size()});
278 const auto & controlMessageIO = controlMessageSerializer.getVectorIO();
280 stagingVectorIO.insert(stagingVectorIO.end(),controlMessageIO.begin(),controlMessageIO.end());
284 stagingVectorIO.insert(stagingVectorIO.end(),packetIO.begin(),packetIO.end());
286 ++u64SequenceNumber_;
289 PartInfo partInfo{
false,0,0};
291 while(sentBytes != totalSizeBytes)
294 otaheader.set_source(pktInfo.getSource());
295 otaheader.set_destination(pktInfo.getDestination());
296 otaheader.set_sequence(u64SequenceNumber_);
297 otaheader.set_uuid(reinterpret_cast<const char *>(uuid_),
sizeof(uuid_));
301 auto pPayloadInfo = otaheader.mutable_payloadinfo();
302 pPayloadInfo->set_datalength(pkt.
length());
303 pPayloadInfo->set_controllength(controlMessageSerializer.getLength());
304 pPayloadInfo->set_eventlength(sEventSerialization.size());
307 std::string sOTAHeader{};
309 if(!otaheader.SerializeToString(&sOTAHeader))
313 "OTAManager sendOTAPacket unable to serialize OTA header src:%hu dst:%hu",
315 pktInfo.getDestination());
320 size_t totalWireSize = totalSizeBytes - sentBytes + (sOTAHeader.size() + 2) +
sizeof(PartInfo);
322 std::uint16_t u16HeaderLength =
HTONS(sOTAHeader.size());
324 Utils::VectorIO vectorIO{{
reinterpret_cast<char *
>(&u16HeaderLength),
sizeof(u16HeaderLength)},
325 {
const_cast<char *
>(sOTAHeader.c_str()),sOTAHeader.size()},
326 {
reinterpret_cast<char *
>(&partInfo),
sizeof(partInfo)}};
328 size_t payloadSize{};
330 if(otaMTU_ != 0 and totalWireSize > otaMTU_)
332 partInfo.u8More_ = 1;
336 payloadSize = otaMTU_ - (sOTAHeader.size() + 2 +
sizeof(partInfo));
337 partInfo.u32Size_ =
HTONL(payloadSize);
341 partInfo.u8More_ = 0;
343 payloadSize = totalSizeBytes - sentBytes;
344 partInfo.u32Size_ =
HTONL(payloadSize);
347 partInfo.u32Offset_ =
HTONL(totalSizeBytes - (totalSizeBytes - sentBytes));
349 sentBytes += payloadSize;
353 size_t avaiableInEntrySize = stagingVectorIO[stagingIndex].iov_len - stagingOffset;
355 if(avaiableInEntrySize > payloadSize)
357 vectorIO.push_back({
reinterpret_cast<char *
>(stagingVectorIO[stagingIndex].iov_base) + stagingOffset,
360 stagingOffset += payloadSize;
365 vectorIO.push_back({
reinterpret_cast<char *
>(stagingVectorIO[stagingIndex].iov_base) + stagingOffset,
366 avaiableInEntrySize});
368 payloadSize -= avaiableInEntrySize;
375 if(mcast_.
send(&vectorIO[0],static_cast<int>(vectorIO.size())) == -1)
379 "OTAManager sendOTAPacket unable to send ctrl_len:%zu," 380 " payload_len:%zu src:%hu dst:%hu reason:%s\n",
381 controlMessageSerializer.getLength(),
384 pktInfo.getDestination(),
392 pktInfo.getSource());
395 for(
const auto & entry : eventSerializations)
406 std::for_each(msgs.begin(),msgs.end(),[](
const ControlMessage * p){
delete p;});
411 std::pair<NEMUserMap::iterator, bool> ret;
413 if(nemUserMap_.insert(std::make_pair(
id,pOTAUser)).second ==
false)
415 std::stringstream ssDescription;
416 ssDescription<<
"attempted to register duplicate user with id "<<
id<<std::ends;
423 if(nemUserMap_.erase(
id) == 0)
425 std::stringstream ssDescription;
426 ssDescription<<
"attempted to unregister unknown user with id "<<
id<<std::ends;
432 const std::string & otaManagerDevice,
440 otaGroupAddress_ = otaGroupAddress;
442 partCheckThreshold_ = partCheckThreshold;
443 partTimeoutThreshold_ = partTimeoutThreshold;
444 uuid_copy(uuid_,uuid);
448 mcast_.
open(otaGroupAddress,
true,otaManagerDevice,iTTL,bLoopback);
452 std::stringstream sstream;
454 sstream<<
"Platform OTA Manager: Unable to open OTA Manager socket: '" 455 <<otaGroupAddress.
str()
459 <<
"Possible reason(s):" 461 <<
" * No Multicast device specified and routing table nondeterministic" 463 <<
" (no multicast route and no default route)." 465 <<
" * Multicast device " 467 <<
" does not exist or is not up." 475 thread_ = std::thread{&EMANE::OTAManager::processOTAMessage,
this};
480 ERROR_LEVEL,
"OTAManager::open: Unable to set Real Time Priority");
487 void EMANE::OTAManager::processOTAMessage()
489 unsigned char buf[65536];
495 if((len = mcast_.
recv(buf,
sizeof(buf),0)) > 0)
497 auto now = Clock::now();
500 if(static_cast<size_t>(len) >=
sizeof(std::uint16_t))
502 std::uint16_t * pu16OTAHeaderLength{
reinterpret_cast<std::uint16_t *
>(buf)};
504 *pu16OTAHeaderLength =
NTOHS(*pu16OTAHeaderLength);
506 len -=
sizeof(std::uint16_t);
510 size_t payloadIndex{2 + *pu16OTAHeaderLength +
sizeof(PartInfo)};
512 if(static_cast<size_t>(len) >= *pu16OTAHeaderLength +
sizeof(PartInfo) &&
513 otaHeader.ParseFromArray(&buf[2], *pu16OTAHeaderLength))
515 PartInfo * pPartInfo{
reinterpret_cast<PartInfo *
>(&buf[2+*pu16OTAHeaderLength])};
516 pPartInfo->u32Offset_ =
NTOHL(pPartInfo->u32Offset_);
517 pPartInfo->u32Size_ =
NTOHL(pPartInfo->u32Size_);
520 uuid_copy(remoteUUID,reinterpret_cast<const unsigned char *>(otaHeader.uuid().data()));
523 if(uuid_compare(uuid_,remoteUUID))
526 if(static_cast<size_t>(len) ==
527 *pu16OTAHeaderLength +
532 if(!pPartInfo->u8More_ && !pPartInfo->u32Offset_)
534 auto & payloadInfo = otaHeader.payloadinfo();
535 handleOTAMessage(otaHeader.source(),
536 otaHeader.destination(),
539 payloadInfo.eventlength(),
540 payloadInfo.controllength(),
541 payloadInfo.datalength(),
542 {{&buf[payloadIndex],pPartInfo->u32Size_}});
546 PartKey partKey = PartKey{otaHeader.source(),otaHeader.sequence()};
548 auto iter = partStore_.find(partKey);
550 if(iter != partStore_.end())
552 size_t & totalReceivedPartsBytes{std::get<0>(iter->second)};
553 size_t & totalEventBytes{std::get<1>(iter->second)};
554 size_t & totalControlBytes{std::get<2>(iter->second)};
555 size_t & totalDataBytes{std::get<3>(iter->second)};
556 auto & parts = std::get<4>(iter->second);
557 auto & lastPartTime = std::get<5>(iter->second);
560 if(otaHeader.has_payloadinfo())
562 auto & payloadInfo = otaHeader.payloadinfo();
563 totalEventBytes = payloadInfo.eventlength();
564 totalControlBytes = payloadInfo.controllength();
565 totalDataBytes = payloadInfo.datalength();
572 totalReceivedPartsBytes += pPartInfo->u32Size_;
574 parts.insert(std::make_pair(static_cast<size_t>(pPartInfo->u32Offset_),
575 std::vector<uint8_t>(&buf[payloadIndex],
576 &buf[payloadIndex + pPartInfo->u32Size_])));
579 size_t totalExpectedPartsBytes = totalDataBytes + totalEventBytes + totalControlBytes;
581 if(totalReceivedPartsBytes == totalExpectedPartsBytes)
586 for(
const auto & part : parts)
588 vectorIO.push_back({
const_cast<uint8_t *
>(&part.second[0]),
589 part.second.size()});
592 handleOTAMessage(otaHeader.source(),
593 otaHeader.destination(),
602 partStore_.erase(iter);
607 PartKey partKey = PartKey{otaHeader.source(),otaHeader.sequence()};
611 parts.insert(std::make_pair(static_cast<size_t>(pPartInfo->u32Offset_),
612 std::vector<uint8_t>(&buf[payloadIndex],
613 &buf[payloadIndex + pPartInfo->u32Size_])));
615 std::array<uint8_t,sizeof(uuid_t)> uuid;
616 uuid_copy(uuid.data(),remoteUUID);
620 if(otaHeader.has_payloadinfo())
622 auto & payloadInfo = otaHeader.payloadinfo();
624 partStore_.insert({partKey,
625 std::make_tuple(static_cast<size_t>(pPartInfo->u32Size_),
626 payloadInfo.eventlength(),
627 payloadInfo.controllength(),
628 payloadInfo.datalength(),
635 partStore_.insert({partKey,
636 std::make_tuple(static_cast<size_t>(pPartInfo->u32Size_),
651 "OTAManager message part size mismatch");
659 "OTAManager message header could not be deserialized");
666 "OTAManager message missing header missing prefix length encoding");
670 if(lastPartCheckTime_ + partCheckThreshold_ <= now)
672 for(
auto iter = partStore_.begin(); iter != partStore_.end();)
674 auto & lastPartTime = std::get<5>(iter->second);
676 if(lastPartTime + partTimeoutThreshold_ <= now)
678 auto & srcNEM = std::get<0>(iter->first);
680 uuid_copy(uuid,std::get<6>(iter->second).data());
688 "OTAManager missing one or more packet parts src:" 689 " %hu sequence: %ju, dropping.",
691 std::get<1>(iter->first));
693 partStore_.erase(iter++);
701 lastPartCheckTime_ = now;
708 "OTAManager Packet Received error");
716 void EMANE::OTAManager::handleOTAMessage(
NEMId source,
718 const uuid_t & remoteUUID,
730 std::vector<uint8_t> buf{bufferFromVectorIO(eventsSize,
734 EMANEMessage::Event::Data data;
736 if(data.ParseFromArray(&buf[0],eventsSize))
738 for(
const auto & serialization : data.serializations())
741 serialization.eventid(),
742 serialization.data());
746 serialization.eventid());
753 "OTAManager message events could not be deserialized");
761 std::vector<uint8_t> buf{bufferFromVectorIO(controlsSize,
770 for(ControlMessages::const_iterator iter = msgs.begin(),end = msgs.end();
776 auto pSerializedControlMessage =
777 static_cast<const Controls::SerializedControlMessage *
>(*iter);
779 if(pSerializedControlMessage->getSerializedId() ==
782 std::unique_ptr<Controls::OTATransmitterControlMessage>
783 pOTATransmitterControlMessage(Controls::OTATransmitterControlMessage::
784 create(pSerializedControlMessage->getSerialization()));
786 otaTransmitters = pOTATransmitterControlMessage->getOTATransmitters();
797 PacketInfo pktInfo(source,
805 for(; index < vectorIO.size(); ++index, offset=0)
807 packetVectorIO.push_back({
reinterpret_cast<uint8_t *
>(vectorIO[index].iov_base) + offset,
808 vectorIO[index].iov_len - offset});
811 UpstreamPacket pkt(pktInfo,packetVectorIO);
813 if(pkt.length() == dataSize)
817 pktInfo.getSource());
820 for(NEMUserMap::const_iterator iter = nemUserMap_.begin(), end = nemUserMap_.end();
824 if(otaTransmitters.count(iter->first) == 0)
834 "OTAManager packet size does not match reported size in OTA header");
ssize_t recv(void *buf, size_t len, int flags=0)
std::set< NEMId > OTATransmitters
std::string Serialization
A Packet class that allows upstream processing to strip layer headers as the packet travels up the st...
const PacketInfo & getPacketInfo() const
void registerOTAUser(NEMId id, OTAUser *pOTAUser) override
void sendOTAPacket(NEMId id, const DownstreamPacket &pkt, const ControlMessages &msgs) const override
std::list< const ControlMessage * > ControlMessages
Exception thrown when registering or unregistering OTAUsers.
constexpr std::uint32_t NTOHL(std::uint32_t x)
void setRowLimit(size_t rows)
std::vector< iovec > VectorIO
std::chrono::seconds Seconds
void update(Type type, const uuid_t &uuid, NEMId nemId)
struct sockaddr_in_t __attribute__((__may_alias__))
Store source, destination, creation time and priority information for a packet.
static ControlMessages create(const void *pData, size_t length)
ssize_t send(const iovec *iov, int iovcnt, int flags=0) const
const char * what() const
void update(Type type, const uuid_t &uuid, EventId eventId)
Specialized packet that allows downstream processing to add layer-specific headers as the packet trave...
std::string str(bool bWithPort=true) const
int cancel(std::thread &thread)
const OTATransmitters & getOTATransmitters() const
The OTA Transmitter Control Message is used by the emulator physical layer to specify the NEM Id of the so...
ControlMessage interface is the base for all control messages.
void setRowLimit(size_t rows)
void open(const INETAddr &address, bool bReuseAddress=false, const std::string &sDevice="", std::uint8_t u8TTL=1, bool bLoop=false)
constexpr std::uint16_t HTONS(std::uint16_t x)
void unregisterOTAUser(NEMId id) override
const EventSerializations & getEventSerializations() const
void setStatEventCountRowLimit(size_t rows)
OTA user interface that allows access to the OTA provider.
Utils::VectorIO getVectorIO() const
Clock::time_point TimePoint
constexpr std::uint16_t NTOHS(std::uint16_t x)
#define LOGGER_STANDARD_LOGGING(logger, level, fmt, args...)
constexpr std::uint32_t HTONL(std::uint32_t x)
static EventService * instance()
void setStatPacketCountRowLimit(size_t rows)
void open(const INETAddr &otaGroupAddress, const std::string &sDevice, bool bLoopback, int iTTL, const uuid_t &uuid, size_t otaMTU, Seconds partCheckThreshold, Seconds partTimeoutThreshold)
int elevate(std::thread &thread)
void processEventMessage(NEMId nemId, EventId eventId, const Serialization &serialization, NEMId ignoreNEM={}) const