|
10 | 10 | import sys |
11 | 11 | import threading |
12 | 12 | import traceback |
13 | | -from asyncio.locks import Event |
14 | 13 | from time import monotonic |
15 | 14 | from typing import ( |
16 | 15 | Any, |
|
26 | 25 |
|
27 | 26 | from .services import Service |
28 | 27 | from .utils.futures import maybe_async, maybe_set_exception, maybe_set_result, notify |
| 28 | +from .utils.locks import Event |
29 | 29 |
|
30 | 30 | __all__ = [ |
31 | 31 | "QueuedMethod", |
@@ -104,16 +104,18 @@ def __init__( |
104 | 104 | **kwargs: Any, |
105 | 105 | ) -> None: |
106 | 106 | # cannot share loop between threads, so create a new one |
| 107 | + assert asyncio.get_event_loop_policy().get_event_loop() |
107 | 108 | if executor is not None: |
108 | 109 | raise NotImplementedError("executor argument no longer supported") |
109 | 110 | self.parent_loop = loop or asyncio.get_event_loop_policy().get_event_loop() |
110 | 111 | self.thread_loop = ( |
111 | 112 | thread_loop or asyncio.get_event_loop_policy().new_event_loop() |
112 | 113 | ) |
113 | | - self._thread_started = Event() |
| 114 | + self._thread_started = Event(loop=self.parent_loop) |
114 | 115 | if Worker is not None: |
115 | 116 | self.Worker = Worker |
116 | 117 | super().__init__(loop=self.thread_loop, **kwargs) |
| 118 | + assert self._shutdown.loop is self.parent_loop |
117 | 119 |
|
118 | 120 | async def on_thread_started(self) -> None: |
119 | 121 | ... |
@@ -148,7 +150,7 @@ async def on_thread_stop(self) -> None: |
148 | 150 | # thread calls _shutdown.set(), parent calls _shutdown.wait() |
149 | 151 |
|
150 | 152 | def _new_shutdown_event(self) -> Event: |
151 | | - return Event() |
| 153 | + return Event(loop=self.parent_loop) |
152 | 154 |
|
153 | 155 | async def maybe_start(self) -> bool: |
154 | 156 | if not self._thread_started.is_set(): |
@@ -179,13 +181,12 @@ async def start(self) -> None: |
179 | 181 |
|
180 | 182 | async def _keepalive2(self) -> None: |
181 | 183 | while not self.should_stop: |
182 | | - await self.sleep(1.1) |
| 184 | + await self.sleep(2.0) |
183 | 185 | if self.last_wakeup_at: |
184 | 186 | if monotonic() - self.last_wakeup_at > 3.0: |
185 | 187 | self.log.error("Thread keepalive is not responding...") |
186 | | - asyncio.run_coroutine_threadsafe( |
187 | | - self._wakeup_timer_in_thread(), self.thread_loop |
188 | | - ) |
| 188 | + await asyncio.sleep(0.0) # for unittest to invoke `call_soon` |
| 189 | + await self._wakeup_timer_in_thread() |
189 | 190 |
|
190 | 191 | async def _wakeup_timer_in_thread(self) -> None: |
191 | 192 | self.last_wakeup_at = monotonic() |
@@ -323,7 +324,7 @@ class MethodQueue(Service): |
323 | 324 | def __init__(self, num_workers: int = 2, **kwargs: Any) -> None: |
324 | 325 | super().__init__(**kwargs) |
325 | 326 | self._queue = asyncio.Queue() |
326 | | - self._queue_ready = Event() |
| 327 | + self._queue_ready = Event(loop=self.loop) |
327 | 328 | self.num_workers = num_workers |
328 | 329 | self._workers = [] |
329 | 330 |
|
|
0 commit comments