// Remainder of the ReceiveManager constructor's member-initializer list; the
// constructor's opening lines precede this excerpt.
// Each pointer member below is initialized from the constructor argument of
// the same name.
44 pDownstreamTransport_{pDownstreamTransport},
45 pLogService_{pLogService},
46 pRadioService_{pRadioService},
47 pScheduler_{pScheduler},
48 pPacketStatusPublisher_{pPacketStatusPublisher},
49 pNeighborMetricManager_{pNeighborMetricManager},
// Pending-reception tuple: every element is given an empty initializer here;
// enqueue() overwrites it with std::make_tuple(...).
50 pendingInfo_{{},{{},{},{},{}},{},{},{},{},{},{}},
// Zero doubles as "nothing pending": enqueue() tests
// !u64PendingAbsoluteSlotIndex_ and process() resets the member to 0.
51 u64PendingAbsoluteSlotIndex_{},
// Constructed with the arguments (0.0, 1.0); process() samples it with
// distribution_().
52 distribution_{0.0, 1.0},
// Defaults for the fragment-store sweep interval and the fragment expiry age.
// Both can be replaced via setFragmentCheckThreshold() and
// setFragmentTimeoutThreshold(), which take std::chrono::seconds, so these are
// presumably 2 s and 5 s -- TODO confirm against the member declarations.
54 fragmentCheckThreshold_{2},
// The trailing {} is the (empty) constructor body.
55 fragmentTimeoutThreshold_{5}{}
// setPromiscuousMode(bool bEnable): store the requested promiscuous-mode
// setting. process() tests this flag first (`bPromiscuousMode_ ||`) when
// deciding whether a received message component is handled.
59 bPromiscuousMode_ = bEnable;
// loadCurves(const std::string & sPCRFileName): forward the file name
// unchanged to porManager_.load().
64 porManager_.
load(sPCRFileName);
// setFragmentCheckThreshold(const std::chrono::seconds & threshold): set the
// minimum interval between sweeps of fragmentStore_. process() performs the
// sweep only when lastFragmentCheckTime_ + fragmentCheckThreshold_ <= now.
69 fragmentCheckThreshold_ = threshold;
// setFragmentTimeoutThreshold(const std::chrono::seconds & threshold): set how
// long a reassembly entry may go without receiving a new fragment. During its
// sweep, process() erases entries for which
// lastFragmentTime + fragmentTimeoutThreshold_ <= now.
74 fragmentTimeoutThreshold_ = threshold;
// enqueue(): decide whether the incoming message becomes the single "pending"
// reception that process() later handles. The line below is the final
// parameter of the signature; the earlier parameters are not part of this
// excerpt, and neither is the return statement.
86 std::uint64_t u64PacketSequence)
// Slot index carried by the incoming message; every branch below compares it
// with the slot of the currently pending message.
89 std::uint64_t u64AbsoluteSlotIndex{baseModelMessage.getAbsoluteSlotIndex()};
// A pending index of zero means nothing is pending (process() resets it to 0):
// adopt this message as the pending one.
91 if(!u64PendingAbsoluteSlotIndex_)
93 u64PendingAbsoluteSlotIndex_ = u64AbsoluteSlotIndex;
95 pendingInfo_ = std::make_tuple(std::move(baseModelMessage),
// The pending message belongs to an earlier slot than the incoming one: the
// pending state is overwritten with the incoming message.
105 else if(u64PendingAbsoluteSlotIndex_ < u64AbsoluteSlotIndex)
109 u64PendingAbsoluteSlotIndex_ = u64AbsoluteSlotIndex;
111 pendingInfo_ = std::make_tuple(std::move(baseModelMessage),
// The pending slot is later than the incoming one: the condition is reported
// using the format string below (both indices are passed as arguments), and
// the pending state is still overwritten with the incoming message.
121 else if(u64PendingAbsoluteSlotIndex_ > u64AbsoluteSlotIndex)
125 "MACI %03hu TDMA::ReceiveManager enqueue: pending slot: %zu greater than enqueue: %zu",
127 u64PendingAbsoluteSlotIndex_,
128 u64AbsoluteSlotIndex);
130 u64PendingAbsoluteSlotIndex_ = u64AbsoluteSlotIndex;
132 pendingInfo_ = std::make_tuple(std::move(baseModelMessage),
// Presumably the remaining, equal-slot case (the enclosing else is not visible
// here -- TODO confirm). Element 3 of pendingInfo_ is the stored start of
// reception (process() binds it under that name); the pending message is
// replaced when the stored value is earlier than the incoming startOfReception.
145 if(std::get<3>(pendingInfo_) < startOfReception)
147 pendingInfo_ = std::make_tuple(std::move(baseModelMessage),
// process(std::uint64_t u64AbsoluteSlotIndex): end-of-reception (EOR) handling
// for the message stored by enqueue(), followed by a periodic sweep that
// expires stale fragment-reassembly entries. Many statements of the original
// body are absent from this excerpt; the comments describe only what is
// visible.
164 auto now = Clock::now();
// Act only when the pending message belongs to the slot immediately before the
// slot being processed.
166 if(u64PendingAbsoluteSlotIndex_ + 1 == u64AbsoluteSlotIndex)
// Clear the pending marker so enqueue() sees "nothing pending" again.
168 u64PendingAbsoluteSlotIndex_ = 0;
171 double dNoiseFloordB{};
// Unpack the fields of the pending tuple that enqueue() stored.
174 PacketInfo pktInfo{std::get<1>(pendingInfo_)};
175 size_t length{std::get<2>(pendingInfo_)};
176 TimePoint & startOfReception = std::get<3>(pendingInfo_);
179 std::uint64_t u64SequenceNumber{std::get<7>(pendingInfo_)};
// Only the first frequency segment is consulted below.
181 auto & frequencySegment = *frequencySegments.begin();
190 bool bSignalInNoise{};
// Receives a (noise floor dB, signal-in-noise flag) pair; the right-hand side
// is not visible here -- presumably maxBinNoiseFloor(), TODO confirm.
192 std::tie(dNoiseFloordB,bSignalInNoise) =
// SINR = received power (dBm) minus the noise floor (dB).
195 dSINR = frequencySegment.getRxPowerdBm() - dNoiseFloordB;
199 "MACI %03hu TDMA::ReceiveManager upstream EOR processing:" 200 " src %hu, dst %hu, max noise %lf, signal in noise %s, SINR %lf",
203 pktInfo.getDestination(),
205 bSignalInNoise ?
"yes" :
"no",
210 pPacketStatusPublisher_->
inbound(pktInfo.getSource(),
// Message used when the spectrum service request fails; the trailing %s
// carries the error text.
217 "MACI %03hu TDMA::ReceiveManager upstream EOR processing: src %hu," 218 " dst %hu, sor %ju, span %ju spectrum service request error: %s",
221 pktInfo.getDestination(),
222 std::chrono::duration_cast<
Microseconds>(startOfReception.time_since_epoch()).count(),
235 "MACI %03hu TDMA::ReceiveManager upstream EOR processing: src %hu," 236 " dst %hu, datarate: %ju sinr: %lf length: %lu, por: %f",
239 pktInfo.getDestination(),
// Draw one sample from distribution_. NOTE(review): presumably compared with
// the POR logged above to decide whether the packet is dropped -- the
// comparison itself is not visible here.
246 float fRandom{distribution_()};
250 pPacketStatusPublisher_->
inbound(pktInfo.getSource(),
256 "MACI %03hu TDMA::ReceiveManager upstream EOR processing: src %hu, dst %hu, " 257 "rxpwr %3.2f dBm, drop",
260 pktInfo.getDestination(),
261 frequencySegment.getRxPowerdBm());
274 frequencySegment.getDuration(),
// Handle every message component carried by the received base model message.
277 for(
const auto & message : baseModelMessage.
getMessages())
279 NEMId dst{message.getDestination()};
280 Priority priority{message.getPriority()};
// Promiscuous mode short-circuits the acceptance test; the remaining operands
// of the condition are not visible here.
282 if(bPromiscuousMode_ ||
286 const auto & data = message.getData();
// Fragment path: the component is one piece of a larger packet.
288 if(message.isFragment())
292 "MACI %03hu TDMA::ReceiveManager upstream EOR processing:" 293 " src %hu, dst %hu, findex: %zu foffset: %zu fbytes: %zu" 297 pktInfo.getDestination(),
298 message.getFragmentIndex(),
299 message.getFragmentOffset(),
301 message.isMoreFragments() ?
"yes" :
"no");
// Reassembly key: starts with the packet source and ends with the fragment
// sequence (the middle element is not visible here).
304 auto key = std::make_tuple(pktInfo.getSource(),
306 message.getFragmentSequence());
308 auto iter = fragmentStore_.find(key);
// An entry already exists for this key: try to add the fragment to it.
310 if(iter != fragmentStore_.end())
312 auto & indexSet = std::get<0>(iter->second);
313 auto & parts = std::get<1>(iter->second);
314 auto & lastFragmentTime = std::get<2>(iter->second);
// insert().second is true only when this fragment index was not seen before.
316 if(indexSet.insert(message.getFragmentIndex()).second)
// Store the payload keyed by its fragment offset.
318 parts.insert(std::make_pair(message.getFragmentOffset(),message.getData()));
// Refresh the entry's age so the expiry sweep below does not drop it.
320 lastFragmentTime = now;
// The count of distinct indices received equals this fragment's index + 1.
// NOTE(review): appears to be the "no gap so far" test -- confirm intent.
323 if(indexSet.size() == message.getFragmentIndex() + 1)
// No more fragments follow: this was the final fragment.
325 if(!message.isMoreFragments())
// Gather the stored parts into a scatter/gather vector in the iteration order
// of `parts`. The const_cast is needed because make_iovec() takes a non-const
// void *.
329 for(
const auto & part : parts)
331 vectorIO.push_back(
Utils::make_iovec(const_cast<std::uint8_t *>(&part.second[0]),
332 part.second.size()));
338 pktInfo.getCreationTime(),
339 pktInfo.getUUID()},vectorIO};
342 pPacketStatusPublisher_->
inbound(pktInfo.getSource(),
// u64AbsoluteSlotIndex-1 is the slot of the pending message, per the
// "pending + 1 == current" test at the top.
350 u64AbsoluteSlotIndex-1,
351 frequencySegment.getRxPowerdBm(),
// This entry is finished with: remove it from the store.
367 fragmentStore_.erase(iter);
// Total the bytes of the current fragment plus every stored part, report them
// through the publisher, then discard the entry. NOTE(review): presumably the
// abandon path for an inconsistent fragment sequence -- the enclosing branch
// is not visible here.
373 size_t totalBytes{message.getData().size()};
375 for(
const auto & part : parts)
377 totalBytes += part.second.size();
380 pPacketStatusPublisher_->
inbound(pktInfo.getSource(),
387 fragmentStore_.erase(iter);
// A new reassembly entry is created only for fragment index 0.
395 if(!message.getFragmentIndex())
397 fragmentStore_.insert(std::make_pair(key,
398 std::make_tuple(std::set<size_t>{message.getFragmentIndex()},
399 FragmentParts{{message.getFragmentOffset(),
407 pPacketStatusPublisher_->
inbound(pktInfo.getSource(),
418 "MACI %03hu TDMA::ReceiveManager upstream EOR processing:" 419 " src %hu, dst %hu, forward upstream",
422 pktInfo.getDestination());
// Declared with plain `auto` (not a reference), so `data` is a local object
// whose buffer is passed to the packet construction below.
425 auto data = message.getData();
430 pktInfo.getCreationTime(),
431 pktInfo.getUUID()},&data[0],data.size()};
434 pPacketStatusPublisher_->
inbound(pktInfo.getSource(),
439 u64AbsoluteSlotIndex-1,
440 frequencySegment.getRxPowerdBm(),
458 pPacketStatusPublisher_->
inbound(pktInfo.getSource(),
// Periodic sweep of the fragment store, run only once the check interval has
// elapsed since lastFragmentCheckTime_.
466 if(lastFragmentCheckTime_ + fragmentCheckThreshold_ <= now)
// The loop header has no increment clause; the only visible advance is the
// iter++ in the erase call below.
468 for(
auto iter = fragmentStore_.begin(); iter != fragmentStore_.end();)
470 auto & parts = std::get<1>(iter->second);
471 auto & lastFragmentTime = std::get<2>(iter->second);
472 auto &
dst = std::get<3>(iter->second);
473 auto & priority = std::get<4>(iter->second);
// The entry has gone at least fragmentTimeoutThreshold_ without a new fragment.
475 if(lastFragmentTime + fragmentTimeoutThreshold_ <= now)
// Total the bytes held by the expired entry and report them through the
// publisher; the source comes from the first element of the key.
479 for(
const auto & part : parts)
481 totalBytes += part.second.size();
484 pPacketStatusPublisher_->
inbound(std::get<0>(iter->first),
// Post-increment hands erase() the current position while moving iter on to
// the next entry first.
490 fragmentStore_.erase(iter++);
void load(const std::string &sPCRFileName)
A Packet class that allows upstream processing to strip layer headers as the packet travels up the stack.
void updateNeighborRxMetric(NEMId src, std::uint64_t u64SeqNum, const uuid_t &uuid, const TimePoint &rxTime)
void process(std::uint64_t u64AbsoluteSlotIndex)
#define LOGGER_VERBOSE_LOGGING(logger, level, fmt, args...)
Message class used to serialize and deserialize TDMA radio model messages.
constexpr NEMId NEM_BROADCAST_MAC_ADDRESS
SpectrumServiceException is thrown when an exception occurs during spectrum service processing...
iovec make_iovec(void *base, std::size_t len)
void setPromiscuousMode(bool bEnable)
std::vector< iovec > VectorIO
Store source, destination, creation time and priority information for a packet.
Log service provider interface.
const char * what() const
float getPOR(std::uint64_t u64DataRate, float fSINR, size_t packetLengthBytes)
void setFragmentTimeoutThreshold(const std::chrono::seconds &threshold)
ReceiveManager(NEMId id, DownstreamTransport *pDownstreamTransport, LogServiceProvider *pLogService, RadioServiceProvider *pRadioService, Scheduler *pScheduler, PacketStatusPublisher *pPacketStatusPublisher, NeighborMetricManager *pNeighborMetricManager)
const MessageComponents & getMessages() const
std::chrono::microseconds Microseconds
void loadCurves(const std::string &sPCRFileName)
The RadioServiceProvider interface provides access to radio (RF) model specific services.
std::pair< double, bool > maxBinNoiseFloor(const SpectrumWindow &window, double dRxPowerdBm, const TimePoint &startTime=TimePoint::min())
Scheduler interface used by BaseModel to communicate with a scheduler module.
void setFragmentCheckThreshold(const std::chrono::seconds &threshold)
std::list< FrequencySegment > FrequencySegments
virtual void processPacketMetaInfo(const PacketMetaInfo &packetMetaInfo)=0
Packet status interface used to publish statistics and tables showing accepted and rejected byte counts.
Clock::time_point TimePoint
void sendUpstreamPacket(UpstreamPacket &pkt, const ControlMessages &msgs=empty)
virtual void processSchedulerPacket(UpstreamPacket &pkt, const PacketMetaInfo &packetMetaInfo)=0
std::uint64_t getDataRate() const
DownstreamTransport allows for processing downstream data and control messages.
virtual SpectrumServiceProvider & spectrumService()=0
virtual SpectrumWindow request(std::uint64_t u64FrequencyHz, const Microseconds &duration=Microseconds::zero(), const TimePoint &startTime=TimePoint::min()) const =0
bool enqueue(BaseModelMessage &&baseModelMessage, const PacketInfo &pktInfo, size_t length, const TimePoint &startOfReception, const FrequencySegments &frequencySegments, const Microseconds &span, const TimePoint &beginTime, std::uint64_t u64PacketSequence)
Manages neighbor metrics and sends neighbor metric control message upstream.
virtual void inbound(NEMId src, const MessageComponent &component, InboundAction action)=0