50 eventStatisticPublisher_{
"EventChannel"},
59 if(thread_.joinable())
70 auto iter = eventServiceUserMap_.find(buildId);
72 if(iter != eventServiceUserMap_.end())
74 eventRegistrationMap_.insert(std::make_pair(eventId,
75 std::make_tuple(buildId,
77 iter->second.second)));
89 eventServiceUserMap_.insert(std::make_pair(buildId,std::make_pair(nemId,pEventServiceUser)));
93 const std::string & sDevice,
104 uuid_copy(uuid_,uuid);
108 const char * device{sDevice.empty() ? nullptr : sDevice.c_str()};
112 mcast_.
open(eventChannelAddress,
true,device,iTTL,loopbackEnable);
116 std::stringstream sstream;
118 <<
"Platform Event Service: Unable to open Event Service socket: '" 119 <<eventChannelAddress.
str()
123 <<
"Possible reason(s):" 125 <<
" * No Multicast device specified and routing table non deterministic" 127 <<
" (no multicast route and no default route)." 129 <<
" * Multicast device " 131 <<
" does not exist or is not up." 140 thread_ = std::thread(&EventService::process,
this);
146 "EventService::open: Unable to set Real Time Priority");
154 const Event & event)
const 167 const auto ret = eventRegistrationMap_.equal_range(eventId);
173 for(EventRegistrationMap::const_iterator iter = ret.first;
178 NEMId registeredNEMId{};
181 std::tie(registeredBuildId,
183 pEventServiceUser) = iter->second;
185 if(!buildId || registeredBuildId != buildId)
187 if(!nemId || registeredNEMId == nemId)
196 "EventService sendEvent skipping originator" 197 " event id:%hu for NEM:%hu buildId: %hu",
209 auto pData = msg.mutable_data();
211 auto pSerialization = pData->add_serializations();
213 pSerialization->set_nemid(nemId);
215 pSerialization->set_eventid(eventId);
217 pSerialization->set_data(serialization);
219 msg.set_uuid(reinterpret_cast<const char *>(uuid_),
sizeof(uuid_));
221 msg.set_sequencenumber(++u64SequenceNumber_);
223 std::string sSerialization;
225 if(!msg.SerializeToString(&sSerialization))
229 "EventService sendEvent " 230 "unable to send event id:%hu for NEM:%hu\n",
238 "Event %03hu EMANE::EventService::sendEvent",
241 std::uint16_t u16Length =
HTONS(sSerialization.size());
244 {
reinterpret_cast<char *
>(&u16Length),
sizeof(u16Length)},
245 {
const_cast<char *
>(sSerialization.c_str()),sSerialization.size()}};
247 if(mcast_.
send(&vectorIO[0],static_cast<int>(vectorIO.size())) == -1)
251 "EventService sendEvent " 252 "unable to send event id:%hu for NEM:%hu\n",
269 "Event %03hu EMANE::EventService::sendEvent, not open, drop",
277 NEMId ignoreNEM)
const 279 const auto ret = eventRegistrationMap_.equal_range(eventId);
281 for(EventRegistrationMap::const_iterator iter = ret.first;
286 NEMId registeredNEMId{};
289 std::tie(registeredBuildId,registeredNEMId,pEventServiceUser) = iter->second;
291 if(!ignoreNEM || ignoreNEM != registeredNEMId)
293 if(!nemId || registeredNEMId == nemId)
302 "EventService sendEvent skipping all layers of" 303 " originating nem event id:%hu NEM:%hu",
316 void EMANE::EventService::process()
318 std::uint8_t buf[65536];
323 "EventService::processEventMessage");
327 if((len = mcast_.
recv(buf,
sizeof(buf),0)) > 0)
331 "EventService packet received len: %zd",
334 std::uint16_t * pu16Length{
reinterpret_cast<std::uint16_t *
>(buf)};
336 *pu16Length =
NTOHS(*pu16Length);
338 len -=
sizeof(std::uint16_t);
342 if(static_cast<size_t>(len) == *pu16Length &&
343 msg.ParseFromArray(&buf[2], *pu16Length))
346 uuid_copy(remoteUUID,reinterpret_cast<const unsigned char *>(msg.uuid().data()));
349 if(uuid_compare(uuid_,remoteUUID))
351 for(
const auto & serialization : msg.data().serializations())
353 NEMId nemId{
static_cast<NEMId>(serialization.nemid())};
355 const auto ret = eventRegistrationMap_.equal_range(static_cast<EventId>(serialization.eventid()));
357 for(EventRegistrationMap::const_iterator iter = ret.first;
362 NEMId registeredNEMId{};
365 std::tie(registeredBuildId,registeredNEMId,pEventServiceUser) = iter->second;
367 if(!nemId || !registeredNEMId || registeredNEMId == nemId)
369 pEventServiceUser->
processEvent(static_cast<EventId>(serialization.eventid()),
370 serialization.data());
376 serialization.eventid());
384 "EventService unable to deserialize event");
391 "EventService Packet Receive error");
ssize_t recv(void *buf, size_t len, int flags=0)
std::string Serialization
EventServiceUser interface.
Event interface is the base for all events.
void sendEvent(BuildId buildId, NEMId nemId, const Event &event) const
std::vector< iovec > VectorIO
ssize_t send(const iovec *iov, int iovcnt, int flags=0) const
const char * what() const
EventId getEventId() const
void update(Type type, const uuid_t &uuid, EventId eventId)
void open(const INETAddr &eventChannelAddress, const std::string &sDevice, int iTTL, bool loopbackEnable, const uuid_t &uuid)
std::string str(bool bWithPort=true) const
int cancel(std::thread &thread)
RegistrarException is thrown when an exception occurs during registration.
virtual void processEvent(const EventId &eventId, const Serialization &serialization)
void setRowLimit(size_t rows)
void open(const INETAddr &address, bool bReuseAddress=false, const std::string &sDevice="", std::uint8_t u8TTL=1, bool bLoop=false)
constexpr std::uint16_t HTONS(std::uint16_t x)
void registerEventServiceUser(BuildId buildId, EventServiceUser *pEventServiceUser, NEMId=0)
constexpr std::uint16_t NTOHS(std::uint16_t x)
void setStatEventCountRowLimit(size_t rows)
void registerEvent(BuildId buildId, EventId eventId)
#define LOGGER_STANDARD_LOGGING(logger, level, fmt, args...)
static LogService * instance()
Exception thrown during open/establishment of the event service communication channel.
int elevate(std::thread &thread)
void processEventMessage(NEMId nemId, EventId eventId, const Serialization &serialization, NEMId ignoreNEM={}) const