34 from .
import event_pb2
35 from ..ota
import otaheader_pb2
36 from .
import EventServiceException
49 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
50 return socket.inet_ntoa(fcntl.ioctl(
53 struct.pack(
'256s', ifname[:15].encode()
if sys.version_info >= (3,0)
else ifname[:15])
59 sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
61 except socket.error
as msg :
62 if sys.version_info >= (3,3):
63 raise EventServiceException(
"event socket failure %s" % str(msg),
True)
65 raise EventServiceException(
"event socket failure %s %s" % (str(msg[0]), msg[1]),
True)
68 sock.setsockopt(socket.IPPROTO_IP,socket.IP_MULTICAST_TTL,32)
69 except socket.error
as msg :
70 if sys.version_info >= (3,3):
71 raise EventServiceException(
"event socket option failure %s" % str(msg),
True)
73 raise EventServiceException(
"event socket option failure %s %s" % (str(msg[0]), msg[1]),
True)
76 sock.setsockopt(socket.IPPROTO_IP,socket.IP_MULTICAST_LOOP,1)
77 except socket.error
as msg :
78 if sys.version_info >= (3,3):
79 raise EventServiceException(
"event socket option failure %s" % str(msg),
True)
81 raise EventServiceException(
"event socket option failure %s %s" % (str(msg[0]), msg[1]),
True)
84 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
85 except socket.error
as msg :
86 if sys.version_info >= (3,3):
87 raise EventServiceException(
"event socket option failure %s" % str(msg),
True)
89 raise EventServiceException(
"event socket option failure %s %s" % (str(msg[0]), msg[1]),
True)
92 sock.bind((group,port))
93 except socket.error
as msg:
94 if sys.version_info >= (3,3):
95 raise EventServiceException(
"bind failure %s" % str(msg),
True)
97 raise EventServiceException(
"bind failure %s %s" % (str(msg[0]), msg[1]),
True)
103 devAddress = socket.inet_aton(
"0.0.0.0")
105 sock.setsockopt(socket.SOL_IP,
106 socket.IP_ADD_MEMBERSHIP,
107 socket.inet_aton(group) +
110 sock.setsockopt(socket.SOL_IP,
111 socket.IP_MULTICAST_IF,
115 except socket.error
as msg:
116 if sys.version_info >= (3,3):
117 raise EventServiceException(
"mulicast add membership failure %s" % str(msg),
True)
119 raise EventServiceException(
"mulicast add membership failure %s %s" % (str(msg[0]), msg[1]),
True)
122 raise EventServiceException(
"unknown device %s" % device,
True)
129 (self._multicastGroup,self._port,_) = eventchannel
133 self._readFd,self.
_writeFd = os.pipe()
134 self.
_uuid = uuid.uuid4()
143 self.
_lock = threading.Lock()
146 os.write(self.
_writeFd,
"\n".encode())
153 rdfds = [self.
_socket,self._readFd]
159 readable,_,_ = select.select(rdfds,[],[])
165 data,_ = self.
_socket.recvfrom(65535)
171 (length,) = struct.unpack_from(
"!H",data)
173 if length == len(data) - 2:
174 event = event_pb2.Event()
176 event.ParseFromString(data[2:])
178 for serialization
in event.data.serializations:
183 if serialization.eventId
in self.
_handlers:
184 self.
_handlers[serialization.eventId](serialization.nemId,
185 serialization.eventId,
187 uuid.UUID(bytes=event.uuid),
188 event.sequenceNumber)
190 default(serialization.nemId,
191 serialization.eventId,
193 uuid.UUID(bytes=event.uuid),
194 event.sequenceNumber)
198 elif fd
is self._readFd:
209 (headerLength,) = struct.unpack_from(
"!H",data)
211 otaHeader = otaheader_pb2.OTAHeader()
213 otaHeader.ParseFromString(data[2:headerLength+2])
215 eventData = event_pb2.Event.Data()
228 if otaHeader.HasField(
"payloadInfo")
and \
229 len(data) >= 2 + headerLength + 9 + otaHeader.payloadInfo.eventLength:
231 eventData.ParseFromString(data[2+headerLength + 9:2 + headerLength + 9 + otaHeader.payloadInfo.eventLength])
233 for serialization
in eventData.serializations:
238 if serialization.eventId
in self.
_handlers:
239 self.
_handlers[serialization.eventId](serialization.nemId,
240 serialization.eventId,
242 uuid.UUID(bytes=otaHeader.uuid),
245 default(serialization.nemId,
246 serialization.eventId,
248 uuid.UUID(bytes=otaHeader.uuid),
261 rdfds = [self.
_socket,self._readFd]
266 readable,_,_ = select.select(rdfds,[],[])
272 data,_ = self.
_socket.recvfrom(65535)
278 (length,) = struct.unpack_from(
"!H",data)
280 if length == len(data) - 2:
281 event = event_pb2.Event()
283 event.ParseFromString(data[2:])
285 for serialization
in event.data.serializations:
286 events.append((serialization.nemId,
287 serialization.eventId,
290 return (uuid.UUID(bytes=event.uuid),
291 event.sequenceNumber,
294 elif fd
is self._readFd:
305 (headerLength,) = struct.unpack_from(
"!H",data)
307 otaHeader = otaheader_pb2.OTAHeader()
309 otaHeader.ParseFromString(data[2:headerLength+2])
311 eventData = event_pb2.Event.Data()
313 eventData.ParseFromString(data[2+headerLength:2 + headerLength +otaHeader.eventLength])
315 for serialization
in eventData.serializations:
316 events.append((serialization.nemId,
317 serialization.eventId,
320 return (uuid.UUID(bytes=otaHeader.uuid),
321 otaHeader.sequenceNumber,
324 return (
None,
None, tuple(events))
347 msg = event_pb2.Event()
348 msg.uuid = self.
_uuid.bytes
350 serialization = msg.data.serializations.add()
351 serialization.nemId = nemId
352 serialization.eventId = event.IDENTIFIER
353 serialization.data = event.serialize()
355 buf = msg.SerializeToString()
357 self.
_socket.sendto(struct.pack(
"!H",len(buf)) + buf,
358 (self._multicastGroup,self._port))
def __init__(self, eventchannel, otachannel=None)
def init_multicast_socket(group, port, device)
def get_ip_address(ifname)
def loop(self, default=None)
def publish(self, nemId, event)
def subscribe(self, eventId, callback)
def unsubscribe(self, eventId)