41 #include <sys/eventfd.h> 42 #include <sys/epoll.h> 48 const uint64_t one{1};
53 pPlatformService_{pPlatformService},
58 pProcessedDownstreamPacket_{},
59 pProcessedUpstreamPacket_{},
60 pProcessedDownstreamControl_{},
61 pProcessedUpstreamControl_{},
63 pProcessedTimedEvent_{},
64 pProcessedConfiguration_{}
68 iepollFd_ = epoll_create1(0);
71 struct epoll_event ev;
75 if(epoll_ctl(iepollFd_,EPOLL_CTL_ADD,iFd_,&ev) == -1)
80 "%03hu NEMQueuedLayer::NEMQueuedLayer:" 81 " unable to add eventfd to epoll",
90 if(!bCancel_ && thread_.joinable())
93 write(iFd_,&one,
sizeof(one));
108 pProcessedDownstreamPacket_ =
109 statisticRegistrar.registerNumeric<std::uint64_t>(
"processedDownstreamPackets",
112 pProcessedUpstreamPacket_ =
113 statisticRegistrar.registerNumeric<std::uint64_t>(
"processedUpstreamPackets",
116 pProcessedDownstreamControl_ =
117 statisticRegistrar.registerNumeric<std::uint64_t>(
"processedDownstreamControl",
120 pProcessedUpstreamControl_ =
121 statisticRegistrar.registerNumeric<std::uint64_t>(
"processedUpstreamControl",
124 statisticRegistrar.registerNumeric<std::uint64_t>(
"processedEvents",
126 pProcessedConfiguration_ =
127 statisticRegistrar.registerNumeric<std::uint64_t>(
"processedConfiguration",
129 pProcessedTimedEvent_ =
130 statisticRegistrar.registerNumeric<std::uint64_t>(
"processedTimedEvents",
135 "EventReceptionTable",
136 {
"Event",
"Total Rx"},
137 "Received event counts"});
140 (statisticRegistrar.registerNumeric<
double>(
"avgProcessAPIQueueWait",
142 "Average API queue wait for a processUpstreamPacket," 143 " processUpstreamControl, processDownstreamPacket," 144 " processDownstreamControl, processEvent and" 145 " processTimedEvent in microseconds."));
148 (statisticRegistrar.registerNumeric<
double>(
"avgProcessAPIQueueDepth",
150 "Average API queue depth for a processUpstreamPacket," 151 " processUpstreamControl, processDownstreamPacket," 152 " processDownstreamControl, processEvent and" 153 " processTimedEvent."));
156 (statisticRegistrar.registerNumeric<
double>(
"avgTimedEventLatency",
160 (statisticRegistrar.registerNumeric<
double>(
"avgTimedEventLatencyRatio",
162 "Average ratio of the delta between the scheduled timer" 163 " expiration and the actual firing over the requested" 164 " duration. An average ratio approaching 1 indicates that" 165 " timer latencies are large in comparison to the requested" 171 thread_ = std::thread{&EMANE::NEMQueuedLayer::processWorkQueue,
this};
178 write(iFd_,&one,
sizeof(one));
184 void EMANE::NEMQueuedLayer::enqueue_i(QCallback && callback)
186 std::lock_guard<std::mutex> m(mutex_);
187 queue_.push_back(std::move(callback));
188 avgQueueDepth_.
update(queue_.size());
189 write(iFd_,&one,
sizeof(one));
194 enqueue_i(std::bind(&NEMQueuedLayer::handleProcessConfiguration,
202 enqueue_i(std::bind(&NEMQueuedLayer::handleProcessDownstreamControl,
211 enqueue_i(std::bind(&NEMQueuedLayer::handleProcessDownstreamPacket,
221 enqueue_i(std::bind(&NEMQueuedLayer::handleProcessUpstreamPacket,
230 enqueue_i(std::bind(&NEMQueuedLayer::handleProcessUpstreamControl,
239 enqueue_i(std::bind(&NEMQueuedLayer::handleProcessEvent,
253 enqueue_i(std::bind(&NEMQueuedLayer::handleProcessTimedEvent,
263 void EMANE::NEMQueuedLayer::processWorkQueue()
265 std::uint64_t u64Expired{};
266 #define MAX_EVENTS 32 267 struct epoll_event events[MAX_EVENTS];
272 nfds = epoll_wait(iepollFd_,events,MAX_EVENTS,-1);
283 "%03hu NEMQueuedLayer::processWorkQueue:" 284 " epoll_wait error: %s",
290 for(
int n = 0; n < nfds; ++n)
292 if(events[n].data.fd == iFd_)
295 if(read(iFd_,&u64Expired,
sizeof(u64Expired)) > 0)
297 std::unique_lock<std::mutex> lock(mutex_);
304 MessageProcessingQueue queue{};
310 for(
auto & entry : queue)
317 catch(std::exception & exp)
322 "%03hu NEMQueuedLayer::processWorkQueue:" 323 " Exception caught %s",
332 "%03hu NEMQueuedLayer::processWorkQueue:" 343 auto iter = fileDescriptorStore_.find(events[n].data.fd);
345 if(iter != fileDescriptorStore_.end())
349 iter->second.second(events[n].data.fd);
351 catch(std::exception & exp)
356 "%03hu NEMQueuedLayer::processWorkQueue:" 357 " Exception caught %s",
366 "%03hu NEMQueuedLayer::processWorkQueue:" 376 void EMANE::NEMQueuedLayer::handleProcessConfiguration(
TimePoint enqueueTime,
379 avgQueueWait_.
update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
381 ++*pProcessedConfiguration_;
386 void EMANE::NEMQueuedLayer::handleProcessDownstreamControl(
TimePoint enqueueTime,
389 avgQueueWait_.
update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
391 ++*pProcessedDownstreamControl_;
395 std::for_each(msgs.begin(),msgs.end(),[](
const ControlMessage * p){
delete p;});
398 void EMANE::NEMQueuedLayer::handleProcessDownstreamPacket(
TimePoint enqueueTime,
402 avgQueueWait_.
update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
404 ++*pProcessedDownstreamPacket_;
408 std::for_each(msgs.begin(),msgs.end(),[](
const ControlMessage * p){
delete p;});
411 void EMANE::NEMQueuedLayer::handleProcessUpstreamPacket(
TimePoint enqueueTime,
415 avgQueueWait_.
update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
417 ++*pProcessedUpstreamPacket_;
421 std::for_each(msgs.begin(),msgs.end(),[](
const ControlMessage * p){
delete p;});
424 void EMANE::NEMQueuedLayer::handleProcessUpstreamControl(
TimePoint enqueueTime,
427 avgQueueWait_.
update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
429 ++*pProcessedUpstreamControl_;
433 std::for_each(msgs.begin(),msgs.end(),[](
const ControlMessage * p){
delete p;});
436 void EMANE::NEMQueuedLayer::handleProcessEvent(
TimePoint enqueueTime,
440 avgQueueWait_.
update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
442 pStatisticHistogramTable_->increment(eventId);
450 void EMANE::NEMQueuedLayer::updateTimerStats(
TimePoint enqueueTime,
455 auto now = Clock::now();
457 avgTimedEventLatency_.
update(std::chrono::duration_cast<Microseconds>(now - expireTime).count());
459 avgQueueWait_.
update(std::chrono::duration_cast<Microseconds>(now - enqueueTime).count());
461 auto duration = std::chrono::duration_cast<
Microseconds>(expireTime - scheduleTime);
463 if(duration.count() > 0)
465 avgTimedEventLatencyRatio_.
update(std::chrono::duration_cast<Microseconds>(fireTime - expireTime).count() /
466 static_cast<double>(duration.count()));
470 avgTimedEventLatencyRatio_.
update(1);
473 ++*pProcessedTimedEvent_;
476 void EMANE::NEMQueuedLayer::handleProcessTimedEvent(
TimePoint enqueueTime,
484 updateTimerStats(enqueueTime,expireTime,scheduleTime,fireTime);
495 auto enqueueTime = Clock::now();
497 enqueue_i([
this,enqueueTime,callback,expireTime,scheduleTime,fireTime]()
499 updateTimerStats(enqueueTime,expireTime,scheduleTime,fireTime);
500 callback(expireTime,scheduleTime,fireTime);
505 void EMANE::NEMQueuedLayer::removeFileDescriptor(
int iFd)
507 auto iter = fileDescriptorStore_.find(iFd);
509 if(iter != fileDescriptorStore_.end())
511 if(epoll_ctl(iepollFd_,EPOLL_CTL_DEL,iFd,
nullptr) == -1)
515 "%03hu NEMQueuedLayer::NEMQueuedLayer:" 516 " unable to add fd to epoll",
520 fileDescriptorStore_.erase(iter);
524 void EMANE::NEMQueuedLayer::addFileDescriptor_i(
int iFd,
528 auto iter = fileDescriptorStore_.find(iFd);
530 if(iter == fileDescriptorStore_.end())
532 fileDescriptorStore_.insert(std::make_pair(iFd,std::make_pair(type,callback)));
534 struct epoll_event ev;
538 if(epoll_ctl(iepollFd_,EPOLL_CTL_ADD,iFd,&ev) == -1)
542 "%03hu NEMQueuedLayer::NEMQueuedLayer:" 543 " unable to add fd to epoll",
549 if(iter->second.first != type)
551 iter->second.first = type;
553 struct epoll_event ev;
557 if(epoll_ctl(iepollFd_,EPOLL_CTL_MOD,iFd,&ev) == -1)
561 "%03hu NEMQueuedLayer::NEMQueuedLayer:" 562 " unable to add fd to epoll",
567 iter->second.second = callback;
std::string Serialization
A Packet class that allows upstream processing to strip layer headers as the packet travels up the st...
The Registrar interface provides access to all of the emulator registrars.
void processDownstreamControl(const ControlMessages &msgs) override
virtual void doProcessUpstreamPacket(UpstreamPacket &, const ControlMessages &)=0
std::function< void(int iFd)> Callback
virtual StatisticRegistrar & statisticRegistrar()=0
NEMQueuedLayer(NEMId id, PlatformServiceProvider *pPlatformService)
std::list< const ControlMessage * > ControlMessages
virtual void doProcessEvent(const EventId &, const Serialization &)=0
Base class for NEMLayer containers. Builders construct NEMLayer objects to contain derived instances ...
void processUpstreamControl(const ControlMessages &msgs) override
virtual void doProcessUpstreamControl(const ControlMessages &)=0
virtual void doProcessDownstreamControl(const ControlMessages &)=0
void processConfiguration(const ConfigurationUpdate &update) override
Specialized packet that allows downstream processing to add layer specific headers as the packet trave...
virtual void doProcessConfiguration(const ConfigurationUpdate &)=0
std::function< void(const TimePoint &, const TimePoint &, const TimePoint &)> TimerCallback
std::chrono::microseconds Microseconds
void processTimedEvent(TimerEventId eventId, const TimePoint &expireTime, const TimePoint &scheduleTime, const TimePoint &fireTime, const void *arg) override
ControlMessage interface is the base for all control messages.
void initialize(Registrar &registrar) override
std::vector< ConfigurationNameAnyValues > ConfigurationUpdate
Clock::time_point TimePoint
#define LOGGER_STANDARD_LOGGING(logger, level, fmt, args...)
static LogService * instance()
virtual void doProcessDownstreamPacket(DownstreamPacket &, const ControlMessages &)=0
void processEvent(const EventId &eventId, const Serialization &serialization) override
virtual void doProcessTimedEvent(TimerEventId eventId, const TimePoint &expireTime, const TimePoint &scheduleTime, const TimePoint &fireTime, const void *arg)=0
void processDownstreamPacket(DownstreamPacket &pkt, const ControlMessages &msgs) override
void processUpstreamPacket(UpstreamPacket &pkt, const ControlMessages &msgs) override
void registerStatistic(StatisticNumeric< T > *p)
Utility class to make a two column statistic table where the first column is the table key and the se...