48 u16QueueDepth_ = u16QueueDepth;
49 bFragment_ = bFragment;
50 bIsControl_ = bIsControl;
51 bAggregate_ = bAggregate;
54 std::pair<std::unique_ptr<EMANE::DownstreamPacket>,
bool>
57 std::unique_ptr<DownstreamPacket> pDroppedPacket{};
58 bool bDroppedPacket{};
64 if(queue_.size() == u16QueueDepth_)
67 auto entry = queue_.begin();
72 for(
auto iter = queue_.begin();
78 if(!iter->second.second->index_)
86 pDroppedPacket.reset(entry->second.first);
89 std::unique_ptr<MetaInfo> pDroppedMetaInfo{entry->second.second};
91 bDroppedPacket =
true;
93 auto dst = pDroppedPacket->getPacketInfo().getDestination();
95 destQueue_[
dst].erase(entry->first);
97 queue_.erase(entry->first);
101 currentBytes_ -= pDroppedPacket->length() - pDroppedMetaInfo->offset_;
104 MetaInfo * pMetaInfo =
new MetaInfo;
106 queue_.insert(std::make_pair(u64Counter_,std::make_pair(pPkt,pMetaInfo)));
108 auto iter = destQueue_.find(dest);
110 if(iter == destQueue_.end())
112 iter = destQueue_.insert({dest,PacketQueue{}}).first;
115 currentBytes_ += pPkt->length();
117 iter->second.insert(std::make_pair(u64Counter_,std::make_pair(pPkt,pMetaInfo)));
121 return {std::move(pDroppedPacket),bDroppedPacket};
126 std::list<std::unique_ptr<EMANE::DownstreamPacket>>>
131 std::list<std::unique_ptr<DownstreamPacket>> dropped;
133 while(totalBytes <= requestedBytes)
137 auto iter = destQueue_.find(destination);
140 if(iter != destQueue_.end() && !iter->second.empty())
142 auto const & entry = iter->second.begin();
144 auto & pPacket = std::get<0>(entry->second);
145 auto & pMetaInfo = std::get<1>(entry->second);
147 if(pPacket->length() - pMetaInfo->offset_ <= requestedBytes - totalBytes)
149 if(pMetaInfo->offset_)
151 auto ret = fragmentPacket(pPacket,
154 requestedBytes - totalBytes);
156 totalBytes += ret.second;
158 components.push_back(std::move(ret.first));
162 components.push_back({bIsControl_ ?
165 pPacket->getPacketInfo().getDestination(),
166 pPacket->getPacketInfo().getPriority(),
167 pPacket->getVectorIO(),
174 totalBytes += pPacket->length() - pMetaInfo->offset_;
181 queue_.erase(entry->first);
184 iter->second.erase(entry);
196 auto ret = fragmentPacket(pPacket,
199 requestedBytes - totalBytes);
201 totalBytes += ret.second;
203 components.push_back(std::move(ret.first));
209 if(bDrop && components.empty())
213 currentBytes_ -= pPacket->length();
216 dropped.push_back(std::unique_ptr<DownstreamPacket>{pPacket});
221 queue_.erase(entry->first);
224 iter->second.erase(entry);
238 else if(!queue_.empty())
240 auto const & entry = queue_.begin();
242 auto & pPacket = std::get<0>(entry->second);
243 auto & pMetaInfo = std::get<1>(entry->second);
245 NEMId dst{pPacket->getPacketInfo().getDestination()};
247 if(pPacket->length() - pMetaInfo->offset_ <= requestedBytes - totalBytes)
249 if(pMetaInfo->offset_)
251 auto ret = fragmentPacket(pPacket,
254 requestedBytes - totalBytes);
256 totalBytes += ret.second;
258 components.push_back(std::move(ret.first));
262 components.push_back({bIsControl_ ?
266 pPacket->getPacketInfo().getPriority(),
267 pPacket->getVectorIO(),
273 totalBytes += pPacket->length() - pMetaInfo->offset_;
280 destQueue_[
dst].erase(entry->first);
295 auto ret = fragmentPacket(pPacket,
298 requestedBytes - totalBytes);
300 totalBytes += ret.second;
302 components.push_back(std::move(ret.first));
308 if(bDrop && components.empty())
312 currentBytes_ -= pPacket->length();
315 dropped.push_back(std::unique_ptr<DownstreamPacket>{pPacket});
320 destQueue_[
dst].erase(entry->first);
340 currentBytes_ -= totalBytes;
342 return std::make_tuple(components,totalBytes,std::move(dropped));
345 std::pair<EMANE::Models::TDMA::MessageComponent,size_t>
347 MetaInfo * pMetaInfo,
348 std::uint64_t u64FragmentSequence,
351 size_t totalBytesVisited{};
352 size_t totalBytesCopied{};
359 if(totalBytesCopied < bytes)
361 if(totalBytesVisited + entry.iov_len < pMetaInfo->offset_)
363 totalBytesVisited += entry.iov_len;
367 char * pBuf{
reinterpret_cast<char *
>(entry.iov_base)};
370 auto offset = pMetaInfo->offset_ - totalBytesVisited;
373 auto remainder = entry.iov_len - offset;
378 if(totalBytesVisited < pMetaInfo->offset_)
380 totalBytesVisited = pMetaInfo->offset_;
385 size_t amountToCopy{};
387 if(remainder > bytes - totalBytesCopied)
389 amountToCopy = bytes - totalBytesCopied;
390 remainder -= amountToCopy;
394 amountToCopy = remainder;
399 pMetaInfo->offset_ += amountToCopy;
400 totalBytesVisited += amountToCopy;
401 totalBytesCopied += amountToCopy;
418 pMetaInfo->offset_ - totalBytesCopied,
420 totalBytesVisited != pPacket->
length()};
424 return {component,totalBytesCopied};
430 return std::make_tuple(queue_.size(),currentBytes_);
void initialize(std::uint16_t u16QueueDepth, bool bFragment, bool bAggregate, bool bIsControl)
const PacketInfo & getPacketInfo() const
std::pair< std::unique_ptr< DownstreamPacket >, bool > enqueue(DownstreamPacket &&pkt)
NEMId getDestination() const
iovec make_iovec(void *base, std::size_t len)
std::vector< iovec > VectorIO
struct EtherAddrBytes bytes
std::tuple< size_t, size_t > getStatus() const
Specialized packet the allows downstream processing to add layer specific headers as the packet trave...
std::list< MessageComponent > MessageComponents
Priority getPriority() const
std::tuple< MessageComponents, size_t, std::list< std::unique_ptr< DownstreamPacket > > > dequeue(size_t requestedBytes, NEMId destination, bool bDrop)
Utils::VectorIO getVectorIO() const
Holds a message component that may be all or part of a data or control message.