EMANE  1.2.1
nemqueuedlayer.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013-2014,2016-2017 - Adjacent Link LLC, Bridgewater,
3  * New Jersey
4  * Copyright (c) 2008 - DRS CenGen, LLC, Columbia, Maryland
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above copyright
14  * notice, this list of conditions and the following disclaimer in
15  * the documentation and/or other materials provided with the
16  * distribution.
17  * * Neither the name of DRS CenGen, LLC nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "nemqueuedlayer.h"
36 #include "logservice.h"
37 
38 #include <exception>
39 #include <mutex>
40 
41 #include <sys/eventfd.h>
42 #include <sys/epoll.h>
43 #include <unistd.h>
44 #include <cstring>
45 
46 namespace
47 {
48  const uint64_t one{1};
49 }
50 
52  NEMLayer{id, pPlatformService},
53  pPlatformService_{pPlatformService},
54  thread_{},
55  iFd_{},
56  iepollFd_{},
57  bCancel_{},
58  pProcessedDownstreamPacket_{},
59  pProcessedUpstreamPacket_{},
60  pProcessedDownstreamControl_{},
61  pProcessedUpstreamControl_{},
62  pProcessedEvent_{},
63  pProcessedTimedEvent_{},
64  pProcessedConfiguration_{}
65 {
  // iFd_ is the eventfd that producers write to in order to wake the worker
  // thread; it starts with a counter of 0 and default flags.
  // NOTE(review): neither eventfd() nor epoll_create1() is checked for a -1
  // return before the descriptors are used below.
66  iFd_ = eventfd(0,0);
67 
68  iepollFd_ = epoll_create1(0);
69 
70  // add the eventfd socket to the epoll instance
71  struct epoll_event ev;
72  ev.events = EPOLLIN;
73  ev.data.fd = iFd_;
74 
75  if(epoll_ctl(iepollFd_,EPOLL_CTL_ADD,iFd_,&ev) == -1)
76  {
77  // cannot really do too much at this point, so we'll log it
  // NOTE(review): the opening of the logging call that the following
  // arguments belong to is not visible here; confirm against the original
  // source before relying on this text.
80  "%03hu NEMQueuedLayer::NEMQueuedLayer:"
81  " unable to add eventfd to epoll",
82  id_);
83  }
84 }
85 
87 {
  // The lock is taken manually rather than with a guard because it has to be
  // dropped around join() below.
88  mutex_.lock();
89 
  // Only cancel and join when a worker thread exists and has not already
  // been told to stop.
90  if(!bCancel_ && thread_.joinable())
91  {
92  bCancel_ = true;
  // bump the eventfd counter so a worker blocked in epoll_wait() wakes up
93  write(iFd_,&one,sizeof(one));
  // release before joining: the worker acquires mutex_ after reading iFd_
94  mutex_.unlock();
95  thread_.join();
96  mutex_.lock();
97  }
98 
99  mutex_.unlock();
100 
  // NOTE(review): only iFd_ is closed here. iepollFd_, obtained from
  // epoll_create1(), is not closed anywhere in the visible code. This looks
  // like a descriptor leak; confirm.
101  close(iFd_);
102 }
103 
105 {
  // Registers the layer's statistics with the statistic registrar obtained
  // from `registrar` and keeps the returned handles in member variables.
  // NOTE(review): each registerNumeric() call below is cut off after its
  // first argument (the statistic name); the remaining arguments are not
  // visible here and must be confirmed against the original source.
106  auto & statisticRegistrar = registrar.statisticRegistrar();
107 
  // counters incremented by the handleProcess*() methods, one per API kind
108  pProcessedDownstreamPacket_ =
109  statisticRegistrar.registerNumeric<std::uint64_t>("processedDownstreamPackets",
111 
112  pProcessedUpstreamPacket_ =
113  statisticRegistrar.registerNumeric<std::uint64_t>("processedUpstreamPackets",
115 
116  pProcessedDownstreamControl_ =
117  statisticRegistrar.registerNumeric<std::uint64_t>("processedDownstreamControl",
119 
120  pProcessedUpstreamControl_ =
121  statisticRegistrar.registerNumeric<std::uint64_t>("processedUpstreamControl",
123  pProcessedEvent_ =
124  statisticRegistrar.registerNumeric<std::uint64_t>("processedEvents",
126  pProcessedConfiguration_ =
127  statisticRegistrar.registerNumeric<std::uint64_t>("processedConfiguration",
129  pProcessedTimedEvent_ =
130  statisticRegistrar.registerNumeric<std::uint64_t>("processedTimedEvents",
132 
  // two column table keyed by event id; handleProcessEvent() increments it
133  pStatisticHistogramTable_.reset(new Utils::StatisticHistogramTable<EventId>{
134  statisticRegistrar,
135  "EventReceptionTable",
136  {"Event","Total Rx"},
137  "Received event counts"});
138 
  // running averages: each wrapper is handed the numeric statistic it
  // publishes through
139  avgQueueWait_.registerStatistic
140  (statisticRegistrar.registerNumeric<double>("avgProcessAPIQueueWait",
142  "Average API queue wait for a processUpstreamPacket,"
143  " processUpstreamControl, processDownstreamPacket,"
144  " processDownstreamControl, processEvent and"
145  " processTimedEvent in microseconds."));
146 
147  avgQueueDepth_.registerStatistic
148  (statisticRegistrar.registerNumeric<double>("avgProcessAPIQueueDepth",
150  "Average API queue depth for a processUpstreamPacket,"
151  " processUpstreamControl, processDownstreamPacket,"
152  " processDownstreamControl, processEvent and"
153  " processTimedEvent."));
154 
155  avgTimedEventLatency_.registerStatistic
156  (statisticRegistrar.registerNumeric<double>("avgTimedEventLatency",
158 
159  avgTimedEventLatencyRatio_.registerStatistic
160  (statisticRegistrar.registerNumeric<double>("avgTimedEventLatencyRatio",
162  "Average ratio of the delta between the scheduled timer"
163  " expiration and the actual firing over the requested"
164  " duration. An average ratio approaching 1 indicates that"
165  " timer latencies are large in comparison to the requested"
166  " durations."));
167 }
168 
170 {
  // launch the worker thread; it runs processWorkQueue() on this instance
171  thread_ = std::thread{&EMANE::NEMQueuedLayer::processWorkQueue,this};
172 }
173 
175 {
  // set the cancel flag and wake the worker while holding the queue lock
176  mutex_.lock();
177  bCancel_ = true;
178  write(iFd_,&one,sizeof(one));
  // unlock before joining: the worker takes mutex_ after reading iFd_
179  mutex_.unlock();
  // NOTE(review): join() is called without checking thread_.joinable();
  // std::thread::join() throws std::system_error when no thread is running.
  // Confirm callers always pair this with a prior start.
180  thread_.join();
  // clear the flag again once the worker has exited
181  bCancel_ = false;
182 }
183 
184 void EMANE::NEMQueuedLayer::enqueue_i(QCallback && callback)
185 {
186  std::lock_guard<std::mutex> m(mutex_);
187  queue_.push_back(std::move(callback));
188  avgQueueDepth_.update(queue_.size());
189  write(iFd_,&one,sizeof(one));
190 }
191 
193 {
  // Defer to the worker thread: std::bind stores the enqueue timestamp and a
  // copy of `update`, and handleProcessConfiguration() runs later with them.
194  enqueue_i(std::bind(&NEMQueuedLayer::handleProcessConfiguration,
195  this,
196  Clock::now(),
197  update));
198 }
199 
201 {
  // Defer to the worker thread: std::bind stores the enqueue timestamp and a
  // copy of `msgs`; handleProcessDownstreamControl() later deletes the
  // control messages the copied container refers to.
202  enqueue_i(std::bind(&NEMQueuedLayer::handleProcessDownstreamControl,
203  this,
204  Clock::now(),
205  msgs));
206 }
207 
209  const ControlMessages & msgs)
210 {
  // Defer to the worker thread: std::bind stores the enqueue timestamp plus
  // copies of `pkt` and `msgs`; handleProcessDownstreamPacket() runs later
  // on those copies and deletes the control messages.
211  enqueue_i(std::bind(&NEMQueuedLayer::handleProcessDownstreamPacket,
212  this,
213  Clock::now(),
214  pkt,
215  msgs));
216 
217 }
218 
220 {
  // Defer to the worker thread: std::bind stores the enqueue timestamp plus
  // copies of `pkt` and `msgs`; handleProcessUpstreamPacket() runs later on
  // those copies and deletes the control messages.
221  enqueue_i(std::bind(&NEMQueuedLayer::handleProcessUpstreamPacket,
222  this,
223  Clock::now(),
224  pkt,
225  msgs));
226 }
227 
229 {
  // Defer to the worker thread: std::bind stores the enqueue timestamp and a
  // copy of `msgs`; handleProcessUpstreamControl() later deletes the control
  // messages the copied container refers to.
230  enqueue_i(std::bind(&NEMQueuedLayer::handleProcessUpstreamControl,
231  this,
232  Clock::now(),
233  msgs));
234 }
235 
237  const Serialization & serialization)
238 {
  // Defer to the worker thread: std::bind stores the enqueue timestamp, the
  // event id and a copy of the serialization for handleProcessEvent().
239  enqueue_i(std::bind(&NEMQueuedLayer::handleProcessEvent,
240  this,
241  Clock::now(),
242  eventId,
243  serialization));
244 
245 }
246 
248  const TimePoint & expireTime,
249  const TimePoint & scheduleTime,
250  const TimePoint & fireTime,
251  const void * arg)
252 {
  // Defer to the worker thread: std::bind stores the enqueue timestamp and
  // copies of the event id and the three time points. `arg` is stored as the
  // raw pointer value, so the pointee is not copied.
253  enqueue_i(std::bind(&NEMQueuedLayer::handleProcessTimedEvent,
254  this,
255  Clock::now(),
256  eventId,
257  expireTime,
258  scheduleTime,
259  fireTime,
260  arg));
261 }
262 
// Worker thread main loop. Blocks in epoll_wait() on iepollFd_. An event on
// iFd_ drains and executes the queued callbacks; an event on any other
// descriptor is dispatched to the callback stored for it in
// fileDescriptorStore_. The loop ends when bCancel_ is set or epoll_wait()
// fails with an error other than EINTR.
263 void EMANE::NEMQueuedLayer::processWorkQueue()
264 {
265  std::uint64_t u64Expired{};
266 #define MAX_EVENTS 32
267  struct epoll_event events[MAX_EVENTS];
268  int nfds{};
269 
  // NOTE(review): bCancel_ is read here without holding mutex_; whether that
  // is safe depends on its declared type, which is not visible here. Confirm.
270  while(!bCancel_)
271  {
  // timeout of -1: block until at least one descriptor is ready
272  nfds = epoll_wait(iepollFd_,events,MAX_EVENTS,-1);
273 
274  if(nfds == -1)
275  {
  // an interrupted wait is not an error; just wait again
276  if(errno == EINTR)
277  {
278  continue;
279  }
280 
  // NOTE(review): the opening of the logging call that the following
  // arguments belong to is not visible here (same for the other logging
  // sites in this function); confirm against the original source.
282  ERROR_LEVEL,
283  "%03hu NEMQueuedLayer::processWorkQueue:"
284  " epoll_wait error: %s",
285  id_,
286  strerror(errno));
287  break;
288  }
289 
290  for(int n = 0; n < nfds; ++n)
291  {
292  if(events[n].data.fd == iFd_)
293  {
294  // consume the eventfd counter that producers bump when work is queued
295  if(read(iFd_,&u64Expired,sizeof(u64Expired)) > 0)
296  {
297  std::unique_lock<std::mutex> lock(mutex_);
298 
  // this break leaves only the for loop; the enclosing while then sees
  // bCancel_ and exits
299  if(bCancel_)
300  {
301  break;
302  }
303 
  // take the whole pending batch while locked, then run it unlocked so
  // producers are not blocked while the callbacks execute
304  MessageProcessingQueue queue{};
305 
306  queue.swap(queue_);
307 
308  lock.unlock();
309 
310  for(auto & entry : queue)
311  {
  // an exception from one callback is logged and does not stop the
  // remaining entries from running
312  try
313  {
314  // execute the functor
315  entry();
316  }
317  catch(std::exception & exp)
318  {
319  // cannot really do too much at this point, so we'll log it
321  ERROR_LEVEL,
322  "%03hu NEMQueuedLayer::processWorkQueue:"
323  " Exception caught %s",
324  id_,
325  exp.what());
326  }
327  catch(...)
328  {
329  // cannot really do too much at this point, so we'll log it
331  ERROR_LEVEL,
332  "%03hu NEMQueuedLayer::processWorkQueue:"
333  " Exception caught",
334  id_);
335  }
336  }
337 
338  queue.clear();
339  }
340  }
341  else
342  {
  // any other ready descriptor: look up its registered callback
343  auto iter = fileDescriptorStore_.find(events[n].data.fd);
344 
345  if(iter != fileDescriptorStore_.end())
346  {
347  try
348  {
  // the stored callback is invoked with the ready descriptor
349  iter->second.second(events[n].data.fd);
350  }
351  catch(std::exception & exp)
352  {
353  // cannot really do too much at this point, so we'll log it
355  ERROR_LEVEL,
356  "%03hu NEMQueuedLayer::processWorkQueue:"
357  " Exception caught %s",
358  id_,
359  exp.what());
360  }
361  catch(...)
362  {
363  // cannot really do too much at this point, so we'll log it
365  ERROR_LEVEL,
366  "%03hu NEMQueuedLayer::processWorkQueue:"
367  " Exception caught",
368  id_);
369  }
370  }
371  }
372  }
373  }
374 }
375 
376 void EMANE::NEMQueuedLayer::handleProcessConfiguration(TimePoint enqueueTime,
377  const ConfigurationUpdate update)
378 {
379  avgQueueWait_.update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
380 
381  ++*pProcessedConfiguration_;
382 
383  doProcessConfiguration(update);
384 }
385 
386 void EMANE::NEMQueuedLayer::handleProcessDownstreamControl(TimePoint enqueueTime,
387  const ControlMessages msgs)
388 {
389  avgQueueWait_.update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
390 
391  ++*pProcessedDownstreamControl_;
392 
394 
395  std::for_each(msgs.begin(),msgs.end(),[](const ControlMessage * p){delete p;});
396 }
397 
398 void EMANE::NEMQueuedLayer::handleProcessDownstreamPacket(TimePoint enqueueTime,
399  DownstreamPacket & pkt,
400  const ControlMessages msgs)
401 {
402  avgQueueWait_.update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
403 
404  ++*pProcessedDownstreamPacket_;
405 
406  doProcessDownstreamPacket(pkt,msgs);
407 
408  std::for_each(msgs.begin(),msgs.end(),[](const ControlMessage * p){delete p;});
409 }
410 
411 void EMANE::NEMQueuedLayer::handleProcessUpstreamPacket(TimePoint enqueueTime,
412  UpstreamPacket & pkt,
413  const ControlMessages msgs)
414 {
415  avgQueueWait_.update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
416 
417  ++*pProcessedUpstreamPacket_;
418 
419  doProcessUpstreamPacket(pkt,msgs);
420 
421  std::for_each(msgs.begin(),msgs.end(),[](const ControlMessage * p){delete p;});
422 }
423 
424 void EMANE::NEMQueuedLayer::handleProcessUpstreamControl(TimePoint enqueueTime,
425  const ControlMessages msgs)
426 {
427  avgQueueWait_.update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
428 
429  ++*pProcessedUpstreamControl_;
430 
432 
433  std::for_each(msgs.begin(),msgs.end(),[](const ControlMessage * p){delete p;});
434 }
435 
436 void EMANE::NEMQueuedLayer::handleProcessEvent(TimePoint enqueueTime,
437  const EventId eventId,
438  const Serialization serialization)
439 {
440  avgQueueWait_.update(std::chrono::duration_cast<Microseconds>(Clock::now() - enqueueTime).count());
441 
442  pStatisticHistogramTable_->increment(eventId);
443 
444  ++*pProcessedEvent_;
445 
446  doProcessEvent(eventId,serialization);
447 }
448 
449 
450 void EMANE::NEMQueuedLayer::updateTimerStats(TimePoint enqueueTime,
451  const TimePoint & expireTime,
452  const TimePoint & scheduleTime,
453  const TimePoint & fireTime)
454 {
455  auto now = Clock::now();
456 
457  avgTimedEventLatency_.update(std::chrono::duration_cast<Microseconds>(now - expireTime).count());
458 
459  avgQueueWait_.update(std::chrono::duration_cast<Microseconds>(now - enqueueTime).count());
460 
461  auto duration = std::chrono::duration_cast<Microseconds>(expireTime - scheduleTime);
462 
463  if(duration.count() > 0)
464  {
465  avgTimedEventLatencyRatio_.update(std::chrono::duration_cast<Microseconds>(fireTime - expireTime).count() /
466  static_cast<double>(duration.count()));
467  }
468  else
469  {
470  avgTimedEventLatencyRatio_.update(1);
471  }
472 
473  ++*pProcessedTimedEvent_;
474 }
475 
476 void EMANE::NEMQueuedLayer::handleProcessTimedEvent(TimePoint enqueueTime,
477  TimerEventId eventId,
478  const TimePoint & expireTime,
479  const TimePoint & scheduleTime,
480  const TimePoint & fireTime,
481  const void * arg)
482 
483 {
484  updateTimerStats(enqueueTime,expireTime,scheduleTime,fireTime);
485 
486  doProcessTimedEvent(eventId,expireTime,scheduleTime,fireTime,arg);
487 }
488 
489 
490 void EMANE::NEMQueuedLayer::processTimer_i(TimerServiceProvider::TimerCallback callback,
491  const TimePoint & expireTime,
492  const TimePoint & scheduleTime,
493  const TimePoint & fireTime)
494 {
495  auto enqueueTime = Clock::now();
496 
497  enqueue_i([this,enqueueTime,callback,expireTime,scheduleTime,fireTime]()
498  {
499  updateTimerStats(enqueueTime,expireTime,scheduleTime,fireTime);
500  callback(expireTime,scheduleTime,fireTime);
501  });
502 }
503 
504 
// Stops monitoring iFd: removes it from the epoll instance and drops its
// entry from fileDescriptorStore_. Descriptors that were never registered
// are ignored.
505 void EMANE::NEMQueuedLayer::removeFileDescriptor(int iFd)
506 {
507  auto iter = fileDescriptorStore_.find(iFd);
508 
509  if(iter != fileDescriptorStore_.end())
510  {
511  if(epoll_ctl(iepollFd_,EPOLL_CTL_DEL,iFd,nullptr) == -1)
512  {
  // NOTE(review): the opening of the logging call that the following
  // arguments belong to is not visible here. The message text also names
  // the constructor and says "unable to add" although this is the
  // EPOLL_CTL_DEL path; it looks copied from the add case. Confirm and
  // correct in the original source.
514  ERROR_LEVEL,
515  "%03hu NEMQueuedLayer::NEMQueuedLayer:"
516  " unable to add fd to epoll",
517  id_);
518  }
519 
  // the store entry is removed even when epoll_ctl() reported a failure
520  fileDescriptorStore_.erase(iter);
521  }
522 }
523 
// Registers iFd with the epoll instance, or updates an existing
// registration. A new descriptor is stored with its type and callback and
// added to epoll. For a known descriptor the epoll event mask is changed
// only when the type differs, and the callback is always replaced.
// READ maps to EPOLLIN; any other type maps to EPOLLOUT.
524 void EMANE::NEMQueuedLayer::addFileDescriptor_i(int iFd,
525  DescriptorType type,
526  Callback callback)
527 {
528  auto iter = fileDescriptorStore_.find(iFd);
529 
530  if(iter == fileDescriptorStore_.end())
531  {
  // NOTE(review): the entry is stored before epoll_ctl() runs and is not
  // removed again when the add fails, so a failed descriptor stays in the
  // store. Confirm this is intended.
532  fileDescriptorStore_.insert(std::make_pair(iFd,std::make_pair(type,callback)));
533 
534  struct epoll_event ev;
535  ev.events = type == DescriptorType::READ ? EPOLLIN : EPOLLOUT;
536  ev.data.fd = iFd;
537 
538  if(epoll_ctl(iepollFd_,EPOLL_CTL_ADD,iFd,&ev) == -1)
539  {
  // NOTE(review): the opening of the logging call that the following
  // arguments belong to is not visible here (same for the second logging
  // site below); confirm against the original source.
541  ERROR_LEVEL,
542  "%03hu NEMQueuedLayer::NEMQueuedLayer:"
543  " unable to add fd to epoll",
544  id_);
545  }
546  }
547  else
548  {
  // already registered: only touch epoll when the monitored direction changes
549  if(iter->second.first != type)
550  {
551  iter->second.first = type;
552 
553  struct epoll_event ev;
554  ev.events = type == DescriptorType::READ ? EPOLLIN : EPOLLOUT;
555  ev.data.fd = iFd;
556 
557  if(epoll_ctl(iepollFd_,EPOLL_CTL_MOD,iFd,&ev) == -1)
558  {
  // NOTE(review): this is the EPOLL_CTL_MOD path but the message still
  // says "unable to add"; confirm and correct in the original source.
560  ERROR_LEVEL,
561  "%03hu NEMQueuedLayer::NEMQueuedLayer:"
562  " unable to add fd to epoll",
563  id_);
564  }
565  }
566 
  // the callback is replaced regardless of whether the type changed
