EMANE  1.0.1
nemqueuedlayer.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013-2014,2016 - Adjacent Link LLC, Bridgewater,
3  * New Jersey
4  * Copyright (c) 2008 - DRS CenGen, LLC, Columbia, Maryland
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above copyright
14  * notice, this list of conditions and the following disclaimer in
15  * the documentation and/or other materials provided with the
16  * distribution.
17  * * Neither the name of DRS CenGen, LLC nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "nemqueuedlayer.h"
36 #include "logservice.h"
37 
38 #include <exception>
39 #include <mutex>
40 
41 #include <sys/eventfd.h>
42 #include <sys/epoll.h>
43 #include <unistd.h>
44 
namespace
{
  // Value written to the eventfd (iFd_) each time the worker thread
  // needs to be woken up.
  constexpr uint64_t one = 1;
}
49 
// Member-initializer list and body of the NEMQueuedLayer constructor.
// NOTE(review): the line holding the constructor signature (listing line 50)
// is not present in this listing.
//
// All members are value-initialized here; the two descriptors are created in
// the body below.
51  NEMLayer{id, pPlatformService},
52  pPlatformService_{pPlatformService},
53  thread_{},
54  iFd_{},
55  iepollFd_{},
56  bCancel_{},
57  pProcessedDownstreamPacket_{},
58  pProcessedUpstreamPacket_{},
59  pProcessedDownstreamControl_{},
60  pProcessedUpstreamControl_{},
61  pProcessedEvent_{},
62  pProcessedTimedEvent_{},
63  pProcessedConfiguration_{}
64 {
// eventfd with an initial counter of 0 and no flags; the return value is not
// checked here.
65  iFd_ = eventfd(0,0);
66 
// epoll instance the worker loop waits on; the return value is not checked
// here either.
67  iepollFd_ = epoll_create1(0);
68 
69  // register the eventfd with the epoll instance for readability (EPOLLIN)
70  struct epoll_event ev;
71  ev.events = EPOLLIN;
72  ev.data.fd = iFd_;
73 
74  if(epoll_ctl(iepollFd_,EPOLL_CTL_ADD,iFd_,&ev) == -1)
75  {
76  // cannot really do too much at this point, so we'll log it
// NOTE(review): listing lines 77-78 (the opening of the logging call) are
// absent from this listing; only its trailing arguments remain below.
79  "%03hu NEMQueuedLayer::NEMQueuedLayer:"
80  " unable to add eventfd to epoll",
81  id_);
82  }
83 }
84 
86 {
87  std::lock_guard<std::mutex> m(mutex_);
88 
89  if(!bCancel_ && thread_.joinable())
90  {
91  bCancel_ = true;
92  write(iFd_,&one,sizeof(one));
93  }
94 
95  close(iFd_);
96 }
97 
// Body of what is presumably initialize(Registrar & registrar): registers the
// layer's numeric statistics and the event reception table with the statistic
// registrar.
// NOTE(review): the signature line (listing line 98) and the trailing
// argument lines of most registerNumeric calls (e.g. listing lines 104, 108,
// 112, 116, 119, 122, 125, 135, 143, 151, 155) are absent from this listing.
99 {
100  auto & statisticRegistrar = registrar.statisticRegistrar();
101 
// per-API counters, incremented by the matching handleProcess* methods
102  pProcessedDownstreamPacket_ =
103  statisticRegistrar.registerNumeric<std::uint64_t>("processedDownstreamPackets",
105 
106  pProcessedUpstreamPacket_ =
107  statisticRegistrar.registerNumeric<std::uint64_t>("processedUpstreamPackets",
109 
110  pProcessedDownstreamControl_ =
111  statisticRegistrar.registerNumeric<std::uint64_t>("processedDownstreamControl",
113 
114  pProcessedUpstreamControl_ =
115  statisticRegistrar.registerNumeric<std::uint64_t>("processedUpstreamControl",
117  pProcessedEvent_ =
118  statisticRegistrar.registerNumeric<std::uint64_t>("processedEvents",
120  pProcessedConfiguration_ =
121  statisticRegistrar.registerNumeric<std::uint64_t>("processedConfiguration",
123  pProcessedTimedEvent_ =
124  statisticRegistrar.registerNumeric<std::uint64_t>("processedTimedEvents",
126 
// two-column table keyed by EventId; incremented in handleProcessEvent
127  pStatisticHistogramTable_.reset(new Utils::StatisticHistogramTable<EventId>{
128  statisticRegistrar,
129  "EventReceptionTable",
130  {"Event","Total Rx"},
131  "Received event counts"});
132 
// running averages: each wrapper is handed the numeric statistic it updates
133  avgQueueWait_.registerStatistic
134  (statisticRegistrar.registerNumeric<double>("avgProcessAPIQueueWait",
136  "Average API queue wait for a processUpstreamPacket,"
137  " processUpstreamControl, processDownstreamPacket,"
138  " processDownstreamControl, processEvent and"
139  " processTimedEvent in microseconds."));
140 
141  avgQueueDepth_.registerStatistic
142  (statisticRegistrar.registerNumeric<double>("avgProcessAPIQueueDepth",
144  "Average API queue depth for a processUpstreamPacket,"
145  " processUpstreamControl, processDownstreamPacket,"
146  " processDownstreamControl, processEvent and"
147  " processTimedEvent."));
148 
149  avgTimedEventLatency_.registerStatistic
150  (statisticRegistrar.registerNumeric<double>("avgTimedEventLatency",
152 
153  avgTimedEventLatencyRatio_.registerStatistic
154  (statisticRegistrar.registerNumeric<double>("avgTimedEventLatencyRatio",
156  "Average ratio of the delta between the scheduled timer"
157  " expiration and the actual firing over the requested"
158  " duration. An average ratio approaching 1 indicates that"
159  " timer latencies are large in comparison to the requested"
160  " durations."));
161 }
162 
// Body of what is presumably start() (signature line 163 is absent from this
// listing): launches the worker thread running processWorkQueue() on this
// object.
// NOTE(review): assigning to a std::thread that is still joinable terminates
// the program - confirm start() is never called twice without a stop().
164 {
165  thread_ = std::thread{&EMANE::NEMQueuedLayer::processWorkQueue,this};
166 }
167 
169 {
170  mutex_.lock();
171  bCancel_ = true;
172  write(iFd_,&one,sizeof(one));
173  mutex_.unlock();
174  thread_.join();
175  bCancel_ = false;
176 }
177 
178 void EMANE::NEMQueuedLayer::enqueue_i(QCallback && callback)
179 {
180  std::lock_guard<std::mutex> m(mutex_);
181  queue_.push_back(std::move(callback));
182  avgQueueDepth_.update(queue_.size());
183  write(iFd_,&one,sizeof(one));
184 }
185 
// Body of what is presumably processConfiguration(const ConfigurationUpdate &)
// (signature line 186 is absent from this listing).
// Packages the update together with the current time as the enqueue timestamp
// and queues a call to handleProcessConfiguration via enqueue_i(). std::bind
// stores its own copies of the bound arguments.
187 {
188  enqueue_i(std::bind(&NEMQueuedLayer::handleProcessConfiguration,
189  this,
190  Clock::now(),
191  update));
192 }
193 
// Body of what is presumably processDownstreamControl(const ControlMessages &)
// (signature line 194 is absent from this listing).
// Queues a call to handleProcessDownstreamControl with the current time as
// the enqueue timestamp. The bound copy of msgs is a list of pointers; the
// handler deletes the pointed-to messages once it has run.
195 {
196  enqueue_i(std::bind(&NEMQueuedLayer::handleProcessDownstreamControl,
197  this,
198  Clock::now(),
199  msgs));
200 }
201 
// Tail of the parameter list and body of what is presumably
// processDownstreamPacket(DownstreamPacket & pkt, const ControlMessages & msgs)
// (the first signature line, listing line 202, is absent from this listing).
// Queues a call to handleProcessDownstreamPacket with the current time as the
// enqueue timestamp; std::bind stores copies of pkt and msgs.
203  const ControlMessages & msgs)
204 {
205  enqueue_i(std::bind(&NEMQueuedLayer::handleProcessDownstreamPacket,
206  this,
207  Clock::now(),
208  pkt,
209  msgs));
210 
211 }
212 
// Body of what is presumably
// processUpstreamPacket(UpstreamPacket & pkt, const ControlMessages & msgs)
// (signature line 213 is absent from this listing).
// Queues a call to handleProcessUpstreamPacket with the current time as the
// enqueue timestamp; std::bind stores copies of pkt and msgs.
214 {
215  enqueue_i(std::bind(&NEMQueuedLayer::handleProcessUpstreamPacket,
216  this,
217  Clock::now(),
218  pkt,
219  msgs));
220 }
221 
// Body of what is presumably processUpstreamControl(const ControlMessages &)
// (signature line 222 is absent from this listing).
// Queues a call to handleProcessUpstreamControl with the current time as the
// enqueue timestamp. The handler deletes the control messages once it has run.
223 {
224  enqueue_i(std::bind(&NEMQueuedLayer::handleProcessUpstreamControl,
225  this,
226  Clock::now(),
227  msgs));
228 }
229 
// Tail of the parameter list and body of what is presumably
// processEvent(const EventId & eventId, const Serialization & serialization)
// (the first signature line, listing line 230, is absent from this listing).
// Queues a call to handleProcessEvent with the current time as the enqueue
// timestamp; the event id and serialization are copied into the bound
// callable.
231  const Serialization & serialization)
232 {
233  enqueue_i(std::bind(&NEMQueuedLayer::handleProcessEvent,
234  this,
235  Clock::now(),
236  eventId,
237  serialization));
238 
239 }
240 
// Tail of the parameter list and body of what is presumably
// processTimedEvent(TimerEventId eventId, ...) (the first signature line,
// listing line 241, is absent from this listing).
// Queues a call to handleProcessTimedEvent with the current time as the
// enqueue timestamp. The three time points are copied into the bound
// callable; arg is passed through as an opaque pointer and is not copied or
// owned here.
242  const TimePoint & expireTime,
243  const TimePoint & scheduleTime,
244  const TimePoint & fireTime,
245  const void * arg)
246 {
247  enqueue_i(std::bind(&NEMQueuedLayer::handleProcessTimedEvent,
248  this,
249  Clock::now(),
250  eventId,
251  expireTime,
252  scheduleTime,
253  fireTime,
254  arg));
255 }
256 
// Worker thread main loop (started from the std::thread created with
// &processWorkQueue). Blocks in epoll_wait and, for each ready descriptor:
//  - iFd_ (the eventfd): drains the counter, swaps the shared queue into a
//    local one under mutex_, and runs every queued callable without the lock
//  - any other descriptor: looks it up in fileDescriptorStore_ and invokes
//    the stored callback with the descriptor
// Exceptions thrown by callables or callbacks are caught and logged so that
// one failing entry does not end the loop.
// NOTE(review): the opening line of every logging call (listing lines 270,
// 308, 318, 342, 352) is absent from this listing.
// NOTE(review): bCancel_ is read in the while condition without holding
// mutex_ - confirm its type makes that safe.
257 void EMANE::NEMQueuedLayer::processWorkQueue()
258 {
259  std::uint64_t u64Expired{};
260 #define MAX_EVENTS 32
261  struct epoll_event events[MAX_EVENTS];
262  int nfds{};
263 
264  while(!bCancel_)
265  {
// timeout of -1: block until at least one descriptor is ready
266  nfds = epoll_wait(iepollFd_,events,MAX_EVENTS,-1);
267 
268  if(nfds == -1)
269  {
// NOTE(review): any epoll_wait failure ends the worker loop; presumably
// that includes interruption by a signal (EINTR) - confirm this is intended.
271  ERROR_LEVEL,
272  "%03hu NEMQueuedLayer::processWorkQueue:"
273  " epoll_wait error",
274  id_);
275  break;
276  }
277 
278  for(int n = 0; n < nfds; ++n)
279  {
280  if(events[n].data.fd == iFd_)
281  {
282  // drain the eventfd counter written by enqueue_i/stop; a successful read means there is work or a cancel request
283  if(read(iFd_,&u64Expired,sizeof(u64Expired)) > 0)
284  {
285  std::unique_lock<std::mutex> lock(mutex_);
286 
287  if(bCancel_)
288  {
// leaves only the for loop; the enclosing while then sees bCancel_ and exits
289  break;
290  }
291 
292  MessageProcessingQueue queue{};
293 
// take the whole pending batch so the lock is not held while callables run
294  queue.swap(queue_);
295 
296  lock.unlock();
297 
298  for(auto & entry : queue)
299  {
300  try
301  {
302  // execute the functor
303  entry();
304  }
305  catch(std::exception & exp)
306  {
307  // cannot really do too much at this point, so we'll log it
309  ERROR_LEVEL,
310  "%03hu NEMQueuedLayer::processWorkQueue:"
311  " Exception caught %s",
312  id_,
313  exp.what());
314  }
315  catch(...)
316  {
317  // cannot really do too much at this point, so we'll log it
319  ERROR_LEVEL,
320  "%03hu NEMQueuedLayer::processWorkQueue:"
321  " Exception caught",
322  id_);
323  }
324  }
325 
326  queue.clear();
327  }
328  }
329  else
330  {
// descriptor registered through addFileDescriptor_i; entries map fd -> (type, callback)
331  auto iter = fileDescriptorStore_.find(events[n].data.fd);
332 
333  if(iter != fileDescriptorStore_.end())
334  {
335  try
336  {
337  iter->second.second(events[n].data.fd);
338  }
339  catch(std::exception & exp)
340  {
341  // cannot really do too much at this point, so we'll log it
343  ERROR_LEVEL,
344  "%03hu NEMQueuedLayer::processWorkQueue:"
345  " Exception caught %s",
346  id_,
347  exp.what());
348  }
349  catch(...)
350  {
351  // cannot really do too much at this point, so we'll log it
353  ERROR_LEVEL,
354  "%03hu NEMQueuedLayer::processWorkQueue:"
355  " Exception caught",
356  id_);
357  }
358  }
359  }
360  }
361  }
362 }
363 
364 void EMANE::NEMQueuedLayer::handleProcessConfiguration(TimePoint enqueueTime,
365  const ConfigurationUpdate update)
366 {
367  avgQueueWait_.update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
368 
369  ++*pProcessedConfiguration_;
370 
371  doProcessConfiguration(update);
372 }
373 
// Worker-side handler for queued downstream control messages: records the
// queue wait in microseconds, bumps the processed counter, and finally
// deletes every control message in msgs.
// NOTE(review): listing line 381 is absent from this listing; by analogy with
// the packet handlers it presumably held the doProcessDownstreamControl(msgs)
// call - confirm against the original file.
374 void EMANE::NEMQueuedLayer::handleProcessDownstreamControl(TimePoint enqueueTime,
375  const ControlMessages msgs)
376 {
377  avgQueueWait_.update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
378 
379  ++*pProcessedDownstreamControl_;
380 
382 
// the handler frees the messages once processing is done
383  std::for_each(msgs.begin(),msgs.end(),[](const ControlMessage * p){delete p;});
384 }
385 
386 void EMANE::NEMQueuedLayer::handleProcessDownstreamPacket(TimePoint enqueueTime,
387  DownstreamPacket & pkt,
388  const ControlMessages msgs)
389 {
390  avgQueueWait_.update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
391 
392  ++*pProcessedDownstreamPacket_;
393 
394  doProcessDownstreamPacket(pkt,msgs);
395 
396  std::for_each(msgs.begin(),msgs.end(),[](const ControlMessage * p){delete p;});
397 }
398 
399 void EMANE::NEMQueuedLayer::handleProcessUpstreamPacket(TimePoint enqueueTime,
400  UpstreamPacket & pkt,
401  const ControlMessages msgs)
402 {
403  avgQueueWait_.update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
404 
405  ++*pProcessedUpstreamPacket_;
406 
407  doProcessUpstreamPacket(pkt,msgs);
408 
409  std::for_each(msgs.begin(),msgs.end(),[](const ControlMessage * p){delete p;});
410 }
411 
// Worker-side handler for queued upstream control messages: records the
// queue wait in microseconds, bumps the processed counter, and finally
// deletes every control message in msgs.
// NOTE(review): listing line 419 is absent from this listing; by analogy with
// the packet handlers it presumably held the doProcessUpstreamControl(msgs)
// call - confirm against the original file.
412 void EMANE::NEMQueuedLayer::handleProcessUpstreamControl(TimePoint enqueueTime,
413  const ControlMessages msgs)
414 {
415  avgQueueWait_.update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
416 
417  ++*pProcessedUpstreamControl_;
418 
420 
// the handler frees the messages once processing is done
421  std::for_each(msgs.begin(),msgs.end(),[](const ControlMessage * p){delete p;});
422 }
423 
424 void EMANE::NEMQueuedLayer::handleProcessEvent(TimePoint enqueueTime,
425  const EventId eventId,
426  const Serialization serialization)
427 {
428  avgQueueWait_.update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
429 
430  pStatisticHistogramTable_->increment(eventId);
431 
432  ++*pProcessedEvent_;
433 
434  doProcessEvent(eventId,serialization);
435 }
436 
437 
438 void EMANE::NEMQueuedLayer::updateTimerStats(TimePoint enqueueTime,
439  const TimePoint & expireTime,
440  const TimePoint & scheduleTime,
441  const TimePoint & fireTime)
442 {
443  auto now = Clock::now();
444 
445  avgTimedEventLatency_.update(std::chrono::duration_cast<Microseconds>(now - expireTime).count());
446 
447  avgQueueWait_.update(std::chrono::duration_cast<Microseconds>(now - enqueueTime).count());
448 
449  auto duration = std::chrono::duration_cast<Microseconds>(expireTime - scheduleTime);
450 
451  if(duration.count() > 0)
452  {
453  avgTimedEventLatencyRatio_.update(std::chrono::duration_cast<Microseconds>(fireTime - expireTime).count() /
454  static_cast<double>(duration.count()));
455  }
456  else
457  {
458  avgTimedEventLatencyRatio_.update(1);
459  }
460 
461  ++*pProcessedTimedEvent_;
462 }
463 
464 void EMANE::NEMQueuedLayer::handleProcessTimedEvent(TimePoint enqueueTime,
465  TimerEventId eventId,
466  const TimePoint & expireTime,
467  const TimePoint & scheduleTime,
468  const TimePoint & fireTime,
469  const void * arg)
470 
471 {
472  updateTimerStats(enqueueTime,expireTime,scheduleTime,fireTime);
473 
474  doProcessTimedEvent(eventId,expireTime,scheduleTime,fireTime,arg);
475 }
476 
477 
478 void EMANE::NEMQueuedLayer::processTimer_i(TimerServiceProvider::TimerCallback callback,
479  const TimePoint & expireTime,
480  const TimePoint & scheduleTime,
481  const TimePoint & fireTime)
482 {
483  auto enqueueTime = Clock::now();
484 
485  enqueue_i([this,enqueueTime,callback,expireTime,scheduleTime,fireTime]()
486  {
487  updateTimerStats(enqueueTime,expireTime,scheduleTime,fireTime);
488  callback(expireTime,scheduleTime,fireTime);
489  });
490 }
491 
492 
// Stops monitoring a descriptor previously registered through
// addFileDescriptor_i: removes it from the epoll instance and erases its
// entry from fileDescriptorStore_. Unknown descriptors are ignored.
// The store entry is erased even when the epoll removal fails.
// NOTE(review): no lock is taken here while processWorkQueue also reads
// fileDescriptorStore_ - confirm this is only called from the worker thread.
493 void EMANE::NEMQueuedLayer::removeFileDescriptor(int iFd)
494 {
495  auto iter = fileDescriptorStore_.find(iFd);
496 
497  if(iter != fileDescriptorStore_.end())
498  {
499  if(epoll_ctl(iepollFd_,EPOLL_CTL_DEL,iFd,nullptr) == -1)
500  {
// NOTE(review): listing line 501 (the opening of the logging call) is absent
// from this listing. The message below names the constructor and says
// "unable to add" although this is a removal - looks copied; confirm and
// correct the wording in the original file.
502  ERROR_LEVEL,
503  "%03hu NEMQueuedLayer::NEMQueuedLayer:"
504  " unable to add fd to epoll",
505  id_);
506  }
507 
508  fileDescriptorStore_.erase(iter);
509  }
510 }
511 
// Registers a descriptor with the epoll instance, or updates an existing
// registration.
//  - new descriptor: stored as fd -> (type, callback) and added to epoll for
//    EPOLLIN when type is DescriptorType::READ, otherwise EPOLLOUT
//  - known descriptor: the epoll registration is modified only when the type
//    changed; the callback is always replaced
// A failing epoll_ctl is only logged; the store entry is kept either way.
// NOTE(review): no lock is taken here while processWorkQueue also reads
// fileDescriptorStore_ - confirm this is only called from the worker thread.
// NOTE(review): listing lines 528 and 547 (the openings of the logging calls)
// are absent from this listing.
512 void EMANE::NEMQueuedLayer::addFileDescriptor_i(int iFd,
513  DescriptorType type,
514  Callback callback)
515 {
516  auto iter = fileDescriptorStore_.find(iFd);
517 
518  if(iter == fileDescriptorStore_.end())
519  {
520  fileDescriptorStore_.insert(std::make_pair(iFd,std::make_pair(type,callback)));
521 
522  struct epoll_event ev;
523  ev.events = type == DescriptorType::READ ? EPOLLIN : EPOLLOUT;
524  ev.data.fd = iFd;
525 
526  if(epoll_ctl(iepollFd_,EPOLL_CTL_ADD,iFd,&ev) == -1)
527  {
529  ERROR_LEVEL,
530  "%03hu NEMQueuedLayer::NEMQueuedLayer:"
531  " unable to add fd to epoll",
532  id_);
533  }
534  }
535  else
536  {
537  if(iter->second.first != type)
538  {
539  iter->second.first = type;
540 
541  struct epoll_event ev;
542  ev.events = type == DescriptorType::READ ? EPOLLIN : EPOLLOUT;
543  ev.data.fd = iFd;
544 
545  if(epoll_ctl(iepollFd_,EPOLL_CTL_MOD,iFd,&ev) == -1)
546  {
// NOTE(review): this is the EPOLL_CTL_MOD path, yet the message says
// "unable to add" - confirm and correct the wording in the original file.
548  ERROR_LEVEL,
549  "%03hu NEMQueuedLayer::NEMQueuedLayer:"
550  " unable to add fd to epoll",
551  id_);
552  }
553  }
554 
555  iter->second.second = callback;
556  }
557 }
std::string Serialization
Definition: serializable.h:42
A Packet class that allows upstream processing to strip layer headers as the packet travels up the st...
The Registrar interface provides access to all of the emulator registrars.
Definition: registrar.h:50
void processDownstreamControl(const ControlMessages &msgs) override
virtual void doProcessUpstreamPacket(UpstreamPacket &, const ControlMessages &)=0
virtual StatisticRegistrar & statisticRegistrar()=0
NEMQueuedLayer(NEMId id, PlatformServiceProvider *pPlatformService)
std::list< const ControlMessage * > ControlMessages
virtual void doProcessEvent(const EventId &, const Serialization &)=0
Base class for NEMLayer containers. Builders construct NEMLayer objects to contain derived instances ...
Definition: nemlayer.h:57
void processUpstreamControl(const ControlMessages &msgs) override
std::uint16_t EventId
Definition: types.h:53
The PlatformServiceProvider interface provides access to emulator services.
void start() override
virtual void doProcessUpstreamControl(const ControlMessages &)=0
virtual void doProcessDownstreamControl(const ControlMessages &)=0
void stop() override
void processConfiguration(const ConfigurationUpdate &update) override
Specialized packet that allows downstream processing to add layer specific headers as the packet trave...
virtual void doProcessConfiguration(const ConfigurationUpdate &)=0
std::function< void(const TimePoint &, const TimePoint &, const TimePoint &)> TimerCallback
std::chrono::microseconds Microseconds
Definition: types.h:45
std::uint16_t NEMId
Definition: types.h:52
void processTimedEvent(TimerEventId eventId, const TimePoint &expireTime, const TimePoint &scheduleTime, const TimePoint &fireTime, const void *arg) override
ControlMessage interface is the base for all control messages.
void initialize(Registrar &registrar) override
std::vector< ConfigurationNameAnyValues > ConfigurationUpdate
std::size_t TimerEventId
Definition: types.h:54
Clock::time_point TimePoint
Definition: types.h:50
#define LOGGER_STANDARD_LOGGING(logger, level, fmt, args...)
static LogService * instance()
Definition: singleton.h:56
virtual void doProcessDownstreamPacket(DownstreamPacket &, const ControlMessages &)=0
void processEvent(const EventId &eventId, const Serialization &serialization) override
virtual void doProcessTimedEvent(TimerEventId eventId, const TimePoint &expireTime, const TimePoint &scheduleTime, const TimePoint &fireTime, const void *arg)=0
void processDownstreamPacket(DownstreamPacket &pkt, const ControlMessages &msgs) override
void processUpstreamPacket(UpstreamPacket &pkt, const ControlMessages &msgs) override
void registerStatistic(StatisticNumeric< T > *p)
Utility class to make a two column statistic table where the first column is the table key and the se...