// Number of downstream packet queues held by the queue manager; it sizes the
// queues_ array and bounds every queue-index check. The queue.depth usage text
// describes these as 5 queues: control + 4 service classes.
40 const int MAX_QUEUES = 5;
43 class EMANE::Models::TDMA::BasicQueueManager::Implementation
46 bool bAggregationEnable_{};
47 bool bFragmentationEnable_{};
48 bool bStrictDequeueEnable_{};
49 double dAggregationSlotThreshold_{};
50 Queue queues_[MAX_QUEUES];
51 QueueStatusPublisher queueStatusPublisher_;
58 pImpl_{
new Implementation{}}{}
66 "MACI %03hu TDMA::BasicQueueManager::%s",
75 "Defines the size of the per service class downstream packet" 76 " queues (in packets). Each of the 5 queues (control + 4" 77 " service classes) will be 'queuedepth' size.");
79 configRegistrar.registerNumeric<
bool>(
"queue.aggregationenable",
82 "Defines whether packet aggregation is enabled for transmission. When" 83 " enabled, multiple packets can be sent in the same transmission when" 84 " there is additional room within the slot.");
87 configRegistrar.registerNumeric<
bool>(
"queue.fragmentationenable",
90 "Defines whether packet fragmentation is enabled. When enabled, a single" 91 " packet will be fragmented into multiple message components to be sent" 92 " over multiple transmissions when the slot is too small. When disabled" 93 " and the packet matches the traffic class for the transmit slot as" 94 " defined in the TDMA schedule, the packet will be discarded.");
97 configRegistrar.registerNumeric<
bool>(
"queue.strictdequeueenable",
100 "Defines whether packets will be dequeued from a queue other than what" 101 " has been specified when there are no eligible packets for dequeue in" 102 " the specified queue. Queues are dequeued highest priority first.");
104 configRegistrar.registerNumeric<
double>(
"queue.aggregationslotthreshold",
107 "Defines the percentage of a slot that must be filled in order to conclude" 108 " aggregation when queue.aggregationenable is enabled.",
114 pImpl_->queueStatusPublisher_.registerStatistics(statisticRegistrar);
122 "MACI %03hu TDMA::BasicQueueManager::%s",
126 std::uint16_t u16QueueDepth{};
128 for(
const auto & item : update)
130 if(item.first ==
"queue.depth")
132 u16QueueDepth = item.second[0].asUINT16();
136 "MACI %03hu TDMA::BasicQueueManager::%s: %s = %hu",
142 else if(item.first ==
"queue.aggregationenable")
144 pImpl_->bAggregationEnable_ = item.second[0].asBool();
148 "MACI %03hu TDMA::BaseModel::%s: %s = %s",
152 pImpl_->bAggregationEnable_ ?
"on" :
"off");
154 else if(item.first ==
"queue.fragmentationenable")
156 pImpl_->bFragmentationEnable_ = item.second[0].asBool();
160 "MACI %03hu TDMA::BaseModel::%s: %s = %s",
164 pImpl_->bFragmentationEnable_ ?
"on" :
"off");
166 else if(item.first ==
"queue.strictdequeueenable")
168 pImpl_->bStrictDequeueEnable_ = item.second[0].asBool();
172 "MACI %03hu TDMA::BaseModel::%s: %s = %s",
176 pImpl_->bStrictDequeueEnable_ ?
"on" :
"off");
178 else if(item.first ==
"queue.aggregationslotthreshold")
180 pImpl_->dAggregationSlotThreshold_ = item.second[0].asDouble();
184 "MACI %03hu TDMA::BaseModel::%s: %s = %lf",
188 pImpl_->dAggregationSlotThreshold_);
193 throw makeException<ConfigureException>(
"TDMA::BasicQueueManager: " 194 "Unexpected configuration item %s",
200 pImpl_->queues_[0].initialize(u16QueueDepth,
201 pImpl_->bFragmentationEnable_,
202 pImpl_->bAggregationEnable_,
205 pImpl_->queues_[1].initialize(u16QueueDepth,
206 pImpl_->bFragmentationEnable_,
207 pImpl_->bAggregationEnable_,
210 pImpl_->queues_[2].initialize(u16QueueDepth,
211 pImpl_->bFragmentationEnable_,
212 pImpl_->bAggregationEnable_,
214 pImpl_->queues_[3].initialize(u16QueueDepth,
215 pImpl_->bFragmentationEnable_,
216 pImpl_->bAggregationEnable_,
220 pImpl_->queues_[4].initialize(u16QueueDepth,
221 pImpl_->bFragmentationEnable_,
222 pImpl_->bAggregationEnable_,
232 "MACI %03hu TDMA::BasicQueueManager::%s",
241 "MACI %03hu TDMA::BasicQueueManager::%s",
250 "MACI %03hu TDMA::BasicQueueManager::%s",
259 "MACI %03hu TDMA::BasicQueueManager::%s",
267 size_t packetsDropped{};
269 if(u8QueueIndex < MAX_QUEUES)
271 auto ret = pImpl_->queues_[u8QueueIndex].enqueue(std::move(pkt));
277 pImpl_->queueStatusPublisher_.drop(u8QueueIndex,
281 const auto & pktInfo = ret.first->getPacketInfo();
284 pktInfo.getDestination(),
285 pktInfo.getPriority(),
290 pImpl_->queueStatusPublisher_.enqueue(u8QueueIndex);
293 return packetsDropped;
296 std::tuple<EMANE::Models::TDMA::MessageComponents,size_t>
298 size_t requestedBytes,
302 size_t totalLength{};
304 if(u8QueueIndex < MAX_QUEUES)
306 auto ret = pImpl_->queues_[u8QueueIndex].dequeue(requestedBytes,
310 auto length = std::get<1>(ret);
314 totalLength += length;
316 auto & parts = std::get<0>(ret);
318 pImpl_->queueStatusPublisher_.dequeue(u8QueueIndex,
322 components.splice(components.end(),parts);
326 for(
const auto & pPkt : std::get<2>(ret))
328 const auto & pktInfo = pPkt->getPacketInfo();
331 pktInfo.getDestination(),
332 pktInfo.getPriority(),
336 pImpl_->queueStatusPublisher_.drop(u8QueueIndex,
341 size_t aggregationThreshold{
static_cast<size_t>(requestedBytes *
342 pImpl_->dAggregationSlotThreshold_ / 100.0)};
347 if(!pImpl_->bStrictDequeueEnable_)
349 std::uint8_t i{MAX_QUEUES};
351 while((!totalLength || (totalLength && pImpl_->bAggregationEnable_)) &&
352 totalLength <= aggregationThreshold &&
356 if(i-1 != u8QueueIndex)
358 auto ret = pImpl_->queues_[i-1].dequeue(requestedBytes - totalLength,
362 auto length = std::get<1>(ret);
367 auto & parts = std::get<0>(ret);
369 totalLength += length;
371 pImpl_->queueStatusPublisher_.dequeue(u8QueueIndex,
375 components.splice(components.end(),parts);
388 return std::make_tuple(std::move(components),totalLength);
396 for(
int i = 0; i < MAX_QUEUES; ++i)
398 auto status = pImpl_->queues_[i].getStatus();
400 queueInfos.push_back({
static_cast<std::uint8_t
>(i),
402 std::get<1>(status)});
The Registrar interface provides access to all of the emulator registrars.
virtual void outbound(NEMId src, NEMId dst, Priority priority, size_t size, OutboundAction action)=0
virtual ConfigurationRegistrar & configurationRegistrar()=0
virtual StatisticRegistrar & statisticRegistrar()=0
void configure(const ConfigurationUpdate &update) override
QueueInfos getPacketQueueInfo() const override
size_t enqueue(std::uint8_t u8QueueIndex, DownstreamPacket &&pkt) override
std::vector< QueueInfo > QueueInfos
void postStart() override
Specialized packet that allows downstream processing to add layer-specific headers as the packet trave...
std::list< MessageComponent > MessageComponents
Queue management interface used by BaseModel
void initialize(Registrar &registrar) override
std::vector< ConfigurationNameAnyValues > ConfigurationUpdate
void registerNumeric(const std::string &sName, const ConfigurationProperties &properties=ConfigurationProperties::NONE, const std::initializer_list< T > &values={}, const std::string &sUsage="", T minValue=std::numeric_limits< T >::lowest(), T maxValue=std::numeric_limits< T >::max(), std::size_t minOccurs=1, std::size_t maxOccurs=1, const std::string &sRegexPattern={})
#define LOGGER_STANDARD_LOGGING(logger, level, fmt, args...)
BasicQueueManager(NEMId id, PlatformServiceProvider *pPlatformServiceProvider)
PacketStatusPublisher * pPacketStatusPublisher_
std::tuple< EMANE::Models::TDMA::MessageComponents, size_t > dequeue(std::uint8_t u8QueueIndex, size_t length, NEMId destination) override