41 #include <sys/eventfd.h> 42 #include <sys/epoll.h> 47 const uint64_t one{1};
52 pPlatformService_{pPlatformService},
57 pProcessedDownstreamPacket_{},
58 pProcessedUpstreamPacket_{},
59 pProcessedDownstreamControl_{},
60 pProcessedUpstreamControl_{},
62 pProcessedTimedEvent_{},
63 pProcessedConfiguration_{}
67 iepollFd_ = epoll_create1(0);
70 struct epoll_event ev;
74 if(epoll_ctl(iepollFd_,EPOLL_CTL_ADD,iFd_,&ev) == -1)
79 "%03hu NEMQueuedLayer::NEMQueuedLayer:" 80 " unable to add eventfd to epoll",
87 std::lock_guard<std::mutex> m(mutex_);
89 if(!bCancel_ && thread_.joinable())
92 write(iFd_,&one,
sizeof(one));
102 pProcessedDownstreamPacket_ =
103 statisticRegistrar.registerNumeric<std::uint64_t>(
"processedDownstreamPackets",
106 pProcessedUpstreamPacket_ =
107 statisticRegistrar.registerNumeric<std::uint64_t>(
"processedUpstreamPackets",
110 pProcessedDownstreamControl_ =
111 statisticRegistrar.registerNumeric<std::uint64_t>(
"processedDownstreamControl",
114 pProcessedUpstreamControl_ =
115 statisticRegistrar.registerNumeric<std::uint64_t>(
"processedUpstreamControl",
118 statisticRegistrar.registerNumeric<std::uint64_t>(
"processedEvents",
120 pProcessedConfiguration_ =
121 statisticRegistrar.registerNumeric<std::uint64_t>(
"processedConfiguration",
123 pProcessedTimedEvent_ =
124 statisticRegistrar.registerNumeric<std::uint64_t>(
"processedTimedEvents",
129 "EventReceptionTable",
130 {
"Event",
"Total Rx"},
131 "Received event counts"});
134 (statisticRegistrar.registerNumeric<
double>(
"avgProcessAPIQueueWait",
136 "Average API queue wait for a processUpstreamPacket," 137 " processUpstreamControl, processDownstreamPacket," 138 " processDownstreamControl, processEvent and" 139 " processTimedEvent in microseconds."));
142 (statisticRegistrar.registerNumeric<
double>(
"avgProcessAPIQueueDepth",
144 "Average API queue depth for a processUpstreamPacket," 145 " processUpstreamControl, processDownstreamPacket," 146 " processDownstreamControl, processEvent and" 147 " processTimedEvent."));
150 (statisticRegistrar.registerNumeric<
double>(
"avgTimedEventLatency",
154 (statisticRegistrar.registerNumeric<
double>(
"avgTimedEventLatencyRatio",
156 "Average ratio of the delta between the scheduled timer" 157 " expiration and the actual firing over the requested" 158 " duration. An average ratio approaching 1 indicates that" 159 " timer latencies are large in comparison to the requested" 165 thread_ = std::thread{&EMANE::NEMQueuedLayer::processWorkQueue,
this};
172 write(iFd_,&one,
sizeof(one));
178 void EMANE::NEMQueuedLayer::enqueue_i(QCallback && callback)
180 std::lock_guard<std::mutex> m(mutex_);
181 queue_.push_back(std::move(callback));
182 avgQueueDepth_.
update(queue_.size());
183 write(iFd_,&one,
sizeof(one));
188 enqueue_i(std::bind(&NEMQueuedLayer::handleProcessConfiguration,
196 enqueue_i(std::bind(&NEMQueuedLayer::handleProcessDownstreamControl,
205 enqueue_i(std::bind(&NEMQueuedLayer::handleProcessDownstreamPacket,
215 enqueue_i(std::bind(&NEMQueuedLayer::handleProcessUpstreamPacket,
224 enqueue_i(std::bind(&NEMQueuedLayer::handleProcessUpstreamControl,
233 enqueue_i(std::bind(&NEMQueuedLayer::handleProcessEvent,
247 enqueue_i(std::bind(&NEMQueuedLayer::handleProcessTimedEvent,
257 void EMANE::NEMQueuedLayer::processWorkQueue()
259 std::uint64_t u64Expired{};
260 #define MAX_EVENTS 32 261 struct epoll_event events[MAX_EVENTS];
266 nfds = epoll_wait(iepollFd_,events,MAX_EVENTS,-1);
272 "%03hu NEMQueuedLayer::processWorkQueue:" 278 for(
int n = 0; n < nfds; ++n)
280 if(events[n].data.fd == iFd_)
283 if(read(iFd_,&u64Expired,
sizeof(u64Expired)) > 0)
285 std::unique_lock<std::mutex> lock(mutex_);
292 MessageProcessingQueue queue{};
298 for(
auto & entry : queue)
305 catch(std::exception & exp)
310 "%03hu NEMQueuedLayer::processWorkQueue:" 311 " Exception caught %s",
320 "%03hu NEMQueuedLayer::processWorkQueue:" 331 auto iter = fileDescriptorStore_.find(events[n].data.fd);
333 if(iter != fileDescriptorStore_.end())
337 iter->second.second(events[n].data.fd);
339 catch(std::exception & exp)
344 "%03hu NEMQueuedLayer::processWorkQueue:" 345 " Exception caught %s",
354 "%03hu NEMQueuedLayer::processWorkQueue:" 364 void EMANE::NEMQueuedLayer::handleProcessConfiguration(
TimePoint enqueueTime,
367 avgQueueWait_.
update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
369 ++*pProcessedConfiguration_;
374 void EMANE::NEMQueuedLayer::handleProcessDownstreamControl(
TimePoint enqueueTime,
377 avgQueueWait_.
update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
379 ++*pProcessedDownstreamControl_;
383 std::for_each(msgs.begin(),msgs.end(),[](
const ControlMessage * p){
delete p;});
386 void EMANE::NEMQueuedLayer::handleProcessDownstreamPacket(
TimePoint enqueueTime,
390 avgQueueWait_.
update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
392 ++*pProcessedDownstreamPacket_;
396 std::for_each(msgs.begin(),msgs.end(),[](
const ControlMessage * p){
delete p;});
399 void EMANE::NEMQueuedLayer::handleProcessUpstreamPacket(
TimePoint enqueueTime,
403 avgQueueWait_.
update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
405 ++*pProcessedUpstreamPacket_;
409 std::for_each(msgs.begin(),msgs.end(),[](
const ControlMessage * p){
delete p;});
412 void EMANE::NEMQueuedLayer::handleProcessUpstreamControl(
TimePoint enqueueTime,
415 avgQueueWait_.
update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
417 ++*pProcessedUpstreamControl_;
421 std::for_each(msgs.begin(),msgs.end(),[](
const ControlMessage * p){
delete p;});
424 void EMANE::NEMQueuedLayer::handleProcessEvent(
TimePoint enqueueTime,
428 avgQueueWait_.
update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
430 pStatisticHistogramTable_->increment(eventId);
438 void EMANE::NEMQueuedLayer::updateTimerStats(
TimePoint enqueueTime,
443 auto now = Clock::now();
445 avgTimedEventLatency_.
update(std::chrono::duration_cast<Microseconds>(now - expireTime).count());
447 avgQueueWait_.
update(std::chrono::duration_cast<Microseconds>(now - enqueueTime).count());
449 auto duration = std::chrono::duration_cast<
Microseconds>(expireTime - scheduleTime);
451 if(duration.count() > 0)
453 avgTimedEventLatencyRatio_.
update(std::chrono::duration_cast<Microseconds>(fireTime - expireTime).count() /
454 static_cast<double>(duration.count()));
458 avgTimedEventLatencyRatio_.
update(1);
461 ++*pProcessedTimedEvent_;
464 void EMANE::NEMQueuedLayer::handleProcessTimedEvent(
TimePoint enqueueTime,
472 updateTimerStats(enqueueTime,expireTime,scheduleTime,fireTime);
483 auto enqueueTime = Clock::now();
485 enqueue_i([
this,enqueueTime,callback,expireTime,scheduleTime,fireTime]()
487 updateTimerStats(enqueueTime,expireTime,scheduleTime,fireTime);
488 callback(expireTime,scheduleTime,fireTime);
493 void EMANE::NEMQueuedLayer::removeFileDescriptor(
int iFd)
495 auto iter = fileDescriptorStore_.find(iFd);
497 if(iter != fileDescriptorStore_.end())
499 if(epoll_ctl(iepollFd_,EPOLL_CTL_DEL,iFd,
nullptr) == -1)
503 "%03hu NEMQueuedLayer::NEMQueuedLayer:" 504 " unable to add fd to epoll",
508 fileDescriptorStore_.erase(iter);
512 void EMANE::NEMQueuedLayer::addFileDescriptor_i(
int iFd,
516 auto iter = fileDescriptorStore_.find(iFd);
518 if(iter == fileDescriptorStore_.end())
520 fileDescriptorStore_.insert(std::make_pair(iFd,std::make_pair(type,callback)));
522 struct epoll_event ev;
526 if(epoll_ctl(iepollFd_,EPOLL_CTL_ADD,iFd,&ev) == -1)
530 "%03hu NEMQueuedLayer::NEMQueuedLayer:" 531 " unable to add fd to epoll",
537 if(iter->second.first != type)
539 iter->second.first = type;
541 struct epoll_event ev;
545 if(epoll_ctl(iepollFd_,EPOLL_CTL_MOD,iFd,&ev) == -1)
549 "%03hu NEMQueuedLayer::NEMQueuedLayer:" 550 " unable to add fd to epoll",
555 iter->second.second = callback;
std::string Serialization
A Packet class that allows upstream processing to strip layer headers as the packet travels up the st...
The Registrar interface provides access to all of the emulator registrars.
void processDownstreamControl(const ControlMessages &msgs) override
virtual void doProcessUpstreamPacket(UpstreamPacket &, const ControlMessages &)=0
std::function< void(int iFd)> Callback
virtual StatisticRegistrar & statisticRegistrar()=0
NEMQueuedLayer(NEMId id, PlatformServiceProvider *pPlatformService)
std::list< const ControlMessage * > ControlMessages
virtual void doProcessEvent(const EventId &, const Serialization &)=0
Base class for NEMLayer containers. Builders construct NEMLayer objects to contain derived instances ...
void processUpstreamControl(const ControlMessages &msgs) override
virtual void doProcessUpstreamControl(const ControlMessages &)=0
virtual void doProcessDownstreamControl(const ControlMessages &)=0
void processConfiguration(const ConfigurationUpdate &update) override
Specialized packet that allows downstream processing to add layer-specific headers as the packet trave...
virtual void doProcessConfiguration(const ConfigurationUpdate &)=0
std::function< void(const TimePoint &, const TimePoint &, const TimePoint &)> TimerCallback
std::chrono::microseconds Microseconds
void processTimedEvent(TimerEventId eventId, const TimePoint &expireTime, const TimePoint &scheduleTime, const TimePoint &fireTime, const void *arg) override
ControlMessage interface is the base for all control messages.
void initialize(Registrar &registrar) override
std::vector< ConfigurationNameAnyValues > ConfigurationUpdate
Clock::time_point TimePoint
#define LOGGER_STANDARD_LOGGING(logger, level, fmt, args...)
static LogService * instance()
virtual void doProcessDownstreamPacket(DownstreamPacket &, const ControlMessages &)=0
void processEvent(const EventId &eventId, const Serialization &serialization) override
virtual void doProcessTimedEvent(TimerEventId eventId, const TimePoint &expireTime, const TimePoint &scheduleTime, const TimePoint &fireTime, const void *arg)=0
void processDownstreamPacket(DownstreamPacket &pkt, const ControlMessages &msgs) override
void processUpstreamPacket(UpstreamPacket &pkt, const ControlMessages &msgs) override
void registerStatistic(StatisticNumeric< T > *p)
Utility class to make a two column statistic table where the first column is the table key and the se...