1212from common_utility import IReusableTimer
1313from context_logger import get_logger
1414
15- from hello import Group , ServiceQuery , Sender , Receiver , ServiceInfo , ServiceMatcher , AbstractScheduler
15+ from hello import Group , ServiceQuery , Sender , Receiver , Service , ServiceMatcher , AbstractScheduler
1616
1717log = get_logger ('Discoverer' )
1818
1919
2020class DiscoveryEventType (Enum ):
2121 DISCOVERED = 'discovered'
22+ UNCHANGED = 'unchanged'
2223 UPDATED = 'updated'
2324
2425
2526@dataclass
2627class DiscoveryEvent :
2728 group : Group
2829 query : ServiceQuery
29- service : ServiceInfo
30+ service : Service
3031 type : DiscoveryEventType
3132
3233
@@ -45,13 +46,13 @@ def stop(self) -> None:
4546 def discover (self , query : ServiceQuery | None = None , log_level : int = INFO ) -> None :
4647 raise NotImplementedError ()
4748
48- def register (self , handler : OnDiscoveryEvent ) -> None :
49+ def register (self , handler : OnDiscoveryEvent , types : set [ DiscoveryEventType ] | None = None ) -> None :
4950 raise NotImplementedError ()
5051
51- def deregister (self , handler : OnDiscoveryEvent ) -> None :
52+ def deregister (self , handler : OnDiscoveryEvent , types : set [ DiscoveryEventType ] | None = None ) -> None :
5253 raise NotImplementedError ()
5354
54- def get_services (self ) -> dict [UUID , ServiceInfo ]:
55+ def get_services (self ) -> dict [UUID , Service ]:
5556 raise NotImplementedError ()
5657
5758
@@ -62,8 +63,10 @@ def __init__(self, sender: Sender, receiver: Receiver, max_workers: int = 8) ->
6263 self ._receiver = receiver
6364 self ._group : Group | None = None
6465 self ._matcher : ServiceMatcher | None = None
65- self ._services : dict [UUID , ServiceInfo ] = {}
66- self ._handlers : list [OnDiscoveryEvent ] = []
66+ self ._services : dict [UUID , Service ] = {}
67+ self ._handlers : dict [DiscoveryEventType , list [OnDiscoveryEvent ]] = {
68+ event_type : [] for event_type in DiscoveryEventType
69+ }
6770 self ._handler_executor = ThreadPoolExecutor (max_workers = max_workers )
6871
6972 def __enter__ (self ) -> Discoverer :
@@ -101,48 +104,53 @@ def discover(self, query: ServiceQuery | None = None, log_level: int = INFO) ->
101104 else :
102105 log .warning ('Cannot discover services, discoverer not started' , query = query )
103106
104- def register (self , handler : OnDiscoveryEvent ) -> None :
105- self ._handlers .append (handler )
107+ def register (self , handler : OnDiscoveryEvent , types : set [DiscoveryEventType ] | None = None ) -> None :
108+ for event_type in types if types else self ._get_event_types ():
109+ self ._handlers [event_type ].append (handler )
106110
107- def deregister (self , handler : OnDiscoveryEvent ) -> None :
108- self ._handlers .remove (handler )
111+ def deregister (self , handler : OnDiscoveryEvent , types : set [DiscoveryEventType ] | None = None ) -> None :
112+ for event_type in types if types else self ._get_event_types ():
113+ self ._handlers [event_type ].remove (handler )
109114
110- def get_services (self ) -> dict [UUID , ServiceInfo ]:
115+ def get_services (self ) -> dict [UUID , Service ]:
111116 return self ._services .copy ()
112117
118+ def _get_event_types (self ) -> set [DiscoveryEventType ]:
119+ return set (self ._handlers .keys ())
120+
113121 def _handle_message (self , message : dict [str , Any ]) -> None :
114122 if self ._group and self ._matcher :
115123 try :
116- service = ServiceInfo (UUID (message ['uuid' ]), message ['name' ], message ['role' ], message .get ('urls' , {}))
117- log .debug ('Service info received' , service = service , group = self ._group )
124+ service = Service (UUID (message ['uuid' ]), message ['name' ], message ['role' ],
125+ message .get ('urls' , {}), message .get ('info' , {}), message ['address' ])
126+ log .debug ('Service received' , service = service , group = self ._group )
118127 self ._handle_service (service , self ._group , self ._matcher )
119128 except Exception as error :
120- log .warn ('Invalid service info received' , group = self ._group , data = message , error = error )
129+ log .warn ('Invalid service received' , group = self ._group , data = message , error = error )
121130
122- def _handle_service (self , service : ServiceInfo , group : Group , matcher : ServiceMatcher ) -> None :
131+ def _handle_service (self , service : Service , group : Group , matcher : ServiceMatcher ) -> None :
123132 if matcher .matches (service ):
124133 stored = self ._services .get (service .uuid )
134+ event = self ._create_event (group , matcher , stored , service )
135+ self ._handle_event (event )
125136
126- if event := self ._create_event (group , matcher , stored , service ):
127- self ._handle_event (event )
128-
129- def _create_event (self , group : Group , matcher : ServiceMatcher ,
130- stored : ServiceInfo | None , service : ServiceInfo ) -> DiscoveryEvent | None :
137+ def _create_event (self , group : Group , matcher : ServiceMatcher , stored : Service | None ,
138+ service : Service ) -> DiscoveryEvent :
131139 if stored :
132140 if stored != service :
133141 log .info ('Service updated' , group = group , old_service = stored , new_service = service )
134142 return DiscoveryEvent (group , matcher .query , service , DiscoveryEventType .UPDATED )
135143 else :
136144 log .debug ('Service unchanged' , group = group , service = service )
137- return None
145+ return DiscoveryEvent ( group , matcher . query , service , DiscoveryEventType . UPDATED )
138146 else :
139- log .info ('New service discovered' , group = group , service = service )
147+ log .info ('Service discovered' , group = group , service = service )
140148 return DiscoveryEvent (group , matcher .query , service , DiscoveryEventType .DISCOVERED )
141149
142150 def _handle_event (self , event : DiscoveryEvent ) -> None :
143151 self ._services [event .service .uuid ] = event .service
144152
145- for handler in self ._handlers :
153+ for handler in self ._handlers [ event . type ] :
146154 self ._handler_executor .submit (self ._execute_handler , handler , event )
147155
148156 def _execute_handler (self , handler : OnDiscoveryEvent , event : DiscoveryEvent ) -> None :
@@ -174,14 +182,14 @@ def stop(self) -> None:
174182 def discover (self , query : ServiceQuery | None = None , log_level : int = INFO ) -> None :
175183 self ._discoverer .discover (query , log_level )
176184
177- def get_services (self ) -> dict [UUID , ServiceInfo ]:
185+ def get_services (self ) -> dict [UUID , Service ]:
178186 return self ._discoverer .get_services ()
179187
180- def register (self , handler : OnDiscoveryEvent ) -> None :
181- self ._discoverer .register (handler )
188+ def register (self , handler : OnDiscoveryEvent , types : set [ DiscoveryEventType ] | None = None ) -> None :
189+ self ._discoverer .register (handler , types )
182190
183- def deregister (self , handler : OnDiscoveryEvent ) -> None :
184- self ._discoverer .deregister (handler )
191+ def deregister (self , handler : OnDiscoveryEvent , types : set [ DiscoveryEventType ] | None = None ) -> None :
192+ self ._discoverer .deregister (handler , types )
185193
186194 def _execute (self , query : ServiceQuery | None = None ) -> None :
187195 self .discover (query , DEBUG )
0 commit comments