Skip to content

Commit 8077171

Browse files
Initial commit WIP
1 parent e7dbc2c commit 8077171

15 files changed

Lines changed: 1013 additions & 108 deletions

hello/advertizer.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,14 +117,14 @@ def stop(self) -> None:
117117
def advertise(self, info: ServiceInfo | None = None) -> None:
118118
self._advertizer.advertise(info)
119119

120-
def schedule(self, info: ServiceInfo | None = None, interval: float = 10, one_shot: bool = False) -> None:
120+
def schedule(self, info: ServiceInfo | None = None, interval: float = 60, one_shot: bool = False) -> None:
121121
if one_shot:
122122
self._timer.start(interval, self.advertise, [info])
123123
log.info('One-shot service advertisement scheduled', service=info, interval=interval)
124124
else:
125-
def periodic_advertise() -> None:
126-
self.advertise(info)
127-
self._timer.restart()
128-
129-
self._timer.start(interval, periodic_advertise)
125+
self._timer.start(interval, self._advertise_and_restart, [info])
130126
log.info('Periodic service advertisement scheduled', service=info, interval=interval)
127+
128+
def _advertise_and_restart(self, info: ServiceInfo | None = None) -> None:
129+
self.advertise(info)
130+
self._timer.restart()

hello/discoverer.py

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,13 @@ def discover(self, query: ServiceQuery | None = None) -> None:
3838
def get_services(self) -> dict[str, ServiceInfo]:
3939
raise NotImplementedError()
4040

41-
def register(self, callback: OnDiscoveryEvent) -> None:
41+
def register(self, handler: OnDiscoveryEvent) -> None:
4242
raise NotImplementedError()
4343

44-
def deregister(self, callback: OnDiscoveryEvent) -> None:
44+
def deregister(self, handler: OnDiscoveryEvent) -> None:
45+
raise NotImplementedError()
46+
47+
def get_handlers(self) -> list[OnDiscoveryEvent]:
4548
raise NotImplementedError()
4649

4750

@@ -52,8 +55,8 @@ def __init__(self, sender: Sender, receiver: Receiver) -> None:
5255
self._receiver = receiver
5356
self._group: Group | None = None
5457
self._matcher: ServiceMatcher | None = None
55-
self._services: dict[str, ServiceInfo] = {}
56-
self._callbacks: list[OnDiscoveryEvent] = []
58+
self._cache: dict[str, ServiceInfo] = {}
59+
self._handlers: list[OnDiscoveryEvent] = []
5760

5861
def __enter__(self) -> Discoverer:
5962
return self
@@ -85,28 +88,27 @@ def discover(self, query: ServiceQuery | None = None) -> None:
8588
log.warning('Cannot discover services, discoverer not started', query=query)
8689

8790
def get_services(self) -> dict[str, ServiceInfo]:
88-
return self._services.copy()
91+
return self._cache.copy()
8992

90-
def register(self, callback: OnDiscoveryEvent) -> None:
91-
self._callbacks.append(callback)
93+
def register(self, handler: OnDiscoveryEvent) -> None:
94+
self._handlers.append(handler)
9295

93-
def deregister(self, callback: OnDiscoveryEvent) -> None:
94-
self._callbacks.remove(callback)
96+
def deregister(self, handler: OnDiscoveryEvent) -> None:
97+
self._handlers.remove(handler)
9598

96-
def _handle_message(self, message: dict[str, Any]) -> None:
97-
service: ServiceInfo | None = None
99+
def get_handlers(self) -> list[OnDiscoveryEvent]:
100+
return self._handlers.copy()
98101

102+
def _handle_message(self, message: dict[str, Any]) -> None:
99103
try:
100104
service = ServiceInfo(**message)
105+
self._handle_service(service)
101106
except Exception as error:
102107
log.warn('Failed to handle received message', data=message, error=error)
103108

104-
if service:
105-
self._handle_service(service)
106-
107109
def _handle_service(self, service: ServiceInfo) -> None:
108110
if self._matcher and self._matcher.matches(service):
109-
cached = self._services.get(service.name)
111+
cached = self._cache.get(service.name)
110112

