EMANE  1.2.1
receivemanager.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015,2017-2018 - Adjacent Link LLC, Bridgewater,
3  * New Jersey
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  *
10  * * Redistributions of source code must retain the above copyright
11  * notice, this list of conditions and the following disclaimer.
12  * * Redistributions in binary form must reproduce the above copyright
13  * notice, this list of conditions and the following disclaimer in
14  * the documentation and/or other materials provided with the
15  * distribution.
16  * * Neither the name of Adjacent Link LLC nor the names of its
17  * contributors may be used to endorse or promote products derived
18  * from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31  * POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "receivemanager.h"
36 
// ReceiveManager constructor: remaining parameters and member-initializer list.
// NOTE(review): the declarator line that opens this definition (and declares
// the `id` parameter used below) is absent from this listing.
//
// Each pointer argument is stored as-is in the matching member; nothing here
// takes ownership or checks for null.
38  DownstreamTransport * pDownstreamTransport,
39  LogServiceProvider * pLogService,
40  RadioServiceProvider * pRadioService,
41  Scheduler * pScheduler,
42  PacketStatusPublisher * pPacketStatusPublisher,
43  NeighborMetricManager * pNeighborMetricManager):
44  id_{id},
45  pDownstreamTransport_{pDownstreamTransport},
46  pLogService_{pLogService},
47  pRadioService_{pRadioService},
48  pScheduler_{pScheduler},
49  pPacketStatusPublisher_{pPacketStatusPublisher},
50  pNeighborMetricManager_{pNeighborMetricManager},
// pendingInfo_ is the 8-element tuple that enqueue() fills; every element is
// value-initialized here.
51  pendingInfo_{{},{{},{},{},{}},{},{},{},{},{},{}},
// A pending slot index of 0 is what enqueue() tests for to mean "nothing pending".
52  u64PendingAbsoluteSlotIndex_{},
// Arguments for the random source drawn from in process(); the accompanying
// comment there describes the draw as a value in [0.0, 1.0].
53  distribution_{0.0, 1.0},
54  bPromiscuousMode_{},
// Default thresholds. The setters assign std::chrono::seconds to these members,
// so these are presumably 2 s and 5 s -- confirm against the member declarations.
55  fragmentCheckThreshold_{2},
56  fragmentTimeoutThreshold_{5}{}
57 
// Body of the promiscuous-mode setter: stores bEnable in bPromiscuousMode_.
// When the flag is set, process() accepts message components regardless of
// their destination.
// NOTE(review): the declarator line for this function is absent from this listing.
59 {
60  bPromiscuousMode_ = bEnable;
61 }
62 
// Forwards sPCRFileName to porManager_.load(). The loaded data is what
// porManager_.getPOR() is later queried against in process().
// NOTE(review): any exception raised by load() propagates to the caller; the
// file format is not visible from here.
63 void EMANE::Models::TDMA::ReceiveManager::loadCurves(const std::string & sPCRFileName)
64 {
65  porManager_.load(sPCRFileName);
66 }
67 
// Sets the minimum time between sweeps of the fragment store: process() only
// scans for stale fragment assemblies once lastFragmentCheckTime_ plus this
// threshold has been reached.
68 void EMANE::Models::TDMA::ReceiveManager::setFragmentCheckThreshold(const std::chrono::seconds & threshold)
69 {
70  fragmentCheckThreshold_ = threshold;
71 }
72 
// Sets how long a partial fragment assembly may go without receiving a new
// fragment before the sweep in process() reports it and erases it.
73 void EMANE::Models::TDMA::ReceiveManager::setFragmentTimeoutThreshold(const std::chrono::seconds & threshold)
74 {
75  fragmentTimeoutThreshold_ = threshold;
76 }
77 
78 
// Records a received message as the single pending reception for its slot.
//
// Only one reception is held at a time (pendingInfo_), keyed by
// u64PendingAbsoluteSlotIndex_. The incoming message's absolute slot index is
// compared with the pending one:
//   * nothing pending (index 0): store it, return true;
//   * incoming slot is later: call process() for the pending entry, then
//     store the new one, return true;
//   * incoming slot is earlier: log an error, store the new one, return true;
//   * same slot: keep whichever has the later start of reception, return false.
//
// Returns true when the pending slot index was (re)assigned, false when the
// message was for the slot that is already pending.
//
// NOTE(review): the declarator line naming this function and its first
// parameter (baseModelMessage, which is moved from below) is absent from this
// listing.
79 bool
81  const PacketInfo & pktInfo,
82  size_t length,
83  const TimePoint & startOfReception,
84  const FrequencySegments & frequencySegments,
85  const Microseconds & span,
86  const TimePoint & beginTime,
87  std::uint64_t u64PacketSequence)
88 {
89  bool bReturn{};
 // Read the slot index before baseModelMessage is moved into the tuple.
90  std::uint64_t u64AbsoluteSlotIndex{baseModelMessage.getAbsoluteSlotIndex()};
91 
 // A pending index of 0 means no reception is currently held.
92  if(!u64PendingAbsoluteSlotIndex_)
93  {
94  u64PendingAbsoluteSlotIndex_ = u64AbsoluteSlotIndex;
95 
96  pendingInfo_ = std::make_tuple(std::move(baseModelMessage),
97  pktInfo,
98  length,
99  startOfReception,
100  frequencySegments,
101  span,
102  beginTime,
103  u64PacketSequence);
104  bReturn = true;
105  }
106  else if(u64PendingAbsoluteSlotIndex_ < u64AbsoluteSlotIndex)
107  {
 // process() only handles the pending entry when the new slot is exactly
 // pending + 1; for a larger gap it does nothing with the entry.
 // NOTE(review): in that larger-gap case the pending entry is overwritten
 // below without being processed or reported -- confirm this is intended.
108  process(u64AbsoluteSlotIndex);
109 
110  u64PendingAbsoluteSlotIndex_ = u64AbsoluteSlotIndex;
111 
112  pendingInfo_ = std::make_tuple(std::move(baseModelMessage),
113  pktInfo,
114  length,
115  startOfReception,
116  frequencySegments,
117  span,
118  beginTime,
119  u64PacketSequence);
120  bReturn = true;
121  }
122  else if(u64PendingAbsoluteSlotIndex_ > u64AbsoluteSlotIndex)
123  {
 // Incoming slot is older than the pending one: log it, then replace the
 // pending entry anyway.
 // NOTE(review): both values printed with %zu are std::uint64_t; %zu is the
 // size_t conversion, which only matches where size_t is 64 bits wide.
124  LOGGER_VERBOSE_LOGGING(*pLogService_,
125  ERROR_LEVEL,
126  "MACI %03hu TDMA::ReceiveManager enqueue: pending slot: %zu greater than enqueue: %zu",
127  id_,
128  u64PendingAbsoluteSlotIndex_,
129  u64AbsoluteSlotIndex);
130 
131  u64PendingAbsoluteSlotIndex_ = u64AbsoluteSlotIndex;
132 
133  pendingInfo_ = std::make_tuple(std::move(baseModelMessage),
134  pktInfo,
135  length,
136  startOfReception,
137  frequencySegments,
138  span,
139  beginTime,
140  u64PacketSequence);
141  bReturn = true;
142 
143  }
144  else
145  {
 // Same slot as the pending entry: tuple element 3 is the stored start of
 // reception; replace the entry only if the new reception started later.
 // bReturn stays false on this path whether or not the entry is replaced.
146  if(std::get<3>(pendingInfo_) < startOfReception)
147  {
148  pendingInfo_ = std::make_tuple(std::move(baseModelMessage),
149  pktInfo,
150  length,
151  startOfReception,
152  frequencySegments,
153  span,
154  beginTime,
155  u64PacketSequence);
156  }
157  }
158 
159  return bReturn;
160 }
161 
// Handles the reception stored by enqueue() and sweeps stale fragment
// assemblies.
//
// u64AbsoluteSlotIndex: the pending reception is handled only when this is
// exactly one greater than u64PendingAbsoluteSlotIndex_; otherwise the first
// part of the function is skipped.
//
// For a handled reception the steps are: query the spectrum service for the
// noise floor and derive SINR; look up the probability of reception and
// compare it against a random draw; update neighbor metrics; then walk the
// message components, reassembling fragments or forwarding whole components
// either upstream or to the scheduler.
//
// Independently of that, once fragmentCheckThreshold_ has elapsed since the
// last sweep, fragment assemblies idle for fragmentTimeoutThreshold_ are
// reported to the packet status publisher and erased.
//
// NOTE(review): several pPacketStatusPublisher_->inbound(...) calls and one
// condition below are missing a line in this listing (the listing numbers
// skip); those spots are left untouched.
// NOTE(review): the two early `return`s (spectrum error, reception-probability
// drop) leave the function before the stale-fragment sweep at the bottom.
162 void
163 EMANE::Models::TDMA::ReceiveManager::process(std::uint64_t u64AbsoluteSlotIndex)
164 {
165  auto now = Clock::now();
166 
167  if(u64PendingAbsoluteSlotIndex_ + 1 == u64AbsoluteSlotIndex)
168  {
 // Mark the pending slot as consumed; enqueue() treats 0 as "nothing pending".
169  u64PendingAbsoluteSlotIndex_ = 0;
170 
171  double dSINR{};
172  double dNoiseFloordB{};
173 
 // Unpack the tuple stored by enqueue(). Element 6 (beginTime) is not used
 // here. The reference bindings alias pendingInfo_ itself.
174  BaseModelMessage & baseModelMessage = std::get<0>(pendingInfo_);
175  PacketInfo pktInfo{std::get<1>(pendingInfo_)};
176  size_t length{std::get<2>(pendingInfo_)};
177  TimePoint & startOfReception = std::get<3>(pendingInfo_);
178  FrequencySegments & frequencySegments = std::get<4>(pendingInfo_);
179  Microseconds & span = std::get<5>(pendingInfo_);
180  std::uint64_t u64SequenceNumber{std::get<7>(pendingInfo_)};
181 
 // Only the first frequency segment is consulted.
 // NOTE(review): dereferences begin() without checking for an empty list --
 // presumably callers guarantee at least one segment; confirm.
182  auto & frequencySegment = *frequencySegments.begin();
183 
184  try
185  {
186  auto window = pRadioService_->spectrumService().request(frequencySegment.getFrequencyHz(),
187  span,
188  startOfReception);
189 
190 
191  bool bSignalInNoise{};
192 
193  std::tie(dNoiseFloordB,bSignalInNoise) =
194  Utils::maxBinNoiseFloor(window,frequencySegment.getRxPowerdBm());
195 
 // SINR is the segment's receive power minus the reported noise floor.
196  dSINR = frequencySegment.getRxPowerdBm() - dNoiseFloordB;
197 
198  LOGGER_VERBOSE_LOGGING(*pLogService_,
199  DEBUG_LEVEL,
200  "MACI %03hu TDMA::ReceiveManager upstream EOR processing:"
201  " src %hu, dst %hu, max noise %lf, signal in noise %s, SINR %lf",
202  id_,
203  pktInfo.getSource(),
204  pktInfo.getDestination(),
205  dNoiseFloordB,
206  bSignalInNoise ? "yes" : "no",
207  dSINR);
208  }
209  catch(SpectrumServiceException & exp)
210  {
 // Spectrum query failed: report all message components to the packet
 // status publisher, log the error and give up on this reception.
 // NOTE(review): the final argument line of the inbound() call is absent
 // from this listing.
211  pPacketStatusPublisher_->inbound(pktInfo.getSource(),
212  baseModelMessage.getMessages(),
214 
215 
216  LOGGER_VERBOSE_LOGGING(*pLogService_,
217  ERROR_LEVEL,
218  "MACI %03hu TDMA::ReceiveManager upstream EOR processing: src %hu,"
219  " dst %hu, sor %ju, span %ju spectrum service request error: %s",
220  id_,
221  pktInfo.getSource(),
222  pktInfo.getDestination(),
223  std::chrono::duration_cast<Microseconds>(startOfReception.time_since_epoch()).count(),
224  span.count(),
225  exp.what());
226 
227  return;
228  }
229 
230 
231  // check sinr
232  float fPOR = porManager_.getPOR(baseModelMessage.getDataRate(),dSINR,length);
233 
234  LOGGER_VERBOSE_LOGGING(*pLogService_,
235  DEBUG_LEVEL,
236  "MACI %03hu TDMA::ReceiveManager upstream EOR processing: src %hu,"
237  " dst %hu, datarate: %ju sinr: %lf length: %lu, por: %f",
238  id_,
239  pktInfo.getSource(),
240  pktInfo.getDestination(),
241  baseModelMessage.getDataRate(),
242  dSINR,
243  length,
244  fPOR);
245 
246  // get random value [0.0, 1.0]
247  float fRandom{distribution_()};
248 
 // The reception is dropped when the looked-up probability is below the draw.
 // NOTE(review): the final argument line of the inbound() call is absent
 // from this listing.
249  if(fPOR < fRandom)
250  {
251  pPacketStatusPublisher_->inbound(pktInfo.getSource(),
252  baseModelMessage.getMessages(),
254 
255  LOGGER_VERBOSE_LOGGING(*pLogService_,
256  DEBUG_LEVEL,
257  "MACI %03hu TDMA::ReceiveManager upstream EOR processing: src %hu, dst %hu, "
258  "rxpwr %3.2f dBm, drop",
259  id_,
260  pktInfo.getSource(),
261  pktInfo.getDestination(),
262  frequencySegment.getRxPowerdBm());
263 
264  return;
265  }
266 
267 
268  // update neighbor metrics
269  pNeighborMetricManager_->updateNeighborRxMetric(pktInfo.getSource(), // nbr (src)
270  u64SequenceNumber, // sequence number
271  pktInfo.getUUID(),
272  dSINR, // sinr in dBm
273  dNoiseFloordB, // noise floor in dB
274  startOfReception, // rx time
275  frequencySegment.getDuration(), // duration
276  baseModelMessage.getDataRate()); // data rate bps
277 
 // Each message component carries its own destination and priority.
278  for(const auto & message : baseModelMessage.getMessages())
279  {
280  NEMId dst{message.getDestination()};
281  Priority priority{message.getPriority()};
282 
 // Accept the component in promiscuous mode or when addressed to this NEM.
 // NOTE(review): the last line of this condition is absent from this listing.
283  if(bPromiscuousMode_ ||
284  (dst == id_) ||
286  {
287  const auto & data = message.getData();
288 
289  if(message.isFragment())
290  {
291  LOGGER_VERBOSE_LOGGING(*pLogService_,
292  DEBUG_LEVEL,
293  "MACI %03hu TDMA::ReceiveManager upstream EOR processing:"
294  " src %hu, dst %hu, findex: %zu foffset: %zu fbytes: %zu"
295  " fmore: %s",
296  id_,
297  pktInfo.getSource(),
298  pktInfo.getDestination(),
299  message.getFragmentIndex(),
300  message.getFragmentOffset(),
301  data.size(),
302  message.isMoreFragments() ? "yes" : "no");
303 
304 
 // Fragment assemblies are keyed by (source, priority, fragment sequence).
305  auto key = std::make_tuple(pktInfo.getSource(),
306  priority,
307  message.getFragmentSequence());
308 
309  auto iter = fragmentStore_.find(key);
310 
311  if(iter != fragmentStore_.end())
312  {
 // Stored assembly layout: 0 = set of fragment indexes seen,
 // 1 = fragment data keyed by offset, 2 = time of last new fragment,
 // 3 = destination, 4 = priority, 5 = total fragment count (0 until
 // the final fragment has been seen).
313  auto & indexSet = std::get<0>(iter->second);
314  auto & parts = std::get<1>(iter->second);
315  auto & lastFragmentTime = std::get<2>(iter->second);
316  auto & totalNumFragments = std::get<5>(iter->second);
317 
 // insert().second is false for an index already seen, so duplicate
 // fragments are ignored.
318  if(indexSet.insert(message.getFragmentIndex()).second)
319  {
320  parts.insert(std::make_pair(message.getFragmentOffset(),message.getData()));
321 
322  lastFragmentTime = now;
323 
324  // this is a new fragment. If the more
325  // fragments bit is not set then this is the
326  // last fragment piece so set the
327  // totalNumFragments appropriately.
328  if(!message.isMoreFragments())
329  {
330  totalNumFragments = message.getFragmentIndex() + 1;
331  }
332 
333  // check to see if all fragments have been received
334  if(totalNumFragments && indexSet.size() == totalNumFragments)
335  {
 // Build a scatter/gather view over the stored fragment buffers in the
 // container's iteration order (presumably ascending offset -- confirm
 // FragmentParts is an ordered map).
336  Utils::VectorIO vectorIO{};
337 
338  for(const auto & part : parts)
339  {
340  vectorIO.push_back(Utils::make_iovec(const_cast<std::uint8_t *>(&part.second[0]),
341  part.second.size()));
342  }
343 
344  UpstreamPacket pkt{{pktInfo.getSource(),
345  dst,
346  priority,
347  pktInfo.getCreationTime(),
348  pktInfo.getUUID()},vectorIO};
349 
350 
 // NOTE(review): the final argument line of this inbound() call is absent
 // from this listing.
351  pPacketStatusPublisher_->inbound(pktInfo.getSource(),
352  dst,
353  priority,
354  pkt.length(),
356 
357 
 // The reception belongs to the slot before the one passed in, hence
 // u64AbsoluteSlotIndex-1.
358  PacketMetaInfo packetMetaInfo{pktInfo.getSource(),
359  u64AbsoluteSlotIndex-1,
360  frequencySegment.getRxPowerdBm(),
361  dSINR,
362  baseModelMessage.getDataRate()};
363 
 // DATA components go upstream and the scheduler only sees the meta
 // info; any other type is handed to the scheduler as a packet.
364  if(message.getType() == MessageComponent::Type::DATA)
365  {
366  pDownstreamTransport_->sendUpstreamPacket(pkt);
367 
368  pScheduler_->processPacketMetaInfo(packetMetaInfo);
369  }
370  else
371  {
372  pScheduler_->processSchedulerPacket(pkt,packetMetaInfo);
373  }
374 
375 
 // Assembly complete; the references bound from iter above are not used
 // after this point.
376  fragmentStore_.erase(iter);
377  }
378  }
379  }
380  else
381  {
382  // this is the first fragment for this
383  // message. Just need to set the total number of
384  // fragments if this happens to also be the last
385  // fragment in the set.
386  fragmentStore_.insert(std::make_pair(key,
387  std::make_tuple(std::set<size_t>{message.getFragmentIndex()},
388  FragmentParts{{message.getFragmentOffset(),
389  message.getData()}},
390  now,
391  dst,
392  priority,
393  message.isMoreFragments() ? 0 : message.getFragmentIndex() + 1)));
394  }
395  }
396  else
397  {
 // Unfragmented component: forward it as a complete packet.
398  LOGGER_VERBOSE_LOGGING(*pLogService_,
399  DEBUG_LEVEL,
400  "MACI %03hu TDMA::ReceiveManager upstream EOR processing:"
401  " src %hu, dst %hu, forward upstream",
402  id_,
403  pktInfo.getSource(),
404  pktInfo.getDestination());
405 
406 
 // `auto` (no reference) makes a local copy here, shadowing the outer
 // `data` reference.
407  auto data = message.getData();
408 
409  UpstreamPacket pkt{{pktInfo.getSource(),
410  dst,
411  priority,
412  pktInfo.getCreationTime(),
413  pktInfo.getUUID()},&data[0],data.size()};
414 
415 
 // NOTE(review): the final argument line of this inbound() call is absent
 // from this listing.
416  pPacketStatusPublisher_->inbound(pktInfo.getSource(),
417  message,
419 
420  PacketMetaInfo packetMetaInfo{pktInfo.getSource(),
421  u64AbsoluteSlotIndex-1,
422  frequencySegment.getRxPowerdBm(),
423  dSINR,
424  baseModelMessage.getDataRate()};
425 
426  if(message.getType() == MessageComponent::Type::DATA)
427  {
428  pDownstreamTransport_->sendUpstreamPacket(pkt);
429 
430  pScheduler_->processPacketMetaInfo(packetMetaInfo);
431  }
432  else
433  {
434  pScheduler_->processSchedulerPacket(pkt,packetMetaInfo);
435  }
436  }
437  }
438  else
439  {
 // Component not accepted by the condition above: only report it.
 // NOTE(review): the final argument line of this inbound() call is absent
 // from this listing.
440  pPacketStatusPublisher_->inbound(pktInfo.getSource(),
441  message,
443  }
444  }
445  }
446 
447  // check to see if there are fragment assemblies to abandon
448  if(lastFragmentCheckTime_ + fragmentCheckThreshold_ <= now)
449  {
 // The iterator is advanced inside the loop body because entries may be erased.
450  for(auto iter = fragmentStore_.begin(); iter != fragmentStore_.end();)
451  {
452  auto & parts = std::get<1>(iter->second);
453  auto & lastFragmentTime = std::get<2>(iter->second);
454  auto & dst = std::get<3>(iter->second);
455  auto & priority = std::get<4>(iter->second);
456 
457  if(lastFragmentTime + fragmentTimeoutThreshold_ <= now)
458  {
 // Report the bytes collected so far for the abandoned assembly.
459  size_t totalBytes{};
460 
461  for(const auto & part : parts)
462  {
463  totalBytes += part.second.size();
464  }
465 
 // std::get<0>(iter->first) is the source NEM from the assembly key.
 // NOTE(review): the final argument line of this inbound() call is absent
 // from this listing.
466  pPacketStatusPublisher_->inbound(std::get<0>(iter->first),
467  dst,
468  priority,
469  totalBytes,
471 
 // Post-increment hands erase() the current position while iter already
 // points at the next entry.
472  fragmentStore_.erase(iter++);
473  }
474  else
475  {
476  ++iter;
477  }
478  }
479 
480  lastFragmentCheckTime_ = now;
481  }
482 }
void load(const std::string &sPCRFileName)
Definition: pormanager.cc:127
A Packet class that allows upstream processing to strip layer headers as the packet travels up the st...
void updateNeighborRxMetric(NEMId src, std::uint64_t u64SeqNum, const uuid_t &uuid, const TimePoint &rxTime)
void process(std::uint64_t u64AbsoluteSlotIndex)
#define LOGGER_VERBOSE_LOGGING(logger, level, fmt, args...)
Message class used to serialize and deserialize TDMA radio model messages.
constexpr NEMId NEM_BROADCAST_MAC_ADDRESS
Definition: types.h:69
union EtherAddr dst
Definition: netutils.h:390
SpectrumServiceException is thrown when an exception occurs during spectrum service processing...
iovec make_iovec(void *base, std::size_t len)
Definition: vectorio.h:46
std::uint8_t Priority
Definition: types.h:64
std::vector< iovec > VectorIO
Definition: vectorio.h:43
Store source, destination, creation time and priority information for a packet.
Definition: packetinfo.h:50
Log service provider interface.
const char * what() const
Definition: exception.h:62
float getPOR(std::uint64_t u64DataRate, float fSINR, size_t packetLengthBytes)
Definition: pormanager.cc:280
void setFragmentTimeoutThreshold(const std::chrono::seconds &threshold)
ReceiveManager(NEMId id, DownstreamTransport *pDownstreamTransport, LogServiceProvider *pLogService, RadioServiceProvider *pRadioService, Scheduler *pScheduler, PacketStatusPublisher *pPacketStatusPublisher, NeighborMetricManager *pNeighborMetricManager)
const MessageComponents & getMessages() const
std::chrono::microseconds Microseconds
Definition: types.h:45
void loadCurves(const std::string &sPCRFileName)
std::uint16_t NEMId
Definition: types.h:52
Received over-the-air message information.
The RadioServiceProvider interface provides access to radio (RF) model specific services.
std::pair< double, bool > maxBinNoiseFloor(const SpectrumWindow &window, double dRxPowerdBm, const TimePoint &startTime=TimePoint::min())
Scheduler interface used by BaseModel to communicate with a scheduler module.
Definition: scheduler.h:56
void setFragmentCheckThreshold(const std::chrono::seconds &threshold)
std::list< FrequencySegment > FrequencySegments
virtual void processPacketMetaInfo(const PacketMetaInfo &packetMetaInfo)=0
Packet status interface used to publish statistics and tables showing accepted and rejected byte coun...
Clock::time_point TimePoint
Definition: types.h:50
void sendUpstreamPacket(UpstreamPacket &pkt, const ControlMessages &msgs=empty)
virtual void processSchedulerPacket(UpstreamPacket &pkt, const PacketMetaInfo &packetMetaInfo)=0
DownstreamTransport allows for processing downstream data and control messages.
virtual SpectrumServiceProvider & spectrumService()=0
virtual SpectrumWindow request(std::uint64_t u64FrequencyHz, const Microseconds &duration=Microseconds::zero(), const TimePoint &startTime=TimePoint::min()) const =0
bool enqueue(BaseModelMessage &&baseModelMessage, const PacketInfo &pktInfo, size_t length, const TimePoint &startOfReception, const FrequencySegments &frequencySegments, const Microseconds &span, const TimePoint &beginTime, std::uint64_t u64PacketSequence)
Manages neighbor metrics and sends neighbor metric control message upstream.
virtual void inbound(NEMId src, const MessageComponent &component, InboundAction action)=0