Skip to content

Commit dac6c95

Browse files
Extend discovery event, parallelize discovery event handler execution (#9)
1 parent 9fd9a3f commit dac6c95

8 files changed

Lines changed: 90 additions & 91 deletions

File tree

.github/workflows/ci.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ name: CI
22

33
on:
44
push:
5-
branches: [main]
6-
tags: [v*.*.*]
5+
branches: [ main ]
6+
tags: [ v*.*.* ]
77

88
pull_request:
9-
branches: [ "main" ]
9+
branches: [ main ]
1010
types:
1111
- synchronize
1212
- opened

hello/advertizer.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,15 @@ def stop(self) -> None:
4949
self._sender.stop()
5050

5151
def advertise(self, info: ServiceInfo | None = None) -> None:
52+
if info:
53+
self._info = info
54+
5255
if self._group:
53-
if info:
54-
self._info = info
5556
if self._info:
5657
self._sender.send(self._info)
5758
log.info('Service advertised', service=self._info, group=self._group)
59+
else:
60+
log.warning('Cannot advertise service, no service info provided', group=self._group)
5861
else:
5962
log.warning('Cannot advertise service, advertizer not started', service=info)
6063

@@ -72,21 +75,22 @@ def start(self, group: Group, info: ServiceInfo | None = None) -> None:
7275
self._receiver.register(self._handle_message)
7376

7477
def stop(self) -> None:
75-
super().stop()
78+
self._receiver.deregister(self._handle_message)
7679
self._receiver.stop()
80+
super().stop()
7781

7882
def _handle_message(self, message: dict[str, Any]) -> None:
7983
if self._info:
8084
try:
8185
query = ServiceQuery(**message)
82-
log.debug('Query received', group=self._group, query=query)
83-
self._handle_query(query, self._info)
86+
matcher = ServiceMatcher(query)
87+
log.debug('Service query received', group=self._group, query=query)
88+
self._handle_query(matcher, self._info)
8489
except Exception as error:
85-
log.warning('Invalid query message received', group=self._group, received=message, error=error)
90+
log.warning('Invalid service query received', group=self._group, received=message, error=error)
8691

87-
def _handle_query(self, query: ServiceQuery, info: ServiceInfo) -> None:
88-
matcher = ServiceMatcher(query)
89-
if matcher and matcher.matches(info):
92+
def _handle_query(self, matcher: ServiceMatcher, info: ServiceInfo) -> None:
93+
if matcher.matches(info):
9094
delay = round(self._max_delay * random.random(), 3)
9195
log.info('Responding to query', group=self._group, query=matcher.query, service=info, delay=delay)
9296
time.sleep(delay)

hello/discoverer.py

Lines changed: 47 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# SPDX-FileCopyrightText: 2024 Attila Gombos <attila.gombos@effective-range.com>
33
# SPDX-License-Identifier: MIT
44

5+
from concurrent.futures import ThreadPoolExecutor
56
from dataclasses import dataclass
67
from enum import Enum
78
from typing import Any, Protocol
@@ -22,6 +23,8 @@ class DiscoveryEventType(Enum):
2223

2324
@dataclass
2425
class DiscoveryEvent:
26+
group: Group
27+
query: ServiceQuery
2528
service: ServiceInfo
2629
type: DiscoveryEventType
2730

@@ -41,28 +44,26 @@ def stop(self) -> None:
4144
def discover(self, query: ServiceQuery | None = None) -> None:
4245
raise NotImplementedError()
4346

44-
def get_services(self) -> dict[UUID, ServiceInfo]:
45-
raise NotImplementedError()
46-
4747
def register(self, handler: OnDiscoveryEvent) -> None:
4848
raise NotImplementedError()
4949

5050
def deregister(self, handler: OnDiscoveryEvent) -> None:
5151
raise NotImplementedError()
5252

53-
def get_handlers(self) -> list[OnDiscoveryEvent]:
53+
def get_services(self) -> dict[UUID, ServiceInfo]:
5454
raise NotImplementedError()
5555

5656

5757
class DefaultDiscoverer(Discoverer):
5858

59-
def __init__(self, sender: Sender, receiver: Receiver) -> None:
59+
def __init__(self, sender: Sender, receiver: Receiver, max_workers: int = 8) -> None:
6060
self._sender = sender
6161
self._receiver = receiver
6262
self._group: Group | None = None
6363
self._matcher: ServiceMatcher | None = None
64-
self._cache: dict[UUID, ServiceInfo] = {}
64+
self._services: dict[UUID, ServiceInfo] = {}
6565
self._handlers: list[OnDiscoveryEvent] = []
66+
self._handler_executor = ThreadPoolExecutor(max_workers=max_workers)
6667

6768
def __enter__(self) -> Discoverer:
6869
return self
@@ -80,65 +81,73 @@ def start(self, group: Group, query: ServiceQuery | None = None) -> None:
8081

8182
def stop(self) -> None:
8283
self._group = None
84+
self._matcher = None
8385
self._sender.stop()
86+
self._receiver.deregister(self._handle_message)
8487
self._receiver.stop()
8588

8689
def discover(self, query: ServiceQuery | None = None) -> None:
90+
if query:
91+
self._matcher = ServiceMatcher(query)
92+
8793
if self._group:
88-
if query:
89-
self._matcher = ServiceMatcher(query)
9094
if self._matcher:
9195
self._sender.send(self._matcher.query)
92-
log.info('Service discovery initiated', query=self._matcher.query, group=self._group)
96+
log.info('Service discovery initiated', group=self._group, query=self._matcher.query)
97+
else:
98+
log.warning('Cannot discover services, no query provided', group=self._group)
9399
else:
94100
log.warning('Cannot discover services, discoverer not started', query=query)
95101

96-
def get_services(self) -> dict[UUID, ServiceInfo]:
97-
return self._cache.copy()
98-
99102
def register(self, handler: OnDiscoveryEvent) -> None:
100103
self._handlers.append(handler)
101104

102105
def deregister(self, handler: OnDiscoveryEvent) -> None:
103106
self._handlers.remove(handler)
104107

105-
def get_handlers(self) -> list[OnDiscoveryEvent]:
106-
return self._handlers.copy()
108+
def get_services(self) -> dict[UUID, ServiceInfo]:
109+
return self._services.copy()
107110

108111
def _handle_message(self, message: dict[str, Any]) -> None:
109-
try:
110-
service = ServiceInfo(UUID(message['uuid']), message['name'], message['role'], message.get('urls', {}))
111-
self._handle_service(service)
112-
except Exception as error:
113-
log.warn('Failed to handle received message', data=message, error=error)
112+
if self._group and self._matcher:
113+
try:
114+
service = ServiceInfo(UUID(message['uuid']), message['name'], message['role'], message.get('urls', {}))
115+
log.debug('Service info received', service=service, group=self._group)
116+
self._handle_service(service, self._group, self._matcher)
117+
except Exception as error:
118+
log.warn('Invalid service info received', group=self._group, data=message, error=error)
114119

115-
def _handle_service(self, service: ServiceInfo) -> None:
116-
if self._matcher and self._matcher.matches(service):
117-
cached = self._cache.get(service.uuid)
120+
def _handle_service(self, service: ServiceInfo, group: Group, matcher: ServiceMatcher) -> None:
121+
if matcher.matches(service):
122+
stored = self._services.get(service.uuid)
118123

119-
if event := self._create_event(cached, service):
124+
if event := self._create_event(group, matcher, stored, service):
120125
self._handle_event(event)
121126

122-
def _create_event(self, cached: ServiceInfo | None, service: ServiceInfo) -> DiscoveryEvent | None:
123-
if cached:
124-
if cached != service:
125-
log.info('Service updated', old_service=cached, new_service=service)
126-
return DiscoveryEvent(service, DiscoveryEventType.UPDATED)
127+
def _create_event(self, group: Group, matcher: ServiceMatcher,
128+
stored: ServiceInfo | None, service: ServiceInfo) -> DiscoveryEvent | None:
129+
if stored:
130+
if stored != service:
131+
log.info('Service updated', group=group, old_service=stored, new_service=service)
132+
return DiscoveryEvent(group, matcher.query, service, DiscoveryEventType.UPDATED)
127133
else:
128-
log.debug('Service unchanged', service=service)
134+
log.debug('Service unchanged', group=group, service=service)
129135
return None
130136
else:
131-
log.info('Service discovered', service=service)
132-
return DiscoveryEvent(service, DiscoveryEventType.DISCOVERED)
137+
log.info('New service discovered', group=group, service=service)
138+
return DiscoveryEvent(group, matcher.query, service, DiscoveryEventType.DISCOVERED)
133139

134140
def _handle_event(self, event: DiscoveryEvent) -> None:
135-
service = event.service
136-
self._cache[service.uuid] = service
137-
for callback in self._handlers:
138-
try:
139-
callback(event)
140-
except Exception as error:
141-
log.warn('Error in event handler execution', event=event, error=error)
141+
self._services[event.service.uuid] = event.service
142+
143+
for handler in self._handlers:
144+
self._handler_executor.submit(self._execute_handler, handler, event)
145+
146+
def _execute_handler(self, handler: OnDiscoveryEvent, event: DiscoveryEvent) -> None:
147+
try:
148+
handler(event)
149+
except Exception as error:
150+
log.warn('Error in event handler execution', event=event, error=error)
142151

143152

144153
class ScheduledDiscoverer(DefaultScheduler[ServiceQuery], Discoverer):
@@ -172,8 +181,5 @@ def register(self, handler: OnDiscoveryEvent) -> None:
172181
def deregister(self, handler: OnDiscoveryEvent) -> None:
173182
self._discoverer.deregister(handler)
174183

175-
def get_handlers(self) -> list[OnDiscoveryEvent]:
176-
return self._discoverer.get_handlers()
177-
178184
def _execute(self, query: ServiceQuery | None = None) -> None:
179185
self.discover(query)

hello/receiver.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@ def register(self, handler: OnMessage) -> None:
3131
def deregister(self, handler: OnMessage) -> None:
3232
raise NotImplementedError()
3333

34-
def get_handlers(self) -> list[OnMessage]:
35-
raise NotImplementedError()
36-
3734

3835
class DishReceiver(Receiver):
3936

@@ -83,9 +80,6 @@ def register(self, handler: OnMessage) -> None:
8380
def deregister(self, handler: OnMessage) -> None:
8481
self._handlers.remove(handler)
8582

86-
def get_handlers(self) -> list[OnMessage]:
87-
return self._handlers.copy()
88-
8983
def _receive_loop(self) -> None:
9084
while self._group:
9185
try:
@@ -105,4 +99,4 @@ def _execute_handler(self, handler: OnMessage, message: dict[str, Any]) -> None:
10599
try:
106100
handler(message)
107101
except Exception as error:
108-
log.warn('Error in message handler execution', data=message, group=self._group, error=error)
102+
log.warn('Handler failed to process message', data=message, group=self._group, error=error)

pyproject.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ description = "A service advertizer/discovery protocol library using ZeroMQ"
44
authors = [
55
{ name = "Ferenc Nandor Janky & Attila Gombos", email = "info@effective-range.com" }
66
]
7+
maintainers = [
8+
{ name = "Ferenc Nandor Janky & Attila Gombos", email = "info@effective-range.com" }
9+
]
710
dependencies = [
811
"pyzmq @ git+https://github.com/EffectiveRange/pyzmq.git@v27.1.1",
912
"python-context-logger @ git+https://github.com/EffectiveRange/python-context-logger.git@latest",
@@ -25,3 +28,8 @@ build-backend = "setuptools.build_meta"
2528
[tool.setuptools_scm]
2629
version_scheme = "guess-next-dev"
2730
local_scheme = "node-and-date"
31+
32+
[tool.pytest]
33+
addopts = ["--verbose", "--capture=no"]
34+
python_files = ["*Test.py"]
35+
python_classes = ["*Test"]

tests/defaultDiscovererTest.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from uuid import uuid4
55

66
from context_logger import setup_logging
7+
from test_utility import wait_for_assertion
78

89
from hello import ServiceInfo, Group, ServiceQuery, DefaultDiscoverer, Sender, Receiver, OnDiscoveryEvent, \
910
DiscoveryEventType, DiscoveryEvent
@@ -74,7 +75,7 @@ def test_registers_event_handler(self):
7475
discoverer.register(handler)
7576

7677
# Then
77-
self.assertIn(handler, discoverer.get_handlers())
78+
self.assertIn(handler, discoverer._handlers)
7879

7980
def test_deregisters_event_handler(self):
8081
# Given
@@ -88,7 +89,7 @@ def test_deregisters_event_handler(self):
8889
discoverer.deregister(handler)
8990

9091
# Then
91-
self.assertNotIn(handler, discoverer.get_handlers())
92+
self.assertNotIn(handler, discoverer._handlers)
9293

9394
def test_caches_service_and_calls_handler_when_receives_matching_info(self):
9495
# Given
@@ -104,7 +105,9 @@ def test_caches_service_and_calls_handler_when_receives_matching_info(self):
104105

105106
# Then
106107
self.assertEqual({SERVICE_INFO.uuid: SERVICE_INFO}, discoverer.get_services())
107-
handler.assert_called_once_with(DiscoveryEvent(SERVICE_INFO, DiscoveryEventType.DISCOVERED))
108+
wait_for_assertion(1, lambda: handler.assert_called_once_with(
109+
DiscoveryEvent(GROUP, SERVICE_QUERY, SERVICE_INFO, DiscoveryEventType.DISCOVERED)
110+
))
108111

109112
def test_updates_service_and_calls_handler_when_receives_matching_info(self):
110113
# Given
@@ -125,7 +128,9 @@ def test_updates_service_and_calls_handler_when_receives_matching_info(self):
125128

126129
# Then
127130
self.assertEqual({SERVICE_INFO.uuid: new_service_info}, discoverer.get_services())
128-
handler.assert_called_once_with(DiscoveryEvent(new_service_info, DiscoveryEventType.UPDATED))
131+
wait_for_assertion(1, lambda: handler.assert_called_once_with(
132+
DiscoveryEvent(GROUP, SERVICE_QUERY, new_service_info, DiscoveryEventType.UPDATED)
133+
))
129134

130135
def test_does_not_call_handler_when_service_info_not_changed(self):
131136
# Given
@@ -159,7 +164,9 @@ def test_handles_handler_error_gracefully(self):
159164

160165
# Then
161166
self.assertEqual({SERVICE_INFO.uuid: SERVICE_INFO}, discoverer.get_services())
162-
handler.assert_called_once_with(DiscoveryEvent(SERVICE_INFO, DiscoveryEventType.DISCOVERED))
167+
wait_for_assertion(1, lambda: handler.assert_called_once_with(
168+
DiscoveryEvent(GROUP, SERVICE_QUERY, SERVICE_INFO, DiscoveryEventType.DISCOVERED)
169+
))
163170

164171
def test_handles_invalid_message_gracefully(self):
165172
# Given

tests/dishReceiverTest.py

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import unittest
2+
from itertools import chain, repeat
23
from unittest import TestCase
34
from unittest.mock import MagicMock
45
from uuid import uuid4
@@ -94,7 +95,7 @@ def test_registers_handler(self):
9495
receiver.register(handler)
9596

9697
# Then
97-
self.assertIn(handler, receiver.get_handlers())
98+
self.assertIn(handler, receiver._handlers)
9899

99100
def test_deregisters_handler(self):
100101
# Given
@@ -107,7 +108,7 @@ def test_deregisters_handler(self):
107108
receiver.deregister(handler)
108109

109110
# Then
110-
self.assertNotIn(handler, receiver.get_handlers())
111+
self.assertNotIn(handler, receiver._handlers)
111112

112113
def test_calls_registered_handler_on_message(self):
113114
# Given
@@ -118,9 +119,7 @@ def test_calls_registered_handler_on_message(self):
118119

119120
with DishReceiver(context) as receiver:
120121
receiver._poller = MagicMock(spec=Poller)
121-
receiver._poller.poll.side_effect = [
122-
{context.socket.return_value: POLLIN},
123-
]
122+
receiver._poller.poll.side_effect = chain([{context.socket.return_value: POLLIN}], repeat({}))
124123
receiver.register(handler)
125124

126125
# When
@@ -138,9 +137,7 @@ def test_handles_message_receive_error_gracefully(self):
138137

139138
with DishReceiver(context) as receiver:
140139
receiver._poller = MagicMock(spec=Poller)
141-
receiver._poller.poll.side_effect = [
142-
{context.socket.return_value: POLLIN},
143-
]
140+
receiver._poller.poll.side_effect = chain([{context.socket.return_value: POLLIN}], repeat({}))
144141
receiver.register(handler)
145142

146143
# When
@@ -159,9 +156,7 @@ def test_handles_handler_execution_error_gracefully(self):
159156

160157
with DishReceiver(context) as receiver:
161158
receiver._poller = MagicMock(spec=Poller)
162-
receiver._poller.poll.side_effect = [
163-
{context.socket.return_value: POLLIN},
164-
]
159+
receiver._poller.poll.side_effect = chain([{context.socket.return_value: POLLIN}], repeat({}))
165160
receiver.register(handler)
166161

167162
# When

0 commit comments

Comments
 (0)