EMANE  1.0.1
receivemanager.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015 - Adjacent Link LLC, Bridgewater, New Jersey
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * * Redistributions of source code must retain the above copyright
10  * notice, this list of conditions and the following disclaimer.
11  * * Redistributions in binary form must reproduce the above copyright
12  * notice, this list of conditions and the following disclaimer in
13  * the documentation and/or other materials provided with the
14  * distribution.
15  * * Neither the name of Adjacent Link LLC nor the names of its
16  * contributors may be used to endorse or promote products derived
17  * from this software without specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
22  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
23  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
24  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
25  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
26  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30  * POSSIBILITY OF SUCH DAMAGE.
31  */
32 
33 #include "receivemanager.h"
35 
37  DownstreamTransport * pDownstreamTransport,
38  LogServiceProvider * pLogService,
39  RadioServiceProvider * pRadioService,
40  Scheduler * pScheduler,
41  PacketStatusPublisher * pPacketStatusPublisher,
42  NeighborMetricManager * pNeighborMetricManager):
43  id_{id},
44  pDownstreamTransport_{pDownstreamTransport},
45  pLogService_{pLogService},
46  pRadioService_{pRadioService},
47  pScheduler_{pScheduler},
48  pPacketStatusPublisher_{pPacketStatusPublisher},
49  pNeighborMetricManager_{pNeighborMetricManager},
50  pendingInfo_{{},{{},{},{},{}},{},{},{},{},{},{}},
51  u64PendingAbsoluteSlotIndex_{},
52  distribution_{0.0, 1.0},
53  bPromiscuousMode_{},
54  fragmentCheckThreshold_{2},
55  fragmentTimeoutThreshold_{5}{}
56 
58 {
59  bPromiscuousMode_ = bEnable;
60 }
61 
62 void EMANE::Models::TDMA::ReceiveManager::loadCurves(const std::string & sPCRFileName)
63 {
64  porManager_.load(sPCRFileName);
65 }
66 
67 void EMANE::Models::TDMA::ReceiveManager::setFragmentCheckThreshold(const std::chrono::seconds & threshold)
68 {
69  fragmentCheckThreshold_ = threshold;
70 }
71 
72 void EMANE::Models::TDMA::ReceiveManager::setFragmentTimeoutThreshold(const std::chrono::seconds & threshold)
73 {
74  fragmentTimeoutThreshold_ = threshold;
75 }
76 
77 
78 bool
80  const PacketInfo & pktInfo,
81  size_t length,
82  const TimePoint & startOfReception,
83  const FrequencySegments & frequencySegments,
84  const Microseconds & span,
85  const TimePoint & beginTime,
86  std::uint64_t u64PacketSequence)
87 {
88  bool bReturn{};
89  std::uint64_t u64AbsoluteSlotIndex{baseModelMessage.getAbsoluteSlotIndex()};
90 
91  if(!u64PendingAbsoluteSlotIndex_)
92  {
93  u64PendingAbsoluteSlotIndex_ = u64AbsoluteSlotIndex;
94 
95  pendingInfo_ = std::make_tuple(std::move(baseModelMessage),
96  pktInfo,
97  length,
98  startOfReception,
99  frequencySegments,
100  span,
101  beginTime,
102  u64PacketSequence);
103  bReturn = true;
104  }
105  else if(u64PendingAbsoluteSlotIndex_ < u64AbsoluteSlotIndex)
106  {
107  process(u64AbsoluteSlotIndex);
108 
109  u64PendingAbsoluteSlotIndex_ = u64AbsoluteSlotIndex;
110 
111  pendingInfo_ = std::make_tuple(std::move(baseModelMessage),
112  pktInfo,
113  length,
114  startOfReception,
115  frequencySegments,
116  span,
117  beginTime,
118  u64PacketSequence);
119  bReturn = true;
120  }
121  else if(u64PendingAbsoluteSlotIndex_ > u64AbsoluteSlotIndex)
122  {
123  LOGGER_VERBOSE_LOGGING(*pLogService_,
124  ERROR_LEVEL,
125  "MACI %03hu TDMA::ReceiveManager enqueue: pending slot: %zu greater than enqueue: %zu",
126  id_,
127  u64PendingAbsoluteSlotIndex_,
128  u64AbsoluteSlotIndex);
129 
130  u64PendingAbsoluteSlotIndex_ = u64AbsoluteSlotIndex;
131 
132  pendingInfo_ = std::make_tuple(std::move(baseModelMessage),
133  pktInfo,
134  length,
135  startOfReception,
136  frequencySegments,
137  span,
138  beginTime,
139  u64PacketSequence);
140  bReturn = true;
141 
142  }
143  else
144  {
145  if(std::get<3>(pendingInfo_) < startOfReception)
146  {
147  pendingInfo_ = std::make_tuple(std::move(baseModelMessage),
148  pktInfo,
149  length,
150  startOfReception,
151  frequencySegments,
152  span,
153  beginTime,
154  u64PacketSequence);
155  }
156  }
157 
158  return bReturn;
159 }
160 
161 void
162 EMANE::Models::TDMA::ReceiveManager::process(std::uint64_t u64AbsoluteSlotIndex)
163 {
164  auto now = Clock::now();
165 
166  if(u64PendingAbsoluteSlotIndex_ + 1 == u64AbsoluteSlotIndex)
167  {
168  u64PendingAbsoluteSlotIndex_ = 0;
169 
170  double dSINR{};
171  double dNoiseFloordB{};
172 
173  BaseModelMessage & baseModelMessage = std::get<0>(pendingInfo_);
174  PacketInfo pktInfo{std::get<1>(pendingInfo_)};
175  size_t length{std::get<2>(pendingInfo_)};
176  TimePoint & startOfReception = std::get<3>(pendingInfo_);
177  FrequencySegments & frequencySegments = std::get<4>(pendingInfo_);
178  Microseconds & span = std::get<5>(pendingInfo_);
179  std::uint64_t u64SequenceNumber{std::get<7>(pendingInfo_)};
180 
181  auto & frequencySegment = *frequencySegments.begin();
182 
183  try
184  {
185  auto window = pRadioService_->spectrumService().request(frequencySegment.getFrequencyHz(),
186  span,
187  startOfReception);
188 
189 
190  bool bSignalInNoise{};
191 
192  std::tie(dNoiseFloordB,bSignalInNoise) =
193  Utils::maxBinNoiseFloor(window,frequencySegment.getRxPowerdBm());
194 
195  dSINR = frequencySegment.getRxPowerdBm() - dNoiseFloordB;
196 
197  LOGGER_VERBOSE_LOGGING(*pLogService_,
198  DEBUG_LEVEL,
199  "MACI %03hu TDMA::ReceiveManager upstream EOR processing:"
200  " src %hu, dst %hu, max noise %lf, signal in noise %s, SINR %lf",
201  id_,
202  pktInfo.getSource(),
203  pktInfo.getDestination(),
204  dNoiseFloordB,
205  bSignalInNoise ? "yes" : "no",
206  dSINR);
207  }
208  catch(SpectrumServiceException & exp)
209  {
210  pPacketStatusPublisher_->inbound(pktInfo.getSource(),
211  baseModelMessage.getMessages(),
213 
214 
215  LOGGER_VERBOSE_LOGGING(*pLogService_,
216  ERROR_LEVEL,
217  "MACI %03hu TDMA::ReceiveManager upstream EOR processing: src %hu,"
218  " dst %hu, sor %ju, span %ju spectrum service request error: %s",
219  id_,
220  pktInfo.getSource(),
221  pktInfo.getDestination(),
222  std::chrono::duration_cast<Microseconds>(startOfReception.time_since_epoch()).count(),
223  span.count(),
224  exp.what());
225 
226  return;
227  }
228 
229 
230  // check sinr
231  float fPOR = porManager_.getPOR(baseModelMessage.getDataRate(),dSINR,length);
232 
233  LOGGER_VERBOSE_LOGGING(*pLogService_,
234  DEBUG_LEVEL,
235  "MACI %03hu TDMA::ReceiveManager upstream EOR processing: src %hu,"
236  " dst %hu, datarate: %ju sinr: %lf length: %lu, por: %f",
237  id_,
238  pktInfo.getSource(),
239  pktInfo.getDestination(),
240  baseModelMessage.getDataRate(),
241  dSINR,
242  length,
243  fPOR);
244 
245  // get random value [0.0, 1.0]
246  float fRandom{distribution_()};
247 
248  if(fPOR < fRandom)
249  {
250  pPacketStatusPublisher_->inbound(pktInfo.getSource(),
251  baseModelMessage.getMessages(),
253 
254  LOGGER_VERBOSE_LOGGING(*pLogService_,
255  DEBUG_LEVEL,
256  "MACI %03hu TDMA::ReceiveManager upstream EOR processing: src %hu, dst %hu, "
257  "rxpwr %3.2f dBm, drop",
258  id_,
259  pktInfo.getSource(),
260  pktInfo.getDestination(),
261  frequencySegment.getRxPowerdBm());
262 
263  return;
264  }
265 
266 
267  // update neighbor metrics
268  pNeighborMetricManager_->updateNeighborRxMetric(pktInfo.getSource(), // nbr (src)
269  u64SequenceNumber, // sequence number
270  pktInfo.getUUID(),
271  dSINR, // sinr in dBm
272  dNoiseFloordB, // noise floor in dB
273  startOfReception, // rx time
274  frequencySegment.getDuration(), // duration
275  baseModelMessage.getDataRate()); // data rate bps
276 
277  for(const auto & message : baseModelMessage.getMessages())
278  {
279  NEMId dst{message.getDestination()};
280  Priority priority{message.getPriority()};
281 
282  if(bPromiscuousMode_ ||
283  (dst == id_) ||
285  {
286  const auto & data = message.getData();
287 
288  if(message.isFragment())
289  {
290  LOGGER_VERBOSE_LOGGING(*pLogService_,
291  DEBUG_LEVEL,
292  "MACI %03hu TDMA::ReceiveManager upstream EOR processing:"
293  " src %hu, dst %hu, findex: %zu foffset: %zu fbytes: %zu"
294  " fmore: %s",
295  id_,
296  pktInfo.getSource(),
297  pktInfo.getDestination(),
298  message.getFragmentIndex(),
299  message.getFragmentOffset(),
300  data.size(),
301  message.isMoreFragments() ? "yes" : "no");
302 
303 
304  auto key = std::make_tuple(pktInfo.getSource(),
305  priority,
306  message.getFragmentSequence());
307 
308  auto iter = fragmentStore_.find(key);
309 
310  if(iter != fragmentStore_.end())
311  {
312  auto & indexSet = std::get<0>(iter->second);
313  auto & parts = std::get<1>(iter->second);
314  auto & lastFragmentTime = std::get<2>(iter->second);
315 
316  if(indexSet.insert(message.getFragmentIndex()).second)
317  {
318  parts.insert(std::make_pair(message.getFragmentOffset(),message.getData()));
319 
320  lastFragmentTime = now;
321 
322  // check that all previous fragments have been received
323  if(indexSet.size() == message.getFragmentIndex() + 1)
324  {
325  if(!message.isMoreFragments())
326  {
327  Utils::VectorIO vectorIO{};
328 
329  for(const auto & part : parts)
330  {
331  vectorIO.push_back(Utils::make_iovec(const_cast<std::uint8_t *>(&part.second[0]),
332  part.second.size()));
333  }
334 
335  UpstreamPacket pkt{{pktInfo.getSource(),
336  dst,
337  priority,
338  pktInfo.getCreationTime(),
339  pktInfo.getUUID()},vectorIO};
340 
341 
342  pPacketStatusPublisher_->inbound(pktInfo.getSource(),
343  dst,
344  priority,
345  pkt.length(),
347 
348 
349  PacketMetaInfo packetMetaInfo{pktInfo.getSource(),
350  u64AbsoluteSlotIndex-1,
351  frequencySegment.getRxPowerdBm(),
352  dSINR,
353  baseModelMessage.getDataRate()};
354 
355  if(message.getType() == MessageComponent::Type::DATA)
356  {
357  pDownstreamTransport_->sendUpstreamPacket(pkt);
358 
359  pScheduler_->processPacketMetaInfo(packetMetaInfo);
360  }
361  else
362  {
363  pScheduler_->processSchedulerPacket(pkt,packetMetaInfo);
364  }
365 
366 
367  fragmentStore_.erase(iter);
368  }
369  }
370  else
371  {
372  // missing a fragment - record all bytes received and discontinue assembly
373  size_t totalBytes{message.getData().size()};
374 
375  for(const auto & part : parts)
376  {
377  totalBytes += part.second.size();
378  }
379 
380  pPacketStatusPublisher_->inbound(pktInfo.getSource(),
381  dst,
382  priority,
383  totalBytes,
385 
386  // fragment was not received, abandon reassembly
387  fragmentStore_.erase(iter);
388  }
389  }
390  }
391  else
392  {
393  // if the first fragment receieved is not index 0, fragments
394  // were lost, so don't bother trying to reassemble
395  if(!message.getFragmentIndex())
396  {
397  fragmentStore_.insert(std::make_pair(key,
398  std::make_tuple(std::set<size_t>{message.getFragmentIndex()},
399  FragmentParts{{message.getFragmentOffset(),
400  message.getData()}},
401  now,
402  dst,
403  priority)));
404  }
405  else
406  {
407  pPacketStatusPublisher_->inbound(pktInfo.getSource(),
408  message,
410 
411  }
412  }
413  }
414  else
415  {
416  LOGGER_VERBOSE_LOGGING(*pLogService_,
417  DEBUG_LEVEL,
418  "MACI %03hu TDMA::ReceiveManager upstream EOR processing:"
419  " src %hu, dst %hu, forward upstream",
420  id_,
421  pktInfo.getSource(),
422  pktInfo.getDestination());
423 
424 
425  auto data = message.getData();
426 
427  UpstreamPacket pkt{{pktInfo.getSource(),
428  dst,
429  priority,
430  pktInfo.getCreationTime(),
431  pktInfo.getUUID()},&data[0],data.size()};
432 
433 
434  pPacketStatusPublisher_->inbound(pktInfo.getSource(),
435  message,
437 
438  PacketMetaInfo packetMetaInfo{pktInfo.getSource(),
439  u64AbsoluteSlotIndex-1,
440  frequencySegment.getRxPowerdBm(),
441  dSINR,
442  baseModelMessage.getDataRate()};
443 
444  if(message.getType() == MessageComponent::Type::DATA)
445  {
446  pDownstreamTransport_->sendUpstreamPacket(pkt);
447 
448  pScheduler_->processPacketMetaInfo(packetMetaInfo);
449  }
450  else
451  {
452  pScheduler_->processSchedulerPacket(pkt,packetMetaInfo);
453  }
454  }
455  }
456  else
457  {
458  pPacketStatusPublisher_->inbound(pktInfo.getSource(),
459  message,
461  }
462  }
463  }
464 
465  // check to see if there are fragment assemblies to abandon
466  if(lastFragmentCheckTime_ + fragmentCheckThreshold_ <= now)
467  {
468  for(auto iter = fragmentStore_.begin(); iter != fragmentStore_.end();)
469  {
470  auto & parts = std::get<1>(iter->second);
471  auto & lastFragmentTime = std::get<2>(iter->second);
472  auto & dst = std::get<3>(iter->second);
473  auto & priority = std::get<4>(iter->second);
474 
475  if(lastFragmentTime + fragmentTimeoutThreshold_ <= now)
476  {
477  size_t totalBytes{};
478 
479  for(const auto & part : parts)
480  {
481  totalBytes += part.second.size();
482  }
483 
484  pPacketStatusPublisher_->inbound(std::get<0>(iter->first),
485  dst,
486  priority,
487  totalBytes,
489 
490  fragmentStore_.erase(iter++);
491  }
492  else
493  {
494  ++iter;
495  }
496  }
497  }
498 }
void load(const std::string &sPCRFileName)
Definition: pormanager.cc:127
A Packet class that allows upstream processing to strip layer headers as the packet travels up the st...
void updateNeighborRxMetric(NEMId src, std::uint64_t u64SeqNum, const uuid_t &uuid, const TimePoint &rxTime)
void process(std::uint64_t u64AbsoluteSlotIndex)
#define LOGGER_VERBOSE_LOGGING(logger, level, fmt, args...)
Message class used to serialize and deserialize TDMA radio model messages.
constexpr NEMId NEM_BROADCAST_MAC_ADDRESS
Definition: types.h:69
union EtherAddr dst
Definition: netutils.h:390
SpectrumServiceException is thrown when an exception occurs during spectrum service processing...
iovec make_iovec(void *base, std::size_t len)
Definition: vectorio.h:46
std::uint8_t Priority
Definition: types.h:64
std::vector< iovec > VectorIO
Definition: vectorio.h:43
Store source, destination, creation time and priority information for a packet.
Definition: packetinfo.h:50
Log service provider interface.
const char * what() const
Definition: exception.h:62
float getPOR(std::uint64_t u64DataRate, float fSINR, size_t packetLengthBytes)
Definition: pormanager.cc:280
void setFragmentTimeoutThreshold(const std::chrono::seconds &threshold)
ReceiveManager(NEMId id, DownstreamTransport *pDownstreamTransport, LogServiceProvider *pLogService, RadioServiceProvider *pRadioService, Scheduler *pScheduler, PacketStatusPublisher *pPacketStatusPublisher, NeighborMetricManager *pNeighborMetricManager)
const MessageComponents & getMessages() const
std::chrono::microseconds Microseconds
Definition: types.h:45
void loadCurves(const std::string &sPCRFileName)
std::uint16_t NEMId
Definition: types.h:52
Received over-the-air message information.
The RadioServiceProvider interface provides access to radio (RF) model specific services.
std::pair< double, bool > maxBinNoiseFloor(const SpectrumWindow &window, double dRxPowerdBm, const TimePoint &startTime=TimePoint::min())
Scheduler interface used by BaseModel to communicate with a scheduler module.
Definition: scheduler.h:56
void setFragmentCheckThreshold(const std::chrono::seconds &threshold)
std::list< FrequencySegment > FrequencySegments
virtual void processPacketMetaInfo(const PacketMetaInfo &packetMetaInfo)=0
Packet status interface used to publish statistics and tables showing accepted and rejected byte coun...
Clock::time_point TimePoint
Definition: types.h:50
void sendUpstreamPacket(UpstreamPacket &pkt, const ControlMessages &msgs=empty)
virtual void processSchedulerPacket(UpstreamPacket &pkt, const PacketMetaInfo &packetMetaInfo)=0
DownstreamTransport allows for processing downstream data and control messages.
virtual SpectrumServiceProvider & spectrumService()=0
virtual SpectrumWindow request(std::uint64_t u64FrequencyHz, const Microseconds &duration=Microseconds::zero(), const TimePoint &startTime=TimePoint::min()) const =0
bool enqueue(BaseModelMessage &&baseModelMessage, const PacketInfo &pktInfo, size_t length, const TimePoint &startOfReception, const FrequencySegments &frequencySegments, const Microseconds &span, const TimePoint &beginTime, std::uint64_t u64PacketSequence)
Manages neighbor metrics and sends neighbor metric control message upstream.
virtual void inbound(NEMId src, const MessageComponent &component, InboundAction action)=0