EMANE  1.2.1
basicqueuemanager.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015 - Adjacent Link LLC, Bridgewater, New Jersey
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * * Redistributions of source code must retain the above copyright
10  * notice, this list of conditions and the following disclaimer.
11  * * Redistributions in binary form must reproduce the above copyright
12  * notice, this list of conditions and the following disclaimer in
13  * the documentation and/or other materials provided with the
14  * distribution.
15  * * Neither the name of Adjacent Link LLC nor the names of its
16  * contributors may be used to endorse or promote products derived
17  * from this software without specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
22  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
23  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
24  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
25  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
26  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30  * POSSIBILITY OF SUCH DAMAGE.
31  */
32 
35 #include "queue.h"
36 #include "queuestatuspublisher.h"
37 
namespace
{
  // Total number of downstream queues handled by this file: it sizes the
  // Implementation::queues_ array and bounds every queue index check/loop.
  constexpr int MAX_QUEUES{5};
}
42 
43 class EMANE::Models::TDMA::BasicQueueManager::Implementation
44 {
45 public:
46  bool bAggregationEnable_{};
47  bool bFragmentationEnable_{};
48  bool bStrictDequeueEnable_{};
49  double dAggregationSlotThreshold_{};
50  Queue queues_[MAX_QUEUES];
51  QueueStatusPublisher queueStatusPublisher_;
52 };
53 
54 
// Constructor (tail only).
// NOTE(review): the first line of the signature is absent from this listing;
// the initializer list below uses `id` and `pPlatformServiceProvider`.
56  PlatformServiceProvider * pPlatformServiceProvider):
// Both arguments are forwarded unchanged to the QueueManager base.
57  QueueManager{id,pPlatformServiceProvider},
// The private Implementation object is allocated here with `new`; the constructor body is empty.
58  pImpl_{new Implementation{}}{}
59 
61 
// Body that registers the queue.* configuration items and the queue statistics.
// NOTE(review): the signature line is absent from this listing; the body uses a
// `registrar` object (presumably initialize(Registrar &) — confirm against the header).
63 {
// Tail of a trace call that reports id_ and __func__ (the call head is absent from this listing).
66  "MACI %03hu TDMA::BasicQueueManager::%s",
67  id_,
68  __func__);
69 
70  auto & configRegistrar = registrar.configurationRegistrar();
71 
// NOTE(review): in each registerNumeric call below the listing omits one argument
// line between the name and the default value list.

