34 from . 
import event_pb2
    35 from ..ota 
import otaheader_pb2
    36 from . 
import EventServiceException
    49     s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    50     return socket.inet_ntoa(fcntl.ioctl(
    53         struct.pack(
'256s', ifname[:15].encode() 
if sys.version_info >= (3,0) 
else ifname[:15])
    59         sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
    61     except socket.error 
as msg :
    62         if sys.version_info >= (3,3):
    63             raise EventServiceException(
"event socket failure %s" % str(msg),
True)
    65             raise EventServiceException(
"event socket failure %s %s" % (str(msg[0]), msg[1]),
True)
    68         sock.setsockopt(socket.IPPROTO_IP,socket.IP_MULTICAST_TTL,32)
    69     except socket.error 
as msg :
    70         if sys.version_info >= (3,3):
    71             raise EventServiceException(
"event socket option failure %s" % str(msg),
True)
    73             raise EventServiceException(
"event socket option failure %s %s" % (str(msg[0]), msg[1]),
True)
    76         sock.setsockopt(socket.IPPROTO_IP,socket.IP_MULTICAST_LOOP,1)
    77     except socket.error 
as msg :
    78         if sys.version_info >= (3,3):
    79             raise EventServiceException(
"event socket option failure %s" % str(msg),
True)
    81             raise EventServiceException(
"event socket option failure %s %s" % (str(msg[0]), msg[1]),
True)
    84         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    85     except socket.error 
as msg :
    86         if sys.version_info >= (3,3):
    87             raise EventServiceException(
"event socket option failure %s" % str(msg),
True)
    89             raise EventServiceException(
"event socket option failure %s %s" % (str(msg[0]), msg[1]),
True)
    92         sock.bind((group,port))
    93     except socket.error 
as msg:
    94         if sys.version_info >= (3,3):
    95             raise EventServiceException(
"bind failure %s" % str(msg),
True)
    97             raise EventServiceException(
"bind failure %s %s" % (str(msg[0]), msg[1]),
True)
   103             devAddress = socket.inet_aton(
"0.0.0.0")
   105         sock.setsockopt(socket.SOL_IP,
   106                         socket.IP_ADD_MEMBERSHIP,
   107                         socket.inet_aton(group) +
   110         sock.setsockopt(socket.SOL_IP,
   111                         socket.IP_MULTICAST_IF,
   115     except socket.error 
as msg:
   116         if sys.version_info >= (3,3):
   117             raise EventServiceException(
"mulicast add membership failure %s" % str(msg),
True)
   119             raise EventServiceException(
"mulicast add membership failure %s %s" % (str(msg[0]), msg[1]),
True)
   122         raise  EventServiceException(
"unknown device %s" % device,
True)
   129         (self._multicastGroup,self._port,_) = eventchannel
   133         self._readFd,self.
_writeFd = os.pipe()
   134         self.
_uuid = uuid.uuid4()
   143         self.
_lock = threading.Lock()
   146         os.write(self.
_writeFd,
"\n".encode())
   153             rdfds = [self.
_socket,self._readFd]
   159                 readable,_,_ = select.select(rdfds,[],[])
   165                     data,_ = self.
_socket.recvfrom(65535)
   171                     (length,) = struct.unpack_from(
"!H",data)
   173                     if length == len(data) - 2:
   174                         event = event_pb2.Event()
   176                         event.ParseFromString(data[2:])
   178                         for serialization 
in event.data.serializations:
   183                                 if serialization.eventId 
in self.
_handlers:
   184                                     self.
_handlers[serialization.eventId](serialization.nemId,
   185                                                                           serialization.eventId,
   187                                                                           uuid.UUID(bytes=event.uuid),
   188                                                                           event.sequenceNumber)
   190                                     default(serialization.nemId,
   191                                             serialization.eventId,
   193                                             uuid.UUID(bytes=event.uuid),
   194                                             event.sequenceNumber)
   198                 elif fd 
is self._readFd:
   209                     (headerLength,) = struct.unpack_from(
"!H",data)
   211                     otaHeader = otaheader_pb2.OTAHeader()
   213                     otaHeader.ParseFromString(data[2:headerLength+2])
   215                     eventData = event_pb2.Event.Data()
   228                     if otaHeader.HasField(
"payloadInfo") 
and \
   229                        len(data) >= 2 + headerLength + 9 + otaHeader.payloadInfo.eventLength:
   231                         eventData.ParseFromString(data[2+headerLength + 9:2 + headerLength + 9 + otaHeader.payloadInfo.eventLength])
   233                         for serialization 
in eventData.serializations:
   238                                 if serialization.eventId 
in self.
_handlers:
   239                                     self.
_handlers[serialization.eventId](serialization.nemId,
   240                                                                           serialization.eventId,
   242                                                                           uuid.UUID(bytes=otaHeader.uuid),
   245                                     default(serialization.nemId,
   246                                             serialization.eventId,
   248                                             uuid.UUID(bytes=otaHeader.uuid),
   261                 rdfds = [self.
_socket,self._readFd]
   266                 readable,_,_ = select.select(rdfds,[],[])
   272                     data,_ = self.
_socket.recvfrom(65535)
   278                     (length,) = struct.unpack_from(
"!H",data)
   280                     if length == len(data) - 2:
   281                         event = event_pb2.Event()
   283                         event.ParseFromString(data[2:])
   285                         for serialization 
in event.data.serializations:
   286                             events.append((serialization.nemId,
   287                                            serialization.eventId,
   290                         return (uuid.UUID(bytes=event.uuid),
   291                                 event.sequenceNumber,
   294                 elif fd 
is self._readFd:
   305                     (headerLength,) = struct.unpack_from(
"!H",data)
   307                     otaHeader = otaheader_pb2.OTAHeader()
   309                     otaHeader.ParseFromString(data[2:headerLength+2])
   311                     eventData = event_pb2.Event.Data()
   313                     eventData.ParseFromString(data[2+headerLength:2 + headerLength +otaHeader.eventLength])
   315                     for serialization 
in eventData.serializations:
   316                         events.append((serialization.nemId,
   317                                        serialization.eventId,
   320                     return (uuid.UUID(bytes=otaHeader.uuid),
   321                             otaHeader.sequenceNumber,
   324         return (
None, 
None, tuple(events))
   347         msg = event_pb2.Event()
   348         msg.uuid = self.
_uuid.bytes
   350         serialization = msg.data.serializations.add()
   351         serialization.nemId = nemId
   352         serialization.eventId = event.IDENTIFIER
   353         serialization.data = event.serialize()
   355         buf = msg.SerializeToString()
   357         self.
_socket.sendto(struct.pack(
"!H",len(buf)) + buf,
   358                             (self._multicastGroup,self._port))
 def __init__(self, eventchannel, otachannel=None)
def init_multicast_socket(group, port, device)
def get_ip_address(ifname)
def loop(self, default=None)
def publish(self, nemId, event)
def subscribe(self, eventId, callback)
def unsubscribe(self, eventId)