Skip to content

Commit e7dbc2c

Browse files
Initial commit WIP
1 parent da6db0a commit e7dbc2c

10 files changed

Lines changed: 258 additions & 72 deletions

hello/advertizer.py

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import random
2+
import time
13
from typing import Any
24

35
from common_utility import IReusableTimer
@@ -47,46 +49,49 @@ def advertise(self, info: ServiceInfo | None = None) -> None:
4749
if info:
4850
self._info = info
4951
if self._info:
50-
self._sender.send(info)
52+
self._sender.send(self._info)
5153
log.info('Service advertised', service=self._info, group=self._group)
5254
else:
5355
log.warning('Cannot advertise service, advertizer not started', service=info)
5456

5557

5658
class RespondingAdvertizer(DefaultAdvertizer):
5759

58-
def __init__(self, sender: Sender, receiver: Receiver) -> None:
60+
def __init__(self, sender: Sender, receiver: Receiver, max_response_delay: float = 0.1) -> None:
5961
super().__init__(sender)
6062
self._receiver = receiver
63+
self._max_delay = max_response_delay
6164

6265
def start(self, address: str, group: Group, info: ServiceInfo | None = None) -> None:
6366
super().start(address, group, info)
6467
self._receiver.start(GroupAccess(address, group.query()))
65-
self._receiver.register(self._handle_query)
68+
self._receiver.register(self._handle_message)
6669

6770
def stop(self) -> None:
6871
super().stop()
6972
self._receiver.stop()
7073

71-
def _handle_query(self, data: dict[str, str]) -> None:
74+
def _handle_message(self, message: dict[str, Any]) -> None:
7275
if self._info:
73-
matcher: ServiceMatcher | None = None
74-
7576
try:
76-
query = ServiceQuery(**data)
77-
matcher = ServiceMatcher(query)
77+
query = ServiceQuery(**message)
7878
log.debug('Query received', group=self._group, query=query)
79+
self._handle_query(query, self._info)
7980
except Exception as error:
80-
log.warning('Invalid query message received', group=self._group, received=data, error=error)
81+
log.warning('Invalid query message received', group=self._group, received=message, error=error)
8182

82-
if matcher and matcher.matches(self._info):
83-
log.info('Query matches service', group=self._group, query=matcher.query, service=self._info)
84-
self.advertise(self._info)
83+
def _handle_query(self, query: ServiceQuery, info: ServiceInfo) -> None:
84+
matcher = ServiceMatcher(query)
85+
if matcher and matcher.matches(info):
86+
delay = round(self._max_delay * random.random(), 3)
87+
log.info('Responding to query', group=self._group, query=matcher.query, service=info, delay=delay)
88+
time.sleep(delay)
89+
self.advertise(info)
8590

8691

8792
class ScheduledAdvertizer(Advertizer):
8893

89-
def schedule(self, info: ServiceInfo, interval: float, one_shot: bool = False) -> None:
94+
def schedule(self, info: ServiceInfo | None = None, interval: float = 10, one_shot: bool = False) -> None:
9095
raise NotImplementedError()
9196

9297

@@ -112,7 +117,7 @@ def stop(self) -> None:
112117
def advertise(self, info: ServiceInfo | None = None) -> None:
113118
self._advertizer.advertise(info)
114119

115-
def schedule(self, info: ServiceInfo, interval: float, one_shot: bool = False) -> None:
120+
def schedule(self, info: ServiceInfo | None = None, interval: float = 10, one_shot: bool = False) -> None:
116121
if one_shot:
117122
self._timer.start(interval, self.advertise, [info])
118123
log.info('One-shot service advertisement scheduled', service=info, interval=interval)

hello/api.py

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from typing import Any
22

3-
from common_utility import ReusableTimer, IReusableTimer
3+
from common_utility import ReusableTimer
44
from zmq import Context
55

66
from hello import Advertizer, Discoverer, RadioSender, DishReceiver, DefaultAdvertizer, DefaultDiscoverer, \
@@ -9,10 +9,10 @@
99

1010
class Hello:
1111

12-
def default_advertizer(self, respond: bool = True) -> Advertizer:
12+
def default_advertizer(self, respond: bool = True, delay: float = 0.1) -> Advertizer:
1313
raise NotImplementedError()
1414

