@@ -33,11 +33,12 @@ def get_handlers(self) -> list[OnMessage]:
3333
3434class DishReceiver (Receiver ):
3535
36- def __init__ (self , context : Context [Any ], max_workers : int = 1 , poll_timeout : float = 0.1 ) -> None :
36+ def __init__ (self , context : Context [Any ], max_workers : int = 8 , poll_timeout : float = 0.1 ) -> None :
3737 self ._context = context
3838 self ._dish = self ._context .socket (DISH )
3939 self ._poller = Poller ()
40- self ._executor = ThreadPoolExecutor (max_workers = max_workers )
40+ self ._loop_executor = ThreadPoolExecutor (max_workers = 1 )
41+ self ._handler_executor = ThreadPoolExecutor (max_workers = max_workers )
4142 self ._poll_timeout = int (poll_timeout * 1000 )
4243 self ._group : str | None = None
4344 self ._handlers : list [OnMessage ] = []
@@ -56,7 +57,7 @@ def start(self, group: PrefixedGroup) -> None:
5657 self ._dish .bind (group .url )
5758 self ._dish .join (group .name )
5859 self ._group = group .name
59- self ._executor .submit (self ._receive_loop )
60+ self ._loop_executor .submit (self ._receive_loop )
6061 log .debug ('Receiver started' , url = group .url , group = group .name )
6162 except Exception as error :
6263 log .error ('Failed to start receiver' , url = group .url , group = group .name , error = error )
@@ -65,7 +66,7 @@ def start(self, group: PrefixedGroup) -> None:
6566 def stop (self ) -> None :
6667 try :
6768 self ._group = None
68- self ._executor .shutdown ()
69+ self ._loop_executor .shutdown ()
6970 self ._dish .close ()
7071 log .debug ('Receiver stopped' )
7172 except Exception as error :
@@ -83,18 +84,21 @@ def get_handlers(self) -> list[OnMessage]:
8384
8485 def _receive_loop (self ) -> None :
8586 while self ._group :
86- sockets = dict (self ._poller .poll (timeout = self ._poll_timeout ))
87- if self ._dish in sockets and sockets [self ._dish ] == POLLIN :
88- try :
89- data = self ._dish .recv_json ()
90- log .debug ('Message received' , data = data , group = self ._group )
91- self ._handle_message (data )
92- except Exception as error :
93- log .error ('Failed to receive message' , group = self ._group , error = error )
87+ try :
88+ sockets = dict (self ._poller .poll (timeout = self ._poll_timeout ))
89+ if self ._dish in sockets and sockets [self ._dish ] == POLLIN :
90+ message = self ._dish .recv_json ()
91+ self ._handle_message (message )
92+ except Exception as error :
93+ log .error ('Failed to receive message' , group = self ._group , error = error )
9494
9595 def _handle_message (self , message : dict [str , Any ]) -> None :
96+ log .debug ('Message received' , data = message , group = self ._group )
9697 for handler in self ._handlers :
97- try :
98- handler (message )
99- except Exception as error :
100- log .warn ('Error in message handler execution' , data = message , group = self ._group , error = error )
98+ self ._handler_executor .submit (self ._execute_handler , handler , message )
99+
100+ def _execute_handler (self , handler : OnMessage , message : dict [str , Any ]) -> None :
101+ try :
102+ handler (message )
103+ except Exception as error :
104+ log .warn ('Error in message handler execution' , data = message , group = self ._group , error = error )
0 commit comments