EMANE  1.2.1
emane/events/eventservice.py
Go to the documentation of this file.
1 #
2 # Copyright (c) 2013-2015,2017 - Adjacent Link LLC, Bridgewater,
3 # New Jersey
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 #
10 # * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above copyright
13 # notice, this list of conditions and the following disclaimer in
14 # the documentation and/or other materials provided with the
15 # distribution.
16 # * Neither the name of Adjacent Link LLC nor the names of its
17 # contributors may be used to endorse or promote products derived
18 # from this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 # POSSIBILITY OF SUCH DAMAGE.
32 #
33 
34 from . import event_pb2
35 from ..ota import otaheader_pb2
36 from . import EventServiceException
37 import os
38 import socket
39 import threading
40 import fcntl
41 import struct
42 import select
43 import time
44 import uuid
45 import sys
46 
47 def get_ip_address(ifname):
48  # http://code.activestate.com/recipes/439094/
49  s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
50  return socket.inet_ntoa(fcntl.ioctl(
51  s.fileno(),
52  0x8915, # SIOCGIFADDR
53  struct.pack('256s', ifname[:15].encode() if sys.version_info >= (3,0) else ifname[:15])
54  )[20:24])
55 
56 
57 def init_multicast_socket(group,port,device):
58  try:
59  sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
60 
61  except socket.error as msg :
62  if sys.version_info >= (3,3):
63  raise EventServiceException("event socket failure %s" % str(msg),True)
64  else:
65  raise EventServiceException("event socket failure %s %s" % (str(msg[0]), msg[1]),True)
66 
67  try:
68  sock.setsockopt(socket.IPPROTO_IP,socket.IP_MULTICAST_TTL,32)
69  except socket.error as msg :
70  if sys.version_info >= (3,3):
71  raise EventServiceException("event socket option failure %s" % str(msg),True)
72  else:
73  raise EventServiceException("event socket option failure %s %s" % (str(msg[0]), msg[1]),True)
74 
75  try:
76  sock.setsockopt(socket.IPPROTO_IP,socket.IP_MULTICAST_LOOP,1)
77  except socket.error as msg :
78  if sys.version_info >= (3,3):
79  raise EventServiceException("event socket option failure %s" % str(msg),True)
80  else:
81  raise EventServiceException("event socket option failure %s %s" % (str(msg[0]), msg[1]),True)
82 
83  try:
84  sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
85  except socket.error as msg :
86  if sys.version_info >= (3,3):
87  raise EventServiceException("event socket option failure %s" % str(msg),True)
88  else:
89  raise EventServiceException("event socket option failure %s %s" % (str(msg[0]), msg[1]),True)
90 
91  try:
92  sock.bind((group,port))
93  except socket.error as msg:
94  if sys.version_info >= (3,3):
95  raise EventServiceException("bind failure %s" % str(msg),True)
96  else:
97  raise EventServiceException("bind failure %s %s" % (str(msg[0]), msg[1]),True)
98 
99  try:
100  if device:
101  devAddress = socket.inet_aton(get_ip_address(device))
102  else:
103  devAddress = socket.inet_aton("0.0.0.0")
104 
105  sock.setsockopt(socket.SOL_IP,
106  socket.IP_ADD_MEMBERSHIP,
107  socket.inet_aton(group) +
108  devAddress)
109 
110  sock.setsockopt(socket.SOL_IP,
111  socket.IP_MULTICAST_IF,
112  devAddress)
113 
114 
115  except socket.error as msg:
116  if sys.version_info >= (3,3):
117  raise EventServiceException("mulicast add membership failure %s" % str(msg),True)
118  else:
119  raise EventServiceException("mulicast add membership failure %s %s" % (str(msg[0]), msg[1]),True)
120 
121  except IOError:
122  raise EventServiceException("unknown device %s" % device,True)
123 
124  return sock
125 
126 
127 class EventService:
128  def __init__(self,eventchannel,otachannel = None):
129  (self._multicastGroup,self._port,_) = eventchannel
130  self._defaultHandler = None
131  self._handlers = {}
132  self._socket = None
133  self._readFd,self._writeFd = os.pipe()
134  self._uuid = uuid.uuid4()
135  self._sequenceNumber = 0
136  self._socketOTA = None
137 
138  self._socket = init_multicast_socket(*eventchannel)
139 
140  if otachannel:
141  self._socketOTA = init_multicast_socket(*otachannel)
142 
143  self._lock = threading.Lock()
144 
145  def breakloop(self):
146  os.write(self._writeFd,"\n".encode())
147 
148  def loop(self,default=None):
149  buffer = ""
150  running = True
151 
152  while running:
153  rdfds = [self._socket,self._readFd]
154 
155  if self._socketOTA:
156  rdfds.append(self._socketOTA)
157 
158  try:
159  readable,_,_ = select.select(rdfds,[],[])
160  except select.error:
161  continue
162 
163  for fd in readable:
164  if fd is self._socket:
165  data,_ = self._socket.recvfrom(65535)
166 
167  if not len(data):
168  running = False
169  break
170 
171  (length,) = struct.unpack_from("!H",data)
172 
173  if length == len(data) - 2:
174  event = event_pb2.Event()
175 
176  event.ParseFromString(data[2:])
177 
178  for serialization in event.data.serializations:
179  self._lock.acquire()
180 
181  try:
182 
183  if serialization.eventId in self._handlers:
184  self._handlers[serialization.eventId](serialization.nemId,
185  serialization.eventId,
186  serialization.data,
187  uuid.UUID(bytes=event.uuid),
188  event.sequenceNumber)
189  elif default:
190  default(serialization.nemId,
191  serialization.eventId,
192  serialization.data,
193  uuid.UUID(bytes=event.uuid),
194  event.sequenceNumber)
195  finally:
196  self._lock.release()
197 
198  elif fd is self._readFd:
199  running = False
200  break
201 
202  elif fd is self._socketOTA:
203  data,_ = self._socketOTA.recvfrom(65535)
204 
205  if not len(data):
206  running = False
207  break
208 
209  (headerLength,) = struct.unpack_from("!H",data)
210 
211  otaHeader = otaheader_pb2.OTAHeader()
212 
213  otaHeader.ParseFromString(data[2:headerLength+2])
214 
215  eventData = event_pb2.Event.Data()
216 
217  # currently we only process attached events that
218  # are fully contained in the first part (fragment)
219  # of a one-part (no fragmentation) or multi-part
220  # (fragmented) OTA message
221  #
222  # Notes for below logic:
223  # 2 + headerLength = 2 byte header length field
224  # + header length
225  #
226  # 9 = OTA PartInfo header length. Where PartInfo
227  # is used to support fragmentation.
228  if otaHeader.HasField("payloadInfo") and \
229  len(data) >= 2 + headerLength + 9 + otaHeader.payloadInfo.eventLength:
230 
231  eventData.ParseFromString(data[2+headerLength + 9:2 + headerLength + 9 + otaHeader.payloadInfo.eventLength])
232 
233  for serialization in eventData.serializations:
234  self._lock.acquire()
235 
236  try:
237 
238  if serialization.eventId in self._handlers:
239  self._handlers[serialization.eventId](serialization.nemId,
240  serialization.eventId,
241  serialization.data,
242  uuid.UUID(bytes=otaHeader.uuid),
243  otaHeader.sequence)
244  elif default:
245  default(serialization.nemId,
246  serialization.eventId,
247  serialization.data,
248  uuid.UUID(bytes=otaHeader.uuid),
249  otaHeader.sequence)
250  finally:
251  self._lock.release()
252 
253 
254  def nextEvent(self):
255  events = []
256  eventId = 0
257  running = True
258 
259  while running:
260  try:
261  rdfds = [self._socket,self._readFd]
262 
263  if self._socketOTA:
264  rdfds.append(self._socketOTA)
265 
266  readable,_,_ = select.select(rdfds,[],[])
267  except select.error:
268  continue
269 
270  for fd in readable:
271  if fd is self._socket:
272  data,_ = self._socket.recvfrom(65535)
273 
274  if not len(data):
275  running = False
276  break
277 
278  (length,) = struct.unpack_from("!H",data)
279 
280  if length == len(data) - 2:
281  event = event_pb2.Event()
282 
283  event.ParseFromString(data[2:])
284 
285  for serialization in event.data.serializations:
286  events.append((serialization.nemId,
287  serialization.eventId,
288  serialization.data))
289 
290  return (uuid.UUID(bytes=event.uuid),
291  event.sequenceNumber,
292  tuple(events))
293 
294  elif fd is self._readFd:
295  running = False
296  break
297 
298  elif fd is self._socketOTA:
299  data,_ = self._socketOTA.recvfrom(65535)
300 
301  if not len(data):
302  running = False
303  break
304 
305  (headerLength,) = struct.unpack_from("!H",data)
306 
307  otaHeader = otaheader_pb2.OTAHeader()
308 
309  otaHeader.ParseFromString(data[2:headerLength+2])
310 
311  eventData = event_pb2.Event.Data()
312 
313  eventData.ParseFromString(data[2+headerLength:2 + headerLength +otaHeader.eventLength])
314 
315  for serialization in eventData.serializations:
316  events.append((serialization.nemId,
317  serialization.eventId,
318  serialization.data))
319 
320  return (uuid.UUID(bytes=otaHeader.uuid),
321  otaHeader.sequenceNumber,
322  tuple(events))
323 
324  return (None, None, tuple(events))
325 
326 
327  def subscribe(self,eventId,callback):
328  self._lock.acquire()
329 
330  if callback:
331  self._handlers[eventId] = callback
332 
333  self._lock.release()
334 
335 
336  def unsubscribe(self,eventId):
337  self._lock.acquire()
338 
339  if eventId in self._handlers:
340  del self._handlers[eventId]
341 
342  self._lock.release()
343 
344 
345  def publish(self,nemId,event):
346  self._sequenceNumber += 1
347  msg = event_pb2.Event()
348  msg.uuid = self._uuid.bytes
349  msg.sequenceNumber = self._sequenceNumber
350  serialization = msg.data.serializations.add()
351  serialization.nemId = nemId
352  serialization.eventId = event.IDENTIFIER
353  serialization.data = event.serialize()
354 
355  buf = msg.SerializeToString()
356 
357  self._socket.sendto(struct.pack("!H",len(buf)) + buf,
358  (self._multicastGroup,self._port))
def __init__(self, eventchannel, otachannel=None)