15-
def scheduled_advertizer(self, timer: IReusableTimer | None = None, respond: bool = True) -> ScheduledAdvertizer:
15+
def scheduled_advertizer(self, respond: bool = True, delay: float = 0.1) -> ScheduledAdvertizer:
1616
raise NotImplementedError()
1717

1818
def discoverer(self) -> Discoverer:
@@ -21,18 +21,24 @@ def discoverer(self) -> Discoverer:
2121

2222
class DefaultHello(Hello):
2323

24-
def __init__(self, context: Context[Any] | None = None) -> None:
24+
def __init__(self, context: Context[Any] | None = None, max_workers: int = 1, poll_timeout: float = 0.1) -> None:
2525
self._context = context if context else Context()
26-
self._sender = RadioSender(self._context)
27-
self._receiver = DishReceiver(self._context)
26+
self._max_workers = max_workers
27+
self._poll_timeout = poll_timeout
2828

29-
def default_advertizer(self, respond: bool = True) -> Advertizer:
30-
return RespondingAdvertizer(self._sender, self._receiver) if respond else DefaultAdvertizer(self._sender)
29+
def default_advertizer(self, respond: bool = True, delay: float = 0.1) -> Advertizer:
30+
sender = RadioSender(self._context)
31+
if respond:
32+
receiver = DishReceiver(self._context, self._max_workers, self._poll_timeout)
33+
return RespondingAdvertizer(sender, receiver, delay)
34+
else:
35+
return DefaultAdvertizer(sender)
3136

32-
def scheduled_advertizer(self, timer: IReusableTimer | None = None, respond: bool = True) -> ScheduledAdvertizer:
33-
advertizer = self.default_advertizer(respond)
34-
reusable_timer = timer if timer else ReusableTimer()
35-
return DefaultScheduledAdvertizer(advertizer, reusable_timer)
37+
def scheduled_advertizer(self, respond: bool = True, delay: float = 0.1) -> ScheduledAdvertizer:
38+
advertizer = self.default_advertizer(respond, delay)
39+
return DefaultScheduledAdvertizer(advertizer, ReusableTimer())
3640

3741
def discoverer(self) -> Discoverer:
38-
return DefaultDiscoverer(self._sender, self._receiver)
42+
sender = RadioSender(self._context)
43+
receiver = DishReceiver(self._context, self._max_workers, self._poll_timeout)
44+
return DefaultDiscoverer(sender, receiver)

hello/discoverer.py

Lines changed: 36 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
from typing import Any, Callable
1+
from dataclasses import dataclass
2+
from enum import Enum
3+
from typing import Any, Protocol
24

35
from context_logger import get_logger
46

@@ -7,6 +9,21 @@
79
log = get_logger('Discoverer')
810

911

12+
class DiscoveryEventType(Enum):
13+
DISCOVERED = 'discovered'
14+
UPDATED = 'updated'
15+
16+
17+
@dataclass
18+
class DiscoveryEvent:
19+
service: ServiceInfo
20+
type: DiscoveryEventType
21+
22+
23+
class OnDiscoveryEvent(Protocol):
24+
def __call__(self, event: DiscoveryEvent) -> None: ...
25+
26+
1027
class Discoverer:
1128

1229
def start(self, address: str, group: Group, query: ServiceQuery | None = None) -> None:
@@ -21,10 +38,10 @@ def discover(self, query: ServiceQuery | None = None) -> None:
2138
def get_services(self) -> dict[str, ServiceInfo]:
2239
raise NotImplementedError()
2340

24-
def register(self, callback: Callable[[Any], None]) -> None:
41+
def register(self, callback: OnDiscoveryEvent) -> None:
2542
raise NotImplementedError()
2643

27-
def deregister(self, callback: Callable[[Any], None]) -> None:
44+
def deregister(self, callback: OnDiscoveryEvent) -> None:
2845
raise NotImplementedError()
2946

3047

@@ -36,7 +53,7 @@ def __init__(self, sender: Sender, receiver: Receiver) -> None:
3653
self._group: Group | None = None
3754
self._matcher: ServiceMatcher | None = None
3855
self._services: dict[str, ServiceInfo] = {}
39-
self._callbacks: list[Callable[[ServiceInfo], None]] = []
56+
self._callbacks: list[OnDiscoveryEvent] = []
4057