111113
if event := self._create_event(cached, service):
112114
self._handle_event(event)
@@ -116,17 +118,17 @@ def _create_event(self, cached: ServiceInfo | None, service: ServiceInfo) -> Dis
116118
if cached != service:
117119
log.info('Service updated', old_service=cached, new_service=service)
118120
return DiscoveryEvent(service, DiscoveryEventType.UPDATED)
121+
else:
122+
return None
119123
else:
120124
log.info('Service discovered', service=service)
121125
return DiscoveryEvent(service, DiscoveryEventType.DISCOVERED)
122126

123-
return None
124-
125127
def _handle_event(self, event: DiscoveryEvent) -> None:
126128
service = event.service
127-
self._services[service.name] = service
128-
for callback in self._callbacks:
129+
self._cache[service.name] = service
130+
for callback in self._handlers:
129131
try:
130132
callback(event)
131133
except Exception as error:
132-
log.warn('Error in callback execution', service=service, error=error)
134+
log.warn('Error in event handler execution', event=event, error=error)

hello/receiver.py

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from typing import Any, Protocol
33

44
from context_logger import get_logger
5-
from zmq import DISH, Poller, POLLIN, POLLOUT, Context
5+
from zmq import DISH, Poller, POLLIN, Context
66

77
from hello import GroupAccess
88

@@ -21,10 +21,13 @@ def start(self, source: GroupAccess) -> None:
2121
def stop(self) -> None:
2222
raise NotImplementedError()
2323

24-
def register(self, callback: OnMessage) -> None:
24+
def register(self, handler: OnMessage) -> None:
2525
raise NotImplementedError()
2626

27-
def deregister(self, callback: OnMessage) -> None:
27+
def deregister(self, handler: OnMessage) -> None:
28+
raise NotImplementedError()
29+
30+
def get_handlers(self) -> list[OnMessage]:
2831
raise NotImplementedError()
2932

3033