// queue.depth: std::uint16_t, default 256.
// NOTE(review): the usage text calls the item 'queuedepth' while the registered
// name is "queue.depth".
72  configRegistrar.registerNumeric<std::uint16_t>("queue.depth",
74  {256},
75  "Defines the size of the per service class downstream packet"
76  " queues (in packets). Each of the 5 queues (control + 4"
77  " service classes) will be 'queuedepth' size.");
78 
// queue.aggregationenable: bool, default true.
79  configRegistrar.registerNumeric<bool>("queue.aggregationenable",
81  {true},
82  "Defines whether packet aggregation is enabled for transmission. When"
83  " enabled, multiple packets can be sent in the same transmission when"
84  " there is additional room within the slot.");
85 
86 
// queue.fragmentationenable: bool, default true.
87  configRegistrar.registerNumeric<bool>("queue.fragmentationenable",
89  {true},
90  "Defines whether packet fragmentation is enabled. When enabled, a single"
91  " packet will be fragmented into multiple message components to be sent"
92  " over multiple transmissions when the slot is too small. When disabled"
93  " and the packet matches the traffic class for the transmit slot as"
94  " defined in the TDMA schedule, the packet will be discarded.");
95 
96 
// queue.strictdequeueenable: bool, default false.
97  configRegistrar.registerNumeric<bool>("queue.strictdequeueenable",
99  {false},
100  "Defines whether packets will be dequeued from a queue other than what"
101  " has been specified when there are no eligible packets for dequeue in"
102  " the specified queue. Queues are dequeued highest priority first.");
103 
// queue.aggregationslotthreshold: double, default 90.0; the two trailing
// arguments bound the accepted value to the range 0 .. 100.0.
104  configRegistrar.registerNumeric<double>("queue.aggregationslotthreshold",
106  {90.0},
107  "Defines the percentage of a slot that must be filled in order to conclude"
108  " aggregation when queue.aggregationenable is enabled.",
109  0,
110  100.0);
111 
112  auto & statisticRegistrar = registrar.statisticRegistrar();
113 
// The queue status publisher registers its own statistics with the statistic registrar.
114  pImpl_->queueStatusPublisher_.registerStatistics(statisticRegistrar);
115 
116 }
117 
// Body that applies a configuration update and then initializes all five queues.
// NOTE(review): the signature line is absent from this listing; the body iterates
// an `update` collection of (name, values) items (presumably
// configure(const ConfigurationUpdate &) — confirm against the header).
// Throws ConfigureException (via makeException) for any item name it does not recognize.
119 {
// Tail of a DEBUG_LEVEL trace call reporting id_ and __func__ (call head absent from this listing).
121  DEBUG_LEVEL,
122  "MACI %03hu TDMA::BasicQueueManager::%s",
123  id_,
124  __func__);
125 
// Stays 0 unless the update carries a "queue.depth" item.
126  std::uint16_t u16QueueDepth{};
127 
// Each item is matched by name; only the first value (index 0) of an item is read.
128  for(const auto & item : update)
129  {
130  if(item.first == "queue.depth")
131  {
132  u16QueueDepth = item.second[0].asUINT16();
133 
135  INFO_LEVEL,
136  "MACI %03hu TDMA::BasicQueueManager::%s: %s = %hu",
137  id_,
138  __func__,
139  item.first.c_str(),
140  u16QueueDepth);
141  }
142  else if(item.first == "queue.aggregationenable")
143  {
144  pImpl_->bAggregationEnable_ = item.second[0].asBool();
145 
// NOTE(review): this and the following INFO messages are prefixed "TDMA::BaseModel"
// although the other messages in this file use "TDMA::BasicQueueManager" — looks
// like a copy/paste leftover; confirm before changing the emitted text.
147  INFO_LEVEL,
148  "MACI %03hu TDMA::BaseModel::%s: %s = %s",
149  id_,
150  __func__,
151  item.first.c_str(),
152  pImpl_->bAggregationEnable_ ? "on" : "off");
153  }
154  else if(item.first == "queue.fragmentationenable")
155  {
156  pImpl_->bFragmentationEnable_ = item.second[0].asBool();
157 
159  INFO_LEVEL,
160  "MACI %03hu TDMA::BaseModel::%s: %s = %s",
161  id_,
162  __func__,
163  item.first.c_str(),
164  pImpl_->bFragmentationEnable_ ? "on" : "off");
165  }
166  else if(item.first == "queue.strictdequeueenable")
167  {
168  pImpl_->bStrictDequeueEnable_ = item.second[0].asBool();
169 
171  INFO_LEVEL,
172  "MACI %03hu TDMA::BaseModel::%s: %s = %s",
173  id_,
174  __func__,
175  item.first.c_str(),
176  pImpl_->bStrictDequeueEnable_ ? "on" : "off");
177  }
178  else if(item.first == "queue.aggregationslotthreshold")
179  {
180  pImpl_->dAggregationSlotThreshold_ = item.second[0].asDouble();
181 
183  INFO_LEVEL,
184  "MACI %03hu TDMA::BaseModel::%s: %s = %lf",
185  id_,
186  __func__,
187  item.first.c_str(),
188  pImpl_->dAggregationSlotThreshold_);
189  }
190 
191  else
192  {
// Unknown item names are rejected rather than ignored.
193  throw makeException<ConfigureException>("TDMA::BasicQueueManager: "
194  "Unexpected configuration item %s",
195  item.first.c_str());
196  }
197  }
198 
// Queues 0-3 are the user traffic queues; all four receive the same depth and
// flags, with the last argument false.
// NOTE(review): the original note read "mapped by dhcp" — presumably DSCP was meant; confirm.
199  // queue for user traffic mapped by dhcp
200  pImpl_->queues_[0].initialize(u16QueueDepth,
201  pImpl_->bFragmentationEnable_,
202  pImpl_->bAggregationEnable_,
203  false);
204 
205  pImpl_->queues_[1].initialize(u16QueueDepth,
206  pImpl_->bFragmentationEnable_,
207  pImpl_->bAggregationEnable_,
208  false);
209 
210  pImpl_->queues_[2].initialize(u16QueueDepth,
211  pImpl_->bFragmentationEnable_,
212  pImpl_->bAggregationEnable_,
213  false);
214  pImpl_->queues_[3].initialize(u16QueueDepth,
215  pImpl_->bFragmentationEnable_,
216  pImpl_->bAggregationEnable_,
217  false);
218 
// Queue 4 is the only one initialized with the last argument true.
219  // control queue for scheduler-to-scheduler OTA messages
220  pImpl_->queues_[4].initialize(u16QueueDepth,
221  pImpl_->bFragmentationEnable_,
222  pImpl_->bAggregationEnable_,
223  true);
224 
225 
226 }
227 
// Body of a member function whose signature line is absent from this listing.
// The only visible work is a DEBUG_LEVEL trace carrying id_ and __func__
// (the head of the logging call is absent as well).
229 {
231  DEBUG_LEVEL,
232  "MACI %03hu TDMA::BasicQueueManager::%s",
233  id_,
234  __func__);
235 }
236 
// Body of a member function whose signature line is absent from this listing.
// The only visible work is a DEBUG_LEVEL trace carrying id_ and __func__
// (the head of the logging call is absent as well).
238 {
240  DEBUG_LEVEL,
241  "MACI %03hu TDMA::BasicQueueManager::%s",
242  id_,
243  __func__);
244 }
245 
// Body of a member function whose signature line is absent from this listing.
// The only visible work is a DEBUG_LEVEL trace carrying id_ and __func__
// (the head of the logging call is absent as well).
247 {
249  DEBUG_LEVEL,
250  "MACI %03hu TDMA::BasicQueueManager::%s",
251  id_,
252  __func__);
253 }
254 
// Body of a member function whose signature line is absent from this listing.
// The only visible work is a DEBUG_LEVEL trace carrying id_ and __func__
// (the head of the logging call is absent as well).
256 {
258  DEBUG_LEVEL,
259  "MACI %03hu TDMA::BasicQueueManager::%s",
260  id_,
261  __func__);
262 }
263 
// Places a downstream packet on the queue selected by u8QueueIndex.
//
// u8QueueIndex: target queue; values >= MAX_QUEUES are ignored (nothing is
//               enqueued and 0 is returned).
// pkt:          packet to enqueue; it is moved into the queue.
// Returns the number of packets dropped as a result of the call (0 or 1).
264 size_t EMANE::Models::TDMA::BasicQueueManager::enqueue(std::uint8_t u8QueueIndex,
265  DownstreamPacket && pkt)
266 {
267  size_t packetsDropped{};
268 
269  if(u8QueueIndex < MAX_QUEUES)
270  {
271  auto ret = pImpl_->queues_[u8QueueIndex].enqueue(std::move(pkt));
272 
// ret.second signals that a packet was dropped; ret.first is then dereferenced
// to report that packet (presumably one displaced to make room — TODO confirm
// against Queue::enqueue).
273  if(ret.second)
274  {
275  packetsDropped = 1;
276 
// Count one drop against this queue.
// NOTE(review): the listing omits the argument line between the index and the count.
277  pImpl_->queueStatusPublisher_.drop(u8QueueIndex,
279  1);
280 
281  const auto & pktInfo = ret.first->getPacketInfo();
282 
// Report the dropped packet's source, destination, priority and length.
// NOTE(review): the final argument line and the closing of this call are absent
// from this listing.
283  pPacketStatusPublisher_->outbound(pktInfo.getSource(),
284  pktInfo.getDestination(),
285  pktInfo.getPriority(),
286  ret.first->length(),
288  }
289 
// The enqueue is recorded whether or not a drop was reported above.
290  pImpl_->queueStatusPublisher_.enqueue(u8QueueIndex);
291  }
292 
293  return packetsDropped;
294 }
295 
// Collects message components for a transmission of up to requestedBytes bytes.
//
// NOTE(review): the signature line that names this function and declares
// u8QueueIndex is absent from this listing; the body uses u8QueueIndex as the
// index of the requested queue.
// requestedBytes: byte budget handed to the queues.
// destination:    passed through to each Queue::dequeue call.
// Returns a tuple of the gathered components and their total length; both are
// empty/zero when u8QueueIndex >= MAX_QUEUES.
296 std::tuple<EMANE::Models::TDMA::MessageComponents,size_t>
298  size_t requestedBytes,
299  NEMId destination)
300 {
301  MessageComponents components{};
302  size_t totalLength{};
303 
304  if(u8QueueIndex < MAX_QUEUES)
305  {
// The requested queue is serviced first. This is the only call made with the
// third argument true, and the only one whose third tuple element is examined
// for dropped packets below (presumably the flag permits drops — TODO confirm).
306  auto ret = pImpl_->queues_[u8QueueIndex].dequeue(requestedBytes,
307  destination,
308  true);
309 
310  auto length = std::get<1>(ret);
311 
312  if(length)
313  {
314  totalLength += length;
315 
316  auto & parts = std::get<0>(ret);
317 
// Both index arguments are the requested queue here.
318  pImpl_->queueStatusPublisher_.dequeue(u8QueueIndex,
319  u8QueueIndex,
320  parts);
321 
// splice moves the dequeued parts onto the end of the result list.
322  components.splice(components.end(),parts);
323  }
324 
325  // update drop info
326  for(const auto & pPkt : std::get<2>(ret))
327  {
328  const auto & pktInfo = pPkt->getPacketInfo();
329 
// NOTE(review): the final argument line and the closing of this call are absent
// from this listing.
330  pPacketStatusPublisher_->outbound(pktInfo.getSource(),
331  pktInfo.getDestination(),
332  pktInfo.getPriority(),
333  pPkt->length(),
335 
// NOTE(review): the listing omits the argument line between the index and the count.
336  pImpl_->queueStatusPublisher_.drop(u8QueueIndex,
338  1);
339  }
340 
// Byte count corresponding to the configured percentage of requestedBytes; the
// cast to size_t discards any fractional part.
341  size_t aggregationThreshold{static_cast<size_t>(requestedBytes *
342  pImpl_->dAggregationSlotThreshold_ / 100.0)};
343 
344 
345  // if allowed, check the other queues to see if they have components that will
346  // fit in the slot
347  if(!pImpl_->bStrictDequeueEnable_)
348  {
// i counts down from MAX_QUEUES to 1 and queue i-1 is examined, so the highest
// index is visited first.
349  std::uint8_t i{MAX_QUEUES};
350 
// Keep going while nothing has been gathered yet, or aggregation is enabled, as
// long as the total has not passed the threshold and queues remain. The first
// clause is logically the same as (!totalLength || bAggregationEnable_).
351  while((!totalLength || (totalLength && pImpl_->bAggregationEnable_)) &&
352  totalLength <= aggregationThreshold &&
353  i > 0)
354  {
355  // check all queues except the original from highest priority to lowest
356  if(i-1 != u8QueueIndex)
357  {
// Only the remaining budget is offered to the next queue.
358  auto ret = pImpl_->queues_[i-1].dequeue(requestedBytes - totalLength,
359  destination,
360  false);
361 
362  auto length = std::get<1>(ret);
363 
364  // if component data was dequeued, record it
365  if(length)
366  {
367  auto & parts = std::get<0>(ret);
368 
369  totalLength += length;
370 
// First index is the requested queue, second is the queue actually dequeued from.
371  pImpl_->queueStatusPublisher_.dequeue(u8QueueIndex,
372  i-1,
373  parts);
374 
375  components.splice(components.end(),parts);
376  }
377  }
378 
379  --i;
380  }
381  }
382  }
383 
// NOTE(review): this line is the remainder of a call whose head and trailing
// argument are absent from this listing; it receives the gathered components.
385  components,
387 
388  return std::make_tuple(std::move(components),totalLength);
389 }
390 
// Body that builds one status entry per queue and returns them as QueueInfos.
// NOTE(review): the signature lines are absent from this listing; presumably this
// is getPacketQueueInfo(), the member this file's cross-reference list shows
// returning QueueInfos — confirm against the header.
393 {
394  QueueInfos queueInfos{};
395 
// Entries are appended in ascending queue index order, 0 .. MAX_QUEUES-1.
396  for(int i = 0; i < MAX_QUEUES; ++i)
397  {
398  auto status = pImpl_->queues_[i].getStatus();
399 
// The loop index is narrowed to std::uint8_t to serve as the queue id.
400  queueInfos.push_back({static_cast<std::uint8_t>(i), //queue id
401  std::get<0>(status), // packets
402  std::get<1>(status)}); //bytes
403  }
404 
405  return queueInfos;
406 }
The Registrar interface provides access to all of the emulator registrars.
Definition: registrar.h:50
virtual void outbound(NEMId src, NEMId dst, Priority priority, size_t size, OutboundAction action)=0
virtual ConfigurationRegistrar & configurationRegistrar()=0
virtual StatisticRegistrar & statisticRegistrar()=0
void configure(const ConfigurationUpdate &update) override
The PlatformServiceProvider interface provides access to emulator services.
QueueInfos getPacketQueueInfo() const override
size_t enqueue(std::uint8_t u8QueueIndex, DownstreamPacket &&pkt) override
std::vector< QueueInfo > QueueInfos
Specialized packet that allows downstream processing to add layer specific headers as the packet trave...
std::list< MessageComponent > MessageComponents
std::uint16_t NEMId
Definition: types.h:52
PlatformServiceProvider * pPlatformService_
virtual LogServiceProvider & logService()=0
Queue management interface used by BaseModel
Definition: queuemanager.h:57
void initialize(Registrar &registrar) override
std::vector< ConfigurationNameAnyValues > ConfigurationUpdate
void registerNumeric(const std::string &sName, const ConfigurationProperties &properties=ConfigurationProperties::NONE, const std::initializer_list< T > &values={}, const std::string &sUsage="", T minValue=std::numeric_limits< T >::lowest(), T maxValue=std::numeric_limits< T >::max(), std::size_t minOccurs=1, std::size_t maxOccurs=1, const std::string &sRegexPattern={})
#define LOGGER_STANDARD_LOGGING(logger, level, fmt, args...)
BasicQueueManager(NEMId id, PlatformServiceProvider *pPlatformServiceProvider)
std::tuple< EMANE::Models::TDMA::MessageComponents, size_t > dequeue(std::uint8_t u8QueueIndex, size_t length, NEMId destination) override