4158
def __enter__(self) -> Discoverer:
4259
return self
@@ -65,24 +82,24 @@ def discover(self, query: ServiceQuery | None = None) -> None:
6582
self._sender.send(self._matcher.query)
6683
log.info('Service discovery initiated', query=self._matcher.query, group=self._group)
6784
else:
68-
log.warning('Cannot initiate service discovery, discoverer not started', query=query)
85+
log.warning('Cannot discover services, discoverer not started', query=query)
6986

7087
def get_services(self) -> dict[str, ServiceInfo]:
7188
return self._services.copy()
7289

73-
def register(self, callback: Callable[[Any], None]) -> None:
90+
def register(self, callback: OnDiscoveryEvent) -> None:
7491
self._callbacks.append(callback)
7592

76-
def deregister(self, callback: Callable[[Any], None]) -> None:
93+
def deregister(self, callback: OnDiscoveryEvent) -> None:
7794
self._callbacks.remove(callback)
7895

79-
def _handle_message(self, data: dict[str, Any]) -> None:
96+
def _handle_message(self, message: dict[str, Any]) -> None:
8097
service: ServiceInfo | None = None
8198

8299
try:
83-
service = ServiceInfo(**data)
100+
service = ServiceInfo(**message)
84101
except Exception as error:
85-
log.warn('Failed to handle received message', data=data, error=error)
102+
log.warn('Failed to handle received message', data=message, error=error)
86103

87104
if service:
88105
self._handle_service(service)
@@ -91,29 +108,25 @@ def _handle_service(self, service: ServiceInfo) -> None:
91108
if self._matcher and self._matcher.matches(service):
92109
cached = self._services.get(service.name)
93110

94-
if self._is_update_needed(cached, service):
95-
self._services[service.name] = service
96-
for callback in self._callbacks:
97-
try:
98-
callback(service)
99-
except Exception as error:
100-
log.warn('Error in callback execution', service=service, error=error)
111+
if event := self._create_event(cached, service):
112+
self._handle_event(event)
101113

102-
def _is_update_needed(self, cached: ServiceInfo | None, service: ServiceInfo) -> bool:
114+
def _create_event(self, cached: ServiceInfo | None, service: ServiceInfo) -> DiscoveryEvent | None:
103115
if cached:
104116
if cached != service:
105117
log.info('Service updated', old_service=cached, new_service=service)
106-
return True
118+
return DiscoveryEvent(service, DiscoveryEventType.UPDATED)
107119
else:
108120
log.info('Service discovered', service=service)
109-
return True
121+
return DiscoveryEvent(service, DiscoveryEventType.DISCOVERED)
110122

111-
return False
123+
return None
112124

113-
def _handle_update(self, service: ServiceInfo) -> None:
125+
def _handle_event(self, event: DiscoveryEvent) -> None:
126+
service = event.service
114127
self._services[service.name] = service
115128
for callback in self._callbacks:
116129
try:
117-
callback(service)
130+
callback(event)
118131
except Exception as error:
119132
log.warn('Error in callback execution', service=service, error=error)

hello/receiver.py

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from concurrent.futures import ThreadPoolExecutor
2-
from typing import Any, Callable
2+
from typing import Any, Protocol
33

44
from context_logger import get_logger
55
from zmq import DISH, Poller, POLLIN, POLLOUT, Context
@@ -9,6 +9,10 @@
99
log = get_logger('Receiver')
1010

1111

12+
class OnMessage(Protocol):
13+
def __call__(self, message: dict[str, Any]) -> None: ...
14+
15+
1216
class Receiver:
1317

1418
def start(self, source: GroupAccess) -> None:
@@ -17,22 +21,23 @@ def start(self, source: GroupAccess) -> None:
1721
def stop(self) -> None:
1822
raise NotImplementedError()
1923

20-
def register(self, callback: Callable[[Any], None]) -> None:
24+
def register(self, callback: OnMessage) -> None:
2125
raise NotImplementedError()
2226

23-
def deregister(self, callback: Callable[[Any], None]) -> None:
27+
def deregister(self, callback: OnMessage) -> None:
2428
raise NotImplementedError()
2529

2630

2731
class DishReceiver(Receiver):
2832

