EMANE  1.2.1
queue.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015 - Adjacent Link LLC, Bridgewater, New Jersey
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * * Redistributions of source code must retain the above copyright
10  * notice, this list of conditions and the following disclaimer.
11  * * Redistributions in binary form must reproduce the above copyright
12  * notice, this list of conditions and the following disclaimer in
13  * the documentation and/or other materials provided with the
14  * distribution.
15  * * Neither the name of Adjacent Link LLC nor the names of its
16  * contributors may be used to endorse or promote products derived
17  * from this software without specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
22  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
23  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
24  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
25  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
26  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30  * POSSIBILITY OF SUCH DAMAGE.
31  */
32 
33 #include "queue.h"
34 
36  u16QueueDepth_{},
37  bFragment_{},
38  u64Counter_{},
39  currentBytes_{},
40  bIsControl_{}{}
41 
42 
43 void EMANE::Models::TDMA::Queue::initialize(std::uint16_t u16QueueDepth,
44  bool bFragment,
45  bool bAggregate,
46  bool bIsControl)
47 {
48  u16QueueDepth_ = u16QueueDepth;
49  bFragment_ = bFragment;
50  bIsControl_ = bIsControl;
51  bAggregate_ = bAggregate;
52 }
53 
54 std::pair<std::unique_ptr<EMANE::DownstreamPacket>,bool>
56 {
57  std::unique_ptr<DownstreamPacket> pDroppedPacket{};
58  bool bDroppedPacket{};
59 
60  DownstreamPacket * pPkt{new DownstreamPacket{std::move(pkt)}};
61 
62  NEMId dest{pPkt->getPacketInfo().getDestination()};
63 
64  if(queue_.size() == u16QueueDepth_)
65  {
66  // first candidate for overflow discard, oldest packet
67  auto entry = queue_.begin();
68 
69  // search the queue for a packet that is not in process of
70  // fragmentation, if all packets are undergoing fragmentation
71  // the oldest packet will be discarded
72  for(auto iter = queue_.begin();
73  iter != queue_.end();
74  ++iter)
75  {
76  // a packet undergoing fragmentation will have a non-zero
77  // fragment index
78  if(!iter->second.second->index_)
79  {
80  entry = iter;
81  break;
82  }
83  }
84 
85  // ownership transfer
86  pDroppedPacket.reset(entry->second.first);
87 
88  // ownership transfer
89  std::unique_ptr<MetaInfo> pDroppedMetaInfo{entry->second.second};
90 
91  bDroppedPacket = true;
92 
93  auto dst = pDroppedPacket->getPacketInfo().getDestination();
94 
95  destQueue_[dst].erase(entry->first);
96 
97  queue_.erase(entry->first);
98 
99  // update current bytes in queue by subtracting off amount
100  // remaining in dropped packet
101  currentBytes_ -= pDroppedPacket->length() - pDroppedMetaInfo->offset_;
102  }
103 
104  MetaInfo * pMetaInfo = new MetaInfo;
105 
106  queue_.insert(std::make_pair(u64Counter_,std::make_pair(pPkt,pMetaInfo)));
107 
108  auto iter = destQueue_.find(dest);
109 
110  if(iter == destQueue_.end())
111  {
112  iter = destQueue_.insert({dest,PacketQueue{}}).first;
113  }
114 
115  currentBytes_ += pPkt->length();
116 
117  iter->second.insert(std::make_pair(u64Counter_,std::make_pair(pPkt,pMetaInfo)));
118 
119  ++u64Counter_;
120 
121  return {std::move(pDroppedPacket),bDroppedPacket};
122 }
123 
125  size_t,
126  std::list<std::unique_ptr<EMANE::DownstreamPacket>>>
127  EMANE::Models::TDMA::Queue::dequeue(size_t requestedBytes, NEMId destination,bool bDrop)
128 {
129  MessageComponents components{};
130  size_t totalBytes{};
131  std::list<std::unique_ptr<DownstreamPacket>> dropped;
132 
133  while(totalBytes <= requestedBytes)
134  {
135  if(destination)
136  {
137  auto iter = destQueue_.find(destination);
138 
139  // check for a destination queue that is not empty
140  if(iter != destQueue_.end() && !iter->second.empty())
141  {
142  auto const & entry = iter->second.begin();
143 
144  auto & pPacket = std::get<0>(entry->second);
145  auto & pMetaInfo = std::get<1>(entry->second);
146 
147  if(pPacket->length() - pMetaInfo->offset_ <= requestedBytes - totalBytes)
148  {
149  if(pMetaInfo->offset_)
150  {
151  auto ret = fragmentPacket(pPacket,
152  pMetaInfo,
153  entry->first,
154  requestedBytes - totalBytes);
155 
156  totalBytes += ret.second;
157 
158  components.push_back(std::move(ret.first));
159  }
160  else
161  {
162  components.push_back({bIsControl_ ?
165  pPacket->getPacketInfo().getDestination(),
166  pPacket->getPacketInfo().getPriority(),
167  pPacket->getVectorIO(),
168  pMetaInfo->index_,
169  pMetaInfo->offset_,
170  entry->first,
171  false});
172 
173 
174  totalBytes += pPacket->length() - pMetaInfo->offset_;
175  }
176 
177  delete pPacket;
178  delete pMetaInfo;
179 
180  // remove from packet queue using queue sequence
181  queue_.erase(entry->first);
182 
183  // remove from destination queue using iterator
184  iter->second.erase(entry);
185 
186  // if aggregation is disabled don't look further
187  if(!bAggregate_)
188  {
189  break;
190  }
191  }
192  else
193  {
194  if(bFragment_)
195  {
196  auto ret = fragmentPacket(pPacket,
197  pMetaInfo,
198  entry->first,
199  requestedBytes - totalBytes);
200 
201  totalBytes += ret.second;
202 
203  components.push_back(std::move(ret.first));
204 
205  break;
206  }
207  else
208  {
209  if(bDrop && components.empty())
210  {
211  // drop packet - too large and fragmentation
212  // is disabled
213  currentBytes_ -= pPacket->length();
214 
215  // transfer ownership to std::unique_ptr
216  dropped.push_back(std::unique_ptr<DownstreamPacket>{pPacket});
217 
218  delete pMetaInfo;
219 
220  // remove from packet queue using queue sequence
221  queue_.erase(entry->first);
222 
223  // remove from destination queue using iterator
224  iter->second.erase(entry);
225  }
226  else
227  {
228  break;
229  }
230  }
231  }
232  }
233  else
234  {
235  break;
236  }
237  }
238  else if(!queue_.empty())
239  {
240  auto const & entry = queue_.begin();
241 
242  auto & pPacket = std::get<0>(entry->second);
243  auto & pMetaInfo = std::get<1>(entry->second);
244 
245  NEMId dst{pPacket->getPacketInfo().getDestination()};
246 
247  if(pPacket->length() - pMetaInfo->offset_ <= requestedBytes - totalBytes)
248  {
249  if(pMetaInfo->offset_)
250  {
251  auto ret = fragmentPacket(pPacket,
252  pMetaInfo,
253  entry->first,
254  requestedBytes - totalBytes);
255 
256  totalBytes += ret.second;
257 
258  components.push_back(std::move(ret.first));
259  }
260  else
261  {
262  components.push_back({bIsControl_ ?
265  dst,
266  pPacket->getPacketInfo().getPriority(),
267  pPacket->getVectorIO(),
268  pMetaInfo->index_,
269  pMetaInfo->offset_,
270  entry->first,
271  false});
272 
273  totalBytes += pPacket->length() - pMetaInfo->offset_;
274  }
275 
276  delete pPacket;
277  delete pMetaInfo;
278 
279  // remove from destination queue using queue sequence
280  destQueue_[dst].erase(entry->first);
281 
282  // remove for packet queue using iterator
283  queue_.erase(entry);
284 
285  // if aggregation is disabled don't look further
286  if(!bAggregate_)
287  {
288  break;
289  }
290  }
291  else
292  {
293  if(bFragment_)
294  {
295  auto ret = fragmentPacket(pPacket,
296  pMetaInfo,
297  entry->first,
298  requestedBytes - totalBytes);
299 
300  totalBytes += ret.second;
301 
302  components.push_back(std::move(ret.first));
303 
304  break;
305  }
306  else
307  {
308  if(bDrop && components.empty())
309  {
310  // drop packet - too large and fragmentation
311  // is disabled
312  currentBytes_ -= pPacket->length();
313 
314  // transfer ownership to std::unique_ptr
315  dropped.push_back(std::unique_ptr<DownstreamPacket>{pPacket});
316 
317  delete pMetaInfo;
318 
319  // remove from destination queue using queue sequence
320  destQueue_[dst].erase(entry->first);
321 
322  // remove for packet queue using iterator
323  queue_.erase(entry);
324  }
325  else
326  {
327  break;
328  }
329  }
330  }
331  }
332  else
333  {
334  break;
335  }
336  }
337 
338  // reduce bytes in queue by the total being returned, dropped bytes
339  // already accounted for
340  currentBytes_ -= totalBytes;
341 
342  return std::make_tuple(components,totalBytes,std::move(dropped));
343 }
344 
345 std::pair<EMANE::Models::TDMA::MessageComponent,size_t>
346 EMANE::Models::TDMA::Queue::fragmentPacket(DownstreamPacket * pPacket,
347  MetaInfo * pMetaInfo,
348  std::uint64_t u64FragmentSequence,
349  size_t bytes)
350 {
351  size_t totalBytesVisited{};
352  size_t totalBytesCopied{};
353  Utils::VectorIO vectorIOs{};
354 
355  // packet data is stored in an iovec, need to determine
356  // which iovec to start with and proceed to advance when necessary
357  for(const auto & entry : pPacket->getVectorIO())
358  {
359  if(totalBytesCopied < bytes)
360  {
361  if(totalBytesVisited + entry.iov_len < pMetaInfo->offset_)
362  {
363  totalBytesVisited += entry.iov_len;
364  }
365  else
366  {
367  char * pBuf{reinterpret_cast<char *>(entry.iov_base)};
368 
369  // where are we in the current entry
370  auto offset = pMetaInfo->offset_ - totalBytesVisited;
371 
372  // how much of this entry is left
373  auto remainder = entry.iov_len - offset;
374 
375  // if necessary adjust totalBytesVisited after
376  // calculating where in the current entry the fragment
377  // begins
378  if(totalBytesVisited < pMetaInfo->offset_)
379  {
380  totalBytesVisited = pMetaInfo->offset_;
381  }
382 
383  // clamp the reaminder if there is more remaining than
384  // what was request
385  size_t amountToCopy{};
386 
387  if(remainder > bytes - totalBytesCopied)
388  {
389  amountToCopy = bytes - totalBytesCopied;
390  remainder -= amountToCopy;
391  }
392  else
393  {
394  amountToCopy = remainder;
395  }
396 
397  vectorIOs.push_back(Utils::make_iovec(pBuf+offset,amountToCopy));
398 
399  pMetaInfo->offset_ += amountToCopy;
400  totalBytesVisited += amountToCopy;
401  totalBytesCopied += amountToCopy;
402  }
403  }
404  else
405  {
406  break;
407  }
408  }
409 
410 
411  MessageComponent component{bIsControl_ ?
414  pPacket->getPacketInfo().getDestination(),
415  pPacket->getPacketInfo().getPriority(),
416  vectorIOs,
417  pMetaInfo->index_,
418  pMetaInfo->offset_ - totalBytesCopied,
419  u64FragmentSequence,
420  totalBytesVisited != pPacket->length()};
421 
422  ++pMetaInfo->index_;
423 
424  return {component,totalBytesCopied};
425 }
426 
427 // packets, bytes
428 std::tuple<size_t,size_t> EMANE::Models::TDMA::Queue::getStatus() const
429 {
430  return std::make_tuple(queue_.size(),currentBytes_);
431 }
void initialize(std::uint16_t u16QueueDepth, bool bFragment, bool bAggregate, bool bIsControl)
Definition: queue.cc:43
const PacketInfo & getPacketInfo() const
std::pair< std::unique_ptr< DownstreamPacket >, bool > enqueue(DownstreamPacket &&pkt)
Definition: queue.cc:55
NEMId getDestination() const
Definition: packetinfo.inl:70
union EtherAddr dst
Definition: netutils.h:390
iovec make_iovec(void *base, std::size_t len)
Definition: vectorio.h:46
std::vector< iovec > VectorIO
Definition: vectorio.h:43
struct EtherAddrBytes bytes
Definition: netutils.h:390
std::tuple< size_t, size_t > getStatus() const
Definition: queue.cc:428
Specialized packet the allows downstream processing to add layer specific headers as the packet trave...
std::list< MessageComponent > MessageComponents
std::uint16_t NEMId
Definition: types.h:52
Priority getPriority() const
Definition: packetinfo.inl:76
std::tuple< MessageComponents, size_t, std::list< std::unique_ptr< DownstreamPacket > > > dequeue(size_t requestedBytes, NEMId destination, bool bDrop)
Definition: queue.cc:127
Utils::VectorIO getVectorIO() const
Holds a message component that may be all or part of a data or control message.