45 pDownstreamTransport_{pDownstreamTransport},
46 pLogService_{pLogService},
47 pRadioService_{pRadioService},
48 pScheduler_{pScheduler},
49 pPacketStatusPublisher_{pPacketStatusPublisher},
50 pNeighborMetricManager_{pNeighborMetricManager},
51 pendingInfo_{{},{{},{},{},{}},{},{},{},{},{},{}},
52 u64PendingAbsoluteSlotIndex_{},
53 distribution_{0.0, 1.0},
55 fragmentCheckThreshold_{2},
56 fragmentTimeoutThreshold_{5}{}
60 bPromiscuousMode_ = bEnable;
65 porManager_.
load(sPCRFileName);
70 fragmentCheckThreshold_ = threshold;
75 fragmentTimeoutThreshold_ = threshold;
87 std::uint64_t u64PacketSequence)
90 std::uint64_t u64AbsoluteSlotIndex{baseModelMessage.getAbsoluteSlotIndex()};
92 if(!u64PendingAbsoluteSlotIndex_)
94 u64PendingAbsoluteSlotIndex_ = u64AbsoluteSlotIndex;
96 pendingInfo_ = std::make_tuple(std::move(baseModelMessage),
106 else if(u64PendingAbsoluteSlotIndex_ < u64AbsoluteSlotIndex)
110 u64PendingAbsoluteSlotIndex_ = u64AbsoluteSlotIndex;
112 pendingInfo_ = std::make_tuple(std::move(baseModelMessage),
122 else if(u64PendingAbsoluteSlotIndex_ > u64AbsoluteSlotIndex)
126 "MACI %03hu TDMA::ReceiveManager enqueue: pending slot: %zu greater than enqueue: %zu",
128 u64PendingAbsoluteSlotIndex_,
129 u64AbsoluteSlotIndex);
131 u64PendingAbsoluteSlotIndex_ = u64AbsoluteSlotIndex;
133 pendingInfo_ = std::make_tuple(std::move(baseModelMessage),
146 if(std::get<3>(pendingInfo_) < startOfReception)
148 pendingInfo_ = std::make_tuple(std::move(baseModelMessage),
165 auto now = Clock::now();
167 if(u64PendingAbsoluteSlotIndex_ + 1 == u64AbsoluteSlotIndex)
169 u64PendingAbsoluteSlotIndex_ = 0;
172 double dNoiseFloordB{};
175 PacketInfo pktInfo{std::get<1>(pendingInfo_)};
176 size_t length{std::get<2>(pendingInfo_)};
177 TimePoint & startOfReception = std::get<3>(pendingInfo_);
180 std::uint64_t u64SequenceNumber{std::get<7>(pendingInfo_)};
182 auto & frequencySegment = *frequencySegments.begin();
191 bool bSignalInNoise{};
193 std::tie(dNoiseFloordB,bSignalInNoise) =
196 dSINR = frequencySegment.getRxPowerdBm() - dNoiseFloordB;
200 "MACI %03hu TDMA::ReceiveManager upstream EOR processing:" 201 " src %hu, dst %hu, max noise %lf, signal in noise %s, SINR %lf",
204 pktInfo.getDestination(),
206 bSignalInNoise ?
"yes" :
"no",
211 pPacketStatusPublisher_->
inbound(pktInfo.getSource(),
218 "MACI %03hu TDMA::ReceiveManager upstream EOR processing: src %hu," 219 " dst %hu, sor %ju, span %ju spectrum service request error: %s",
222 pktInfo.getDestination(),
223 std::chrono::duration_cast<
Microseconds>(startOfReception.time_since_epoch()).count(),
236 "MACI %03hu TDMA::ReceiveManager upstream EOR processing: src %hu," 237 " dst %hu, datarate: %ju sinr: %lf length: %lu, por: %f",
240 pktInfo.getDestination(),
247 float fRandom{distribution_()};
251 pPacketStatusPublisher_->
inbound(pktInfo.getSource(),
257 "MACI %03hu TDMA::ReceiveManager upstream EOR processing: src %hu, dst %hu, " 258 "rxpwr %3.2f dBm, drop",
261 pktInfo.getDestination(),
262 frequencySegment.getRxPowerdBm());
275 frequencySegment.getDuration(),
278 for(
const auto & message : baseModelMessage.
getMessages())
280 NEMId dst{message.getDestination()};
281 Priority priority{message.getPriority()};
283 if(bPromiscuousMode_ ||
287 const auto & data = message.getData();
289 if(message.isFragment())
293 "MACI %03hu TDMA::ReceiveManager upstream EOR processing:" 294 " src %hu, dst %hu, findex: %zu foffset: %zu fbytes: %zu" 298 pktInfo.getDestination(),
299 message.getFragmentIndex(),
300 message.getFragmentOffset(),
302 message.isMoreFragments() ?
"yes" :
"no");
305 auto key = std::make_tuple(pktInfo.getSource(),
307 message.getFragmentSequence());
309 auto iter = fragmentStore_.find(key);
311 if(iter != fragmentStore_.end())
313 auto & indexSet = std::get<0>(iter->second);
314 auto & parts = std::get<1>(iter->second);
315 auto & lastFragmentTime = std::get<2>(iter->second);
316 auto & totalNumFragments = std::get<5>(iter->second);
318 if(indexSet.insert(message.getFragmentIndex()).second)
320 parts.insert(std::make_pair(message.getFragmentOffset(),message.getData()));
322 lastFragmentTime = now;
328 if(!message.isMoreFragments())
330 totalNumFragments = message.getFragmentIndex() + 1;
334 if(totalNumFragments && indexSet.size() == totalNumFragments)
338 for(
const auto & part : parts)
340 vectorIO.push_back(
Utils::make_iovec(const_cast<std::uint8_t *>(&part.second[0]),
341 part.second.size()));
347 pktInfo.getCreationTime(),
348 pktInfo.getUUID()},vectorIO};
351 pPacketStatusPublisher_->
inbound(pktInfo.getSource(),
359 u64AbsoluteSlotIndex-1,
360 frequencySegment.getRxPowerdBm(),
376 fragmentStore_.erase(iter);
386 fragmentStore_.insert(std::make_pair(key,
387 std::make_tuple(std::set<size_t>{message.getFragmentIndex()},
388 FragmentParts{{message.getFragmentOffset(),
393 message.isMoreFragments() ? 0 : message.getFragmentIndex() + 1)));
400 "MACI %03hu TDMA::ReceiveManager upstream EOR processing:" 401 " src %hu, dst %hu, forward upstream",
404 pktInfo.getDestination());
407 auto data = message.getData();
412 pktInfo.getCreationTime(),
413 pktInfo.getUUID()},&data[0],data.size()};
416 pPacketStatusPublisher_->
inbound(pktInfo.getSource(),
421 u64AbsoluteSlotIndex-1,
422 frequencySegment.getRxPowerdBm(),
440 pPacketStatusPublisher_->
inbound(pktInfo.getSource(),
448 if(lastFragmentCheckTime_ + fragmentCheckThreshold_ <= now)
450 for(
auto iter = fragmentStore_.begin(); iter != fragmentStore_.end();)
452 auto & parts = std::get<1>(iter->second);
453 auto & lastFragmentTime = std::get<2>(iter->second);
454 auto &
dst = std::get<3>(iter->second);
455 auto & priority = std::get<4>(iter->second);
457 if(lastFragmentTime + fragmentTimeoutThreshold_ <= now)
461 for(
const auto & part : parts)
463 totalBytes += part.second.size();
466 pPacketStatusPublisher_->
inbound(std::get<0>(iter->first),
472 fragmentStore_.erase(iter++);
480 lastFragmentCheckTime_ = now;
void load(const std::string &sPCRFileName)
A Packet class that allows upstream processing to strip layer headers as the packet travels up the stack.
void updateNeighborRxMetric(NEMId src, std::uint64_t u64SeqNum, const uuid_t &uuid, const TimePoint &rxTime)
void process(std::uint64_t u64AbsoluteSlotIndex)
#define LOGGER_VERBOSE_LOGGING(logger, level, fmt, args...)
Message class used to serialize and deserialize TDMA radio model messages.
constexpr NEMId NEM_BROADCAST_MAC_ADDRESS
SpectrumServiceException is thrown when an exception occurs during spectrum service processing...
iovec make_iovec(void *base, std::size_t len)
void setPromiscuousMode(bool bEnable)
std::vector< iovec > VectorIO
Store source, destination, creation time and priority information for a packet.
Log service provider interface.
const char * what() const
float getPOR(std::uint64_t u64DataRate, float fSINR, size_t packetLengthBytes)
void setFragmentTimeoutThreshold(const std::chrono::seconds &threshold)
ReceiveManager(NEMId id, DownstreamTransport *pDownstreamTransport, LogServiceProvider *pLogService, RadioServiceProvider *pRadioService, Scheduler *pScheduler, PacketStatusPublisher *pPacketStatusPublisher, NeighborMetricManager *pNeighborMetricManager)
const MessageComponents & getMessages() const
std::chrono::microseconds Microseconds
void loadCurves(const std::string &sPCRFileName)
The RadioServiceProvider interface provides access to radio (RF) model specific services.
std::pair< double, bool > maxBinNoiseFloor(const SpectrumWindow &window, double dRxPowerdBm, const TimePoint &startTime=TimePoint::min())
Scheduler interface used by BaseModel to communicate with a scheduler module.
void setFragmentCheckThreshold(const std::chrono::seconds &threshold)
std::list< FrequencySegment > FrequencySegments
virtual void processPacketMetaInfo(const PacketMetaInfo &packetMetaInfo)=0
Packet status interface used to publish statistics and tables showing accepted and rejected byte counts.
Clock::time_point TimePoint
void sendUpstreamPacket(UpstreamPacket &pkt, const ControlMessages &msgs=empty)
virtual void processSchedulerPacket(UpstreamPacket &pkt, const PacketMetaInfo &packetMetaInfo)=0
std::uint64_t getDataRate() const
DownstreamTransport allows for processing downstream data and control messages.
virtual SpectrumServiceProvider & spectrumService()=0
virtual SpectrumWindow request(std::uint64_t u64FrequencyHz, const Microseconds &duration=Microseconds::zero(), const TimePoint &startTime=TimePoint::min()) const =0
bool enqueue(BaseModelMessage &&baseModelMessage, const PacketInfo &pktInfo, size_t length, const TimePoint &startOfReception, const FrequencySegments &frequencySegments, const Microseconds &span, const TimePoint &beginTime, std::uint64_t u64PacketSequence)
Manages neighbor metrics and sends neighbor metric control message upstream.
virtual void inbound(NEMId src, const MessageComponent &component, InboundAction action)=0