Skip to content

Commit a4b5384

Browse files
Change service query, update receiver message handling (#5)
1 parent 25687ad commit a4b5384

8 files changed

Lines changed: 85 additions & 29 deletions

File tree

examples/cameraDiscoveryExample.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@
1111
def main() -> None:
1212
shutdown_event = setup_shutdown()
1313

14-
# Define the group to discover camera services
14+
# Define the group to discover camera services on
1515
group = Group(name='effectiverange/sniper', url='udp://239.0.1.1:5555')
1616

17-
# Define the query to discover camera services
18-
query = ServiceQuery(name='.+', role='camera')
17+
# Define the query to discover matching camera services
18+
query = ServiceQuery(name_filter='.+', role_filter='camera')
1919

2020
# Use a discoverer to find camera services
2121
with Hello.builder().discoverer().default() as discoverer:

examples/cameraServiceExample.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
def main() -> None:
1212
shutdown_event = setup_shutdown()
1313

14-
# Define the group to advertise the camera service
14+
# Define the group to advertise the camera service on
1515
group = Group(name='effectiverange/sniper', url='udp://239.0.1.1:5555')
1616

1717
# Define the service information for the camera

hello/discoverer.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ def _create_event(self, cached: ServiceInfo | None, service: ServiceInfo) -> Dis
120120
log.info('Service updated', old_service=cached, new_service=service)
121121
return DiscoveryEvent(service, DiscoveryEventType.UPDATED)
122122
else:
123+
log.debug('Service unchanged', service=service)
123124
return None
124125
else:
125126
log.info('Service discovered', service=service)

hello/group.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,16 @@ class Group:
1313
url: str
1414

1515
def hello(self) -> 'PrefixedGroup':
16-
return PrefixedGroup(self, GroupPrefix.HELLO)
16+
return PrefixedGroup(GroupPrefix.HELLO, self)
1717

1818
def query(self) -> 'PrefixedGroup':
19-
return PrefixedGroup(self, GroupPrefix.QUERY)
19+
return PrefixedGroup(GroupPrefix.QUERY, self)
2020

2121

2222
@dataclass
2323
class PrefixedGroup:
24-
group: Group
2524
prefix: GroupPrefix
25+
group: Group
2626

2727
@property
2828
def name(self) -> str:

hello/receiver.py

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,12 @@ def get_handlers(self) -> list[OnMessage]:
3333

3434
class DishReceiver(Receiver):
3535

36-
def __init__(self, context: Context[Any], max_workers: int = 1, poll_timeout: float = 0.1) -> None:
36+
def __init__(self, context: Context[Any], max_workers: int = 8, poll_timeout: float = 0.1) -> None:
3737
self._context = context
3838
self._dish = self._context.socket(DISH)
3939
self._poller = Poller()
40-
self._executor = ThreadPoolExecutor(max_workers=max_workers)
40+
self._loop_executor = ThreadPoolExecutor(max_workers=1)
41+
self._handler_executor = ThreadPoolExecutor(max_workers=max_workers)
4142
self._poll_timeout = int(poll_timeout * 1000)
4243
self._group: str | None = None
4344
self._handlers: list[OnMessage] = []
@@ -56,7 +57,7 @@ def start(self, group: PrefixedGroup) -> None:
5657
self._dish.bind(group.url)
5758
self._dish.join(group.name)
5859
self._group = group.name
59-
self._executor.submit(self._receive_loop)
60+
self._loop_executor.submit(self._receive_loop)
6061
log.debug('Receiver started', url=group.url, group=group.name)
6162
except Exception as error:
6263
log.error('Failed to start receiver', url=group.url, group=group.name, error=error)
@@ -65,7 +66,7 @@ def start(self, group: PrefixedGroup) -> None:
6566
def stop(self) -> None:
6667
try:
6768
self._group = None
68-
self._executor.shutdown()
69+
self._loop_executor.shutdown()
6970
self._dish.close()
7071
log.debug('Receiver stopped')
7172
except Exception as error:
@@ -83,18 +84,21 @@ def get_handlers(self) -> list[OnMessage]:
8384

8485
def _receive_loop(self) -> None:
8586
while self._group:
86-
sockets = dict(self._poller.poll(timeout=self._poll_timeout))
87-
if self._dish in sockets and sockets[self._dish] == POLLIN:
88-
try:
89-
data = self._dish.recv_json()
90-
log.debug('Message received', data=data, group=self._group)
91-
self._handle_message(data)
92-
except Exception as error:
93-
log.error('Failed to receive message', group=self._group, error=error)
87+
try:
88+
sockets = dict(self._poller.poll(timeout=self._poll_timeout))
89+
if self._dish in sockets and sockets[self._dish] == POLLIN:
90+
message = self._dish.recv_json()
91+
self._handle_message(message)
92+
except Exception as error:
93+
log.error('Failed to receive message', group=self._group, error=error)
9494

9595
def _handle_message(self, message: dict[str, Any]) -> None:
96+
log.debug('Message received', data=message, group=self._group)
9697
for handler in self._handlers:
97-
try:
98-
handler(message)
99-
except Exception as error:
100-
log.warn('Error in message handler execution', data=message, group=self._group, error=error)
98+
self._handler_executor.submit(self._execute_handler, handler, message)
99+
100+
def _execute_handler(self, handler: OnMessage, message: dict[str, Any]) -> None:
101+
try:
102+
handler(message)
103+
except Exception as error:
104+
log.warn('Error in message handler execution', data=message, group=self._group, error=error)

hello/sender.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ def start(self, group: PrefixedGroup) -> None:
4646

4747
def stop(self) -> None:
4848
try:
49-
self._group = None
5049
self._radio.close()
50+
self._group = None
5151
log.debug('Sender stopped')
5252
except Exception as error:
5353
log.error('Failed to stop sender', error=error)

hello/service.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,16 @@ class ServiceInfo:
1111

1212
@dataclass
1313
class ServiceQuery(object):
14-
name: str
15-
role: str
14+
name_filter: str
15+
role_filter: str
1616

1717

1818
class ServiceMatcher(object):
1919

2020
def __init__(self, query: ServiceQuery) -> None:
2121
self.query = query
22-
self._name_matcher = re.compile(self.query.name)
23-
self._role_matcher = re.compile(self.query.role)
22+
self._name_matcher = re.compile(self.query.name_filter)
23+
self._role_matcher = re.compile(self.query.role_filter)
2424

2525
def matches(self, info: ServiceInfo) -> bool:
2626
name_match = self._name_matcher.match(info.name)

tests/apiIntegrationTest.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import unittest
2+
from threading import Thread
23
from unittest import TestCase
34

45
from context_logger import setup_logging
@@ -37,6 +38,32 @@ def test_discoverer_caches_advertised_service(self):
3738
# Then
3839
self.assertEqual({SERVICE_INFO.name: SERVICE_INFO}, discoverer.get_services())
3940

41+
def test_discoverer_caches_advertised_services(self):
42+
# Given
43+
config = HelloConfig(advertizer_responder=False)
44+
45+
with (Hello.builder(config).advertizer().default() as advertizer1,
46+
Hello.builder(config).advertizer().default() as advertizer2,
47+
Hello.builder(config).discoverer().default() as discoverer):
48+
service_info1 = ServiceInfo('test-service1', 'test-role', {'test': 'http://localhost:8080'})
49+
service_info2 = ServiceInfo('test-service2', 'test-role', {'test': 'http://localhost:8080'})
50+
advertizer1.start(GROUP, service_info1)
51+
advertizer2.start(GROUP, service_info2)
52+
discoverer.start(GROUP, ServiceQuery('test-service.+', 'test-role'))
53+
54+
# When
55+
for _ in range(5):
56+
Thread(target=advertizer1.advertise).start()
57+
Thread(target=advertizer2.advertise).start()
58+
59+
wait_for_assertion(0.2, lambda: self.assertEqual(2, len(discoverer.get_services())))
60+
61+
# Then
62+
self.assertEqual({
63+
service_info1.name: service_info1,
64+
service_info2.name: service_info2
65+
}, discoverer.get_services())
66+
4067
def test_discoverer_caches_advertised_service_when_advertisement_scheduled_once(self):
4168
# Given
4269
config = HelloConfig(advertizer_responder=False)
@@ -83,11 +110,35 @@ def test_discoverer_caches_discovery_response_service(self):
83110
# When
84111
discoverer.discover()
85112

86-
wait_for_assertion(0.1, lambda: self.assertEqual(1, len(discoverer.get_services())))
113+
wait_for_assertion(0.2, lambda: self.assertEqual(1, len(discoverer.get_services())))
87114

88115
# Then
89116
self.assertEqual({SERVICE_INFO.name: SERVICE_INFO}, discoverer.get_services())
90117

118+
def test_discoverer_caches_discovery_response_services(self):
119+
# Given
120+
config = HelloConfig()
121+
122+
with (Hello.builder(config).advertizer().default() as advertizer1,
123+
Hello.builder(config).advertizer().default() as advertizer2,
124+
Hello.builder(config).discoverer().default() as discoverer):
125+
service_info1 = ServiceInfo('test-service1', 'test-role', {'test': 'http://localhost:8080'})
126+
service_info2 = ServiceInfo('test-service2', 'test-role', {'test': 'http://localhost:8080'})
127+
advertizer1.start(GROUP, service_info1)
128+
advertizer2.start(GROUP, service_info2)
129+
discoverer.start(GROUP, ServiceQuery('test-service.+', 'test-role'))
130+
131+
# When
132+
discoverer.discover()
133+
134+
wait_for_assertion(0.2, lambda: self.assertEqual(2, len(discoverer.get_services())))
135+
136+
# Then
137+
self.assertEqual({
138+
service_info1.name: service_info1,
139+
service_info2.name: service_info2
140+
}, discoverer.get_services())
141+
91142
def test_discoverer_caches_discovery_response_service_when_discovery_scheduled_once(self):
92143
# Given
93144
config = HelloConfig()

0 commit comments

Comments
 (0)