29-
def __init__(self, context: Context[Any]) -> None:
33+
def __init__(self, context: Context[Any], max_workers: int = 1, poll_timeout: float = 0.1) -> None:
3034
self._context = context
3135
self._dish = self._context.socket(DISH)
3236
self._poller = Poller()
33-
self._executor = ThreadPoolExecutor(max_workers=1)
37+
self._executor = ThreadPoolExecutor(max_workers=max_workers)
38+
self._poll_timeout = int(poll_timeout * 1000)
3439
self._group: str | None = None
35-
self._callbacks: list[Callable[[Any], None]] = []
40+
self._callbacks: list[OnMessage] = []
3641

3742
def __enter__(self) -> Receiver:
3843
return self
@@ -42,12 +47,14 @@ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
4247

4348
def start(self, source: GroupAccess) -> None:
4449
try:
50+
if self._group:
51+
raise RuntimeError('Receiver already started')
4552
self._poller.register(self._dish, POLLIN)
4653
self._dish.bind(source.access_url)
4754
self._dish.join(source.full_group)
4855
self._group = source.full_group
4956
self._executor.submit(self._handle_messages)
50-
log.info('Receiver started', address=source.access_url, group=source.full_group)
57+
log.debug('Receiver started', address=source.access_url, group=source.full_group)
5158
except Exception as error:
5259
log.error('Failed to start receiver', address=source.access_url, group=source.full_group, error=error)
5360
raise error
@@ -58,20 +65,20 @@ def stop(self) -> None:
5865
self._poller.register(self._dish, POLLOUT)
5966
self._executor.shutdown()
6067
self._dish.close()
61-
log.info('Receiver stopped')
68+
log.debug('Receiver stopped')
6269
except Exception as error:
6370
log.error('Failed to stop receiver', error=error)
6471
raise error
6572

66-
def register(self, callback: Callable[[Any], None]) -> None:
73+
def register(self, callback: OnMessage) -> None:
6774
self._callbacks.append(callback)
6875

69-
def deregister(self, callback: Callable[[Any], None]) -> None:
76+
def deregister(self, callback: OnMessage) -> None:
7077
self._callbacks.remove(callback)
7178

7279
def _handle_messages(self) -> None:
7380
while self._group:
74-
sockets = dict(self._poller.poll(timeout=100))
81+
sockets = dict(self._poller.poll(timeout=self._poll_timeout))
7582
if self._dish in sockets and sockets[self._dish] == POLLIN:
7683
try:
7784
data = self._dish.recv_json()
@@ -80,9 +87,9 @@ def _handle_messages(self) -> None:
8087
except Exception as error:
8188
log.error('Failed to receive message', group=self._group, error=error)
8289

83-
def _handle_message(self, data: dict[str, str]) -> None:
90+
def _handle_message(self, message: dict[str, Any]) -> None:
8491
for callback in self._callbacks:
8592
try:
86-
callback(data)
93+
callback(message)
8794
except Exception as error:
88-
log.warn('Error in callback execution', data=data, group=self._group, error=error)
95+
log.warn('Error in callback execution', data=message, group=self._group, error=error)

hello/sender.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,11 @@ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
3535

3636
def start(self, target: GroupAccess) -> None:
3737
try:
38+
if self._group:
39+
raise RuntimeError('Sender already started')
3840
self._radio.connect(target.access_url)
3941
self._group = target.full_group
40-
log.info('Sender started', address=target.access_url, group=target.full_group)
42+
log.debug('Sender started', address=target.access_url, group=target.full_group)
4143
except Exception as error:
4244
log.error('Failed to start sender', address=target.access_url, group=target.full_group, error=error)
4345
raise error
@@ -46,7 +48,7 @@ def stop(self) -> None:
4648
try:
4749
self._group = None
4850
self._radio.close()
49-
log.info('Sender stopped')
51+
log.debug('Sender stopped')
5052
except Exception as error:
5153
log.error('Failed to stop sender', error=error)
5254
raise error
@@ -57,6 +59,8 @@ def send(self, data: Any) -> None:
5759
self._send_json(data)
5860
else:
5961
log.warning('Unsupported message type', data=data, group=self._group)
62+
else:
63+
log.warning('Cannot send message, sender not started', data=data)
6064

6165
def _convert_to_dict(self, data: Any) -> dict[str, Any] | None:
6266
if isinstance(data, dict):

0 commit comments

Comments
 (0)