567  iter->second.second = callback;
568  }
569 }
std::string Serialization
Definition: serializable.h:42
A Packet class that allows upstream processing to strip layer headers as the packet travels up the st...
The Registrar interface provides access to all of the emulator registrars.
Definition: registrar.h:50
void processDownstreamControl(const ControlMessages &msgs) override
virtual void doProcessUpstreamPacket(UpstreamPacket &, const ControlMessages &)=0
virtual StatisticRegistrar & statisticRegistrar()=0
NEMQueuedLayer(NEMId id, PlatformServiceProvider *pPlatformService)
std::list< const ControlMessage * > ControlMessages
virtual void doProcessEvent(const EventId &, const Serialization &)=0
Base class for NEMLayer containers. Builders construct NEMLayer objects to contain derived instances ...
Definition: nemlayer.h:57
void processUpstreamControl(const ControlMessages &msgs) override
std::uint16_t EventId
Definition: types.h:53
The PlatformServiceProvider interface provides access to emulator services.
void start() override
virtual void doProcessUpstreamControl(const ControlMessages &)=0
virtual void doProcessDownstreamControl(const ControlMessages &)=0
void stop() override
void processConfiguration(const ConfigurationUpdate &update) override
Specialized packet that allows downstream processing to add layer specific headers as the packet trave...
virtual void doProcessConfiguration(const ConfigurationUpdate &)=0
std::function< void(const TimePoint &, const TimePoint &, const TimePoint &)> TimerCallback
std::chrono::microseconds Microseconds
Definition: types.h:45
std::uint16_t NEMId
Definition: types.h:52
void processTimedEvent(TimerEventId eventId, const TimePoint &expireTime, const TimePoint &scheduleTime, const TimePoint &fireTime, const void *arg) override
ControlMessage interface is the base for all control messages.
void initialize(Registrar &registrar) override
std::vector< ConfigurationNameAnyValues > ConfigurationUpdate
std::size_t TimerEventId
Definition: types.h:54
Clock::time_point TimePoint
Definition: types.h:50
#define LOGGER_STANDARD_LOGGING(logger, level, fmt, args...)
static LogService * instance()
Definition: singleton.h:56
virtual void doProcessDownstreamPacket(DownstreamPacket &, const ControlMessages &)=0
void processEvent(const EventId &eventId, const Serialization &serialization) override
virtual void doProcessTimedEvent(TimerEventId eventId, const TimePoint &expireTime, const TimePoint &scheduleTime, const TimePoint &fireTime, const void *arg)=0
void processDownstreamPacket(DownstreamPacket &pkt, const ControlMessages &msgs) override
void processUpstreamPacket(UpstreamPacket &pkt, const ControlMessages &msgs) override
void registerStatistic(StatisticNumeric< T > *p)
Utility class to make a two column statistic table where the first column is the table key and the se...