EMANE  1.2.1
eventservice.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013-2016 - Adjacent Link LLC, Bridgewater, New Jersey
3  * Copyright (c) 2008-2012 - DRS CenGen, LLC, Columbia, Maryland
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  *
10  * * Redistributions of source code must retain the above copyright
11  * notice, this list of conditions and the following disclaimer.
12  * * Redistributions in binary form must reproduce the above copyright
13  * notice, this list of conditions and the following disclaimer in
14  * the documentation and/or other materials provided with the
15  * distribution.
16  * * Neither the name of DRS CenGen, LLC nor the names of its
17  * contributors may be used to endorse or promote products derived
18  * from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31  * POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "eventservice.h"
36 #include "eventserviceexception.h"
37 #include "event.pb.h"
38 #include "logservice.h"
39 #include "eventservice.h"
40 #include "socketexception.h"
41 
42 #include "emane/utils/vectorio.h"
44 #include "emane/net.h"
45 
46 #include <sstream>
47 
49  bOpen_{false},
50  eventStatisticPublisher_{"EventChannel"},
51  u64SequenceNumber_{}
52 {
53  uuid_clear(uuid_);
54 }
55 
56 
58 {
59  if(thread_.joinable())
60  {
61  ThreadUtils::cancel(thread_);
62 
63  thread_.join();
64  }
65 }
66 
68 
69 {
70  auto iter = eventServiceUserMap_.find(buildId);
71 
72  if(iter != eventServiceUserMap_.end())
73  {
74  eventRegistrationMap_.insert(std::make_pair(eventId,
75  std::make_tuple(buildId,
76  iter->second.first,
77  iter->second.second)));
78  }
79  else
80  {
81  throw RegistrarException{"Component not eligible to register for events"};
82  }
83 }
84 
86  EventServiceUser * pEventServiceUser,
87  NEMId nemId)
88 {
89  eventServiceUserMap_.insert(std::make_pair(buildId,std::make_pair(nemId,pEventServiceUser)));
90 }
91 
92 void EMANE::EventService::open(const INETAddr & eventChannelAddress,
93  const std::string & sDevice,
94  int iTTL,
95  bool loopbackEnable,
96  const uuid_t & uuid)
97 {
98  if(bOpen_)
99  {
100  throw EventServiceException("EventService already open");
101  }
102  else
103  {
104  uuid_copy(uuid_,uuid);
105 
106  bOpen_ = true;
107 
108  const char * device{sDevice.empty() ? nullptr : sDevice.c_str()};
109 
110  try
111  {
112  mcast_.open(eventChannelAddress,true,device,iTTL,loopbackEnable);
113  }
114  catch(SocketException & exp)
115  {
116  std::stringstream sstream;
117  sstream
118  <<"Platform Event Service: Unable to open Event Service socket: '"
119  <<eventChannelAddress.str()
120  <<"'."
121  <<std::endl
122  <<std::endl
123  <<"Possible reason(s):"
124  <<std::endl
125  <<" * No Multicast device specified and routing table non deterministic"
126  <<std::endl
127  <<" (no multicast route and no default route)."
128  <<std::endl
129  <<" * Multicast device "
130  <<sDevice
131  <<" does not exist or is not up."
132  <<std::endl
133  <<exp.what()
134  <<std::endl
135  <<std::ends;
136 
137  throw EventServiceException(sstream.str());
138  }
139 
140  thread_ = std::thread(&EventService::process,this);
141 
142  if(ThreadUtils::elevate(thread_))
143  {
145  ERROR_LEVEL,
146  "EventService::open: Unable to set Real Time Priority");
147  }
148  }
149 }
150 
151 
153  NEMId nemId,
154  const Event & event) const
155 {
156  sendEvent(buildId,nemId,event.getEventId(),event.serialize());
157 }
158 
159 
161  NEMId nemId,
162  EventId eventId,
163  const Serialization & serialization) const
164 {
165 
166  // determine if there are any locally registered users for this event
167  const auto ret = eventRegistrationMap_.equal_range(eventId);
168 
169  // for each local event service user registered for this event
170  // determine based on the nemId target whether they should
171  // receive the event. The source (base on buildId) will never
172  // receive an event it generated
173  for(EventRegistrationMap::const_iterator iter = ret.first;
174  iter != ret.second;
175  ++iter)
176  {
177  BuildId registeredBuildId{};
178  NEMId registeredNEMId{};
179  EventServiceUser * pEventServiceUser{};
180 
181  std::tie(registeredBuildId,
182  registeredNEMId,
183  pEventServiceUser) = iter->second;
184 
185  if(!buildId || registeredBuildId != buildId)
186  {
187  if(!nemId || registeredNEMId == nemId)
188  {
189  pEventServiceUser->processEvent(eventId,serialization);
190  }
191  }
192  else
193  {
195  DEBUG_LEVEL,
196  "EventService sendEvent skipping originator"
197  " event id:%hu for NEM:%hu buildId: %hu",
198  eventId,
199  registeredNEMId,
200  buildId);
201  }
202  }
203 
204  // send the event out via the multicast channel
205  if(bOpen_)
206  {
208 
209  auto pData = msg.mutable_data();
210 
211  auto pSerialization = pData->add_serializations();
212 
213  pSerialization->set_nemid(nemId);
214 
215  pSerialization->set_eventid(eventId);
216 
217  pSerialization->set_data(serialization);
218 
219  msg.set_uuid(reinterpret_cast<const char *>(uuid_),sizeof(uuid_));
220 
221  msg.set_sequencenumber(++u64SequenceNumber_);
222 
223  std::string sSerialization;
224 
225  if(!msg.SerializeToString(&sSerialization))
226  {
228  ERROR_LEVEL,
229  "EventService sendEvent "
230  "unable to send event id:%hu for NEM:%hu\n",
231  eventId,
232  nemId);
233  }
234  else
235  {
237  DEBUG_LEVEL,
238  "Event %03hu EMANE::EventService::sendEvent",
239  eventId);
240 
241  std::uint16_t u16Length = HTONS(sSerialization.size());
242 
243  Utils::VectorIO vectorIO{
244  {reinterpret_cast<char *>(&u16Length),sizeof(u16Length)},
245  {const_cast<char *>(sSerialization.c_str()),sSerialization.size()}};
246 
247  if(mcast_.send(&vectorIO[0],static_cast<int>(vectorIO.size())) == -1)
248  {
250  ERROR_LEVEL,
251  "EventService sendEvent "
252  "unable to send event id:%hu for NEM:%hu\n",
253  eventId,
254  nemId);
255  }
256  else
257  {
258  eventStatisticPublisher_.update(EventStatisticPublisher::Type::TYPE_TX,
259  uuid_,
260  eventId);
261 
262  }
263  }
264  }
265  else
266  {
268  ERROR_LEVEL,
269  "Event %03hu EMANE::EventService::sendEvent, not open, drop",
270  eventId);
271  }
272 }
273 
275  EventId eventId,
276  const Serialization & serialization,
277  NEMId ignoreNEM) const
278 {
279  const auto ret = eventRegistrationMap_.equal_range(eventId);
280 
281  for(EventRegistrationMap::const_iterator iter = ret.first;
282  iter != ret.second;
283  ++iter)
284  {
285  BuildId registeredBuildId{};
286  NEMId registeredNEMId{};
287  EventServiceUser * pEventServiceUser{};
288 
289  std::tie(registeredBuildId,registeredNEMId,pEventServiceUser) = iter->second;
290 
291  if(!ignoreNEM || ignoreNEM != registeredNEMId)
292  {
293  if(!nemId || registeredNEMId == nemId)
294  {
295  pEventServiceUser->processEvent(eventId,serialization);
296  }
297  }
298  else
299  {
301  DEBUG_LEVEL,
302  "EventService sendEvent skipping all layers of"
303  " originating nem event id:%hu NEM:%hu",
304  eventId,
305  ignoreNEM);
306 
307  }
308  }
309 }
310 
312 {
313  eventStatisticPublisher_.setRowLimit(rows);
314 }
315 
316 void EMANE::EventService::process()
317 {
318  std::uint8_t buf[65536];
319  ssize_t len = 0;
320 
322  DEBUG_LEVEL,
323  "EventService::processEventMessage");
324 
325  while(1)
326  {
327  if((len = mcast_.recv(buf,sizeof(buf),0)) > 0)
328  {
330  DEBUG_LEVEL,
331  "EventService packet received len: %zd",
332  len);
333 
334  std::uint16_t * pu16Length{reinterpret_cast<std::uint16_t *>(buf)};
335 
336  *pu16Length = NTOHS(*pu16Length);
337 
338  len -= sizeof(std::uint16_t);
339 
341 
342  if(static_cast<size_t>(len) == *pu16Length &&
343  msg.ParseFromArray(&buf[2], *pu16Length))
344  {
345  uuid_t remoteUUID;
346  uuid_copy(remoteUUID,reinterpret_cast<const unsigned char *>(msg.uuid().data()));
347 
348  // only process multicast events that were not sourced locally
349  if(uuid_compare(uuid_,remoteUUID))
350  {
351  for(const auto & serialization : msg.data().serializations())
352  {
353  NEMId nemId{static_cast<NEMId>(serialization.nemid())};
354 
355  const auto ret = eventRegistrationMap_.equal_range(static_cast<EventId>(serialization.eventid()));
356 
357  for(EventRegistrationMap::const_iterator iter = ret.first;
358  iter != ret.second;
359  ++iter)
360  {
361  BuildId registeredBuildId{};
362  NEMId registeredNEMId{};
363  EventServiceUser * pEventServiceUser{};
364 
365  std::tie(registeredBuildId,registeredNEMId,pEventServiceUser) = iter->second;
366 
367  if(!nemId || !registeredNEMId || registeredNEMId == nemId)
368  {
369  pEventServiceUser->processEvent(static_cast<EventId>(serialization.eventid()),
370  serialization.data());
371  }
372  }
373 
374  eventStatisticPublisher_.update(EventStatisticPublisher::Type::TYPE_RX,
375  remoteUUID,
376  serialization.eventid());
377  }
378  }
379  }
380  else
381  {
383  ERROR_LEVEL,
384  "EventService unable to deserialize event");
385  }
386  }
387  else
388  {
390  ERROR_LEVEL,
391  "EventService Packet Receive error");
392  break;
393  }
394  }
395 
396 }
ssize_t recv(void *buf, size_t len, int flags=0)
std::string Serialization
Definition: serializable.h:42
EventServiceUser interface.
Event interface is the base for all events.
Definition: event.h:46
void sendEvent(BuildId buildId, NEMId nemId, const Event &event) const
std::vector< iovec > VectorIO
Definition: vectorio.h:43
std::uint16_t EventId
Definition: types.h:53
ssize_t send(const iovec *iov, int iovcnt, int flags=0) const
const char * what() const
Definition: exception.h:62
EventId getEventId() const
Definition: event.h:59
void update(Type type, const uuid_t &uuid, EventId eventId)
void open(const INETAddr &eventChannelAddress, const std::string &sDevice, int iTTL, bool loopbackEnable, const uuid_t &uuid)
Definition: eventservice.cc:92
std::string str(bool bWithPort=true) const
Definition: inetaddr.cc:409
int cancel(std::thread &thread)
Definition: threadutils.h:65
std::uint16_t NEMId
Definition: types.h:52
RegistrarException is thrown when an exception occurs during registration.
virtual void processEvent(const EventId &eventId, const Serialization &serialization)
void open(const INETAddr &address, bool bReuseAddress=false, const std::string &sDevice="", std::uint8_t u8TTL=1, bool bLoop=false)
constexpr std::uint16_t HTONS(std::uint16_t x)
Definition: net.h:125
void registerEventServiceUser(BuildId buildId, EventServiceUser *pEventServiceUser, NEMId=0)
Definition: eventservice.cc:85
constexpr std::uint16_t NTOHS(std::uint16_t x)
Definition: net.h:136
void setStatEventCountRowLimit(size_t rows)
void registerEvent(BuildId buildId, EventId eventId)
Definition: eventservice.cc:67
#define LOGGER_STANDARD_LOGGING(logger, level, fmt, args...)
static LogService * instance()
Definition: singleton.h:56
Exception thrown during open/establishment of the event service communication channel.
int elevate(std::thread &thread)
Definition: threadutils.h:44
std::uint32_t BuildId
Definition: types.h:60
void processEventMessage(NEMId nemId, EventId eventId, const Serialization &serialization, NEMId ignoreNEM={}) const