@@ -37,7 +40,7 @@ def __init__(self, context: Context[Any], max_workers: int = 1, poll_timeout: fl
3740
self._executor = ThreadPoolExecutor(max_workers=max_workers)
3841
self._poll_timeout = int(poll_timeout * 1000)
3942
self._group: str | None = None
40-
self._callbacks: list[OnMessage] = []
43+
self._handlers: list[OnMessage] = []
4144

4245
def __enter__(self) -> Receiver:
4346
return self
@@ -53,7 +56,7 @@ def start(self, source: GroupAccess) -> None:
5356
self._dish.bind(source.access_url)
5457
self._dish.join(source.full_group)
5558
self._group = source.full_group
56-
self._executor.submit(self._handle_messages)
59+
self._executor.submit(self._receive_loop)
5760
log.debug('Receiver started', address=source.access_url, group=source.full_group)
5861
except Exception as error:
5962
log.error('Failed to start receiver', address=source.access_url, group=source.full_group, error=error)
@@ -62,21 +65,23 @@ def start(self, source: GroupAccess) -> None:
6265
def stop(self) -> None:
6366
try:
6467
self._group = None
65-
self._poller.register(self._dish, POLLOUT)
6668
self._executor.shutdown()
6769
self._dish.close()
6870
log.debug('Receiver stopped')
6971
except Exception as error:
7072
log.error('Failed to stop receiver', error=error)
7173
raise error
7274

73-
def register(self, callback: OnMessage) -> None:
74-
self._callbacks.append(callback)
75+
def register(self, handler: OnMessage) -> None:
76+
self._handlers.append(handler)
77+
78+
def deregister(self, handler: OnMessage) -> None:
79+
self._handlers.remove(handler)
7580

76-
def deregister(self, callback: OnMessage) -> None:
77-
self._callbacks.remove(callback)
81+
def get_handlers(self) -> list[OnMessage]:
82+
return self._handlers.copy()
7883

79-
def _handle_messages(self) -> None:
84+
def _receive_loop(self) -> None:
8085
while self._group:
8186
sockets = dict(self._poller.poll(timeout=self._poll_timeout))
8287
if self._dish in sockets and sockets[self._dish] == POLLIN:
@@ -88,8 +93,8 @@ def _handle_messages(self) -> None:
8893
log.error('Failed to receive message', group=self._group, error=error)
8994

9095
def _handle_message(self, message: dict[str, Any]) -> None:
91-
for callback in self._callbacks:
96+
for handler in self._handlers:
9297
try:
93-
callback(message)
98+
handler(message)
9499
except Exception as error:
95-
log.warn('Error in callback execution', data=message, group=self._group, error=error)
100+
log.warn('Error in message handler execution', data=message, group=self._group, error=error)

hello/sender.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ def stop(self) -> None:
5555

5656
def send(self, data: Any) -> None:
5757
if self._group:
58-
if data := self._convert_to_dict(data):
59-
self._send_json(data)
58+
if message := self._convert_to_dict(data):
59+
self._send_json(message)
6060
else:
6161
log.warning('Unsupported message type', data=data, group=self._group)
6262
else:

tests/advertizerIntegrationTest.py

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
GROUP = Group(GROUP_NAME)
1515

1616

17-
class AdvertizerTest(TestCase):
17+
class AdvertizerIntegrationTest(TestCase):
1818
SERVICE_INFO = ServiceInfo('test-service', 'test-role', 'http://localhost:8080')
1919

2020
@classmethod
@@ -44,32 +44,6 @@ def test_sends_hello_when_advertises_service(self):
4444
# Then
4545
self.assertEqual([self.SERVICE_INFO.__dict__], messages)
4646

47-
def test_sends_hello_when_advertises_service_and_info_changed(self):
48-
# Given
49-
context = Context()
50-
sender = RadioSender(context)
51-
messages = []
52-
53-
with DefaultAdvertizer(sender) as advertizer, DishReceiver(context) as test_receiver:
54-
test_receiver.start(GroupAccess(ACCESS_URL, GROUP.hello()))
55-
test_receiver.register(lambda message: messages.append(message))
56-
advertizer.start(ACCESS_URL, GROUP)
57-
58-
advertizer.advertise(self.SERVICE_INFO)
59-
60-
self.SERVICE_INFO.url = 'http://localhost:9090'
61-
62-
# When
63-
advertizer.advertise(self.SERVICE_INFO)
64-
65-
wait_for_assertion(0.1, lambda: self.assertEqual(2, len(messages)))
66-
67-
# Then
68-
self.assertEqual([
69-
{'name': 'test-service', 'role': 'test-role', 'url': 'http://localhost:8080'},
70-
{'name': 'test-service', 'role': 'test-role', 'url': 'http://localhost:9090'}
71-
], messages)
72-
7347
def test_sends_hello_when_query_received(self):
7448
# Given
7549
context = Context()

tests/apiIntegrationTest.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
SERVICE_QUERY = ServiceQuery('test-service', 'test-role')
1515

1616

17-
class ApiTest(TestCase):
17+
class ApiIntegrationTest(TestCase):
1818

1919
@classmethod
2020
def setUpClass(cls):
@@ -29,7 +29,7 @@ def test_discoverer_caches_advertised_service(self):
2929
context = Context()
3030
hello = DefaultHello(context)
3131

32-
with hello.default_advertizer() as advertizer, hello.discoverer() as discoverer:
32+
with hello.default_advertizer(respond=False) as advertizer, hello.discoverer() as discoverer:
3333
advertizer.start(ACCESS_URL, GROUP, SERVICE_INFO)
3434
discoverer.start(ACCESS_URL, GROUP, SERVICE_QUERY)
3535

@@ -46,7 +46,7 @@ def test_discoverer_caches_advertised_service_when_scheduled_once(self):
4646
context = Context()
4747
hello = DefaultHello(context)
4848

49-
with hello.scheduled_advertizer() as advertizer, hello.discoverer() as discoverer:
49+
with hello.scheduled_advertizer(respond=False) as advertizer, hello.discoverer() as discoverer:
5050
advertizer.start(ACCESS_URL, GROUP, SERVICE_INFO)
5151
discoverer.start(ACCESS_URL, GROUP, SERVICE_QUERY)
5252

@@ -87,7 +87,7 @@ def test_discoverer_caches_discovery_response_service(self):
8787
# When
8888
discoverer.discover()
8989

90-
wait_for_assertion(0.1, lambda: self.assertEqual(1, len(discoverer.get_services())))
90+
wait_for_assertion(0.2, lambda: self.assertEqual(1, len(discoverer.get_services())))
9191

9292
# Then
9393
self.assertEqual({self.SERVICE_INFO.name: self.SERVICE_INFO}, discoverer.get_services())

0 commit comments

Comments
 (0)