diff --git a/dimos/core/coordination/_test_module.py b/dimos/core/coordination/_test_module.py new file mode 100644 index 0000000000..a280da84c3 --- /dev/null +++ b/dimos/core/coordination/_test_module.py @@ -0,0 +1,36 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from reactivex.disposable import Disposable + +from dimos.core.core import rpc +from dimos.core.module import Module +from dimos.core.stream import In, Out + + +class AliceModule(Module): + greetings: In[str] + response: Out[str] + + @rpc + def start(self) -> None: + super().start() + self._disposables.add(Disposable(self.greetings.subscribe(self._on_greetings))) + + @rpc + def stop(self) -> None: + super().stop() + + def _on_greetings(self, greeting: str) -> None: + self.response.publish(f"Hello {greeting} from Alice") diff --git a/dimos/core/coordination/module_coordinator.py b/dimos/core/coordination/module_coordinator.py index f6600a95b6..32f34c57b1 100644 --- a/dimos/core/coordination/module_coordinator.py +++ b/dimos/core/coordination/module_coordinator.py @@ -16,6 +16,7 @@ from collections import defaultdict from collections.abc import Mapping +import importlib import shutil import sys import threading @@ -34,7 +35,7 @@ from dimos.utils.safe_thread_map import safe_thread_map if TYPE_CHECKING: - from dimos.core.coordination.blueprints import Blueprint + from dimos.core.coordination.blueprints import Blueprint, _BlueprintAtom from dimos.core.rpc_client import ModuleProxy, ModuleProxyProtocol logger = setup_logger() @@ -55,7 +56,11 @@ def __init__( cls.deployment_identifier: cls(g=g) for cls in manager_types } self._deployed_modules = {} + self._deployed_atoms: dict[type[ModuleBase], _BlueprintAtom] = {} + self._resolved_module_refs: dict[tuple[type[ModuleBase], str], type[ModuleBase]] = {} self._transport_registry: dict[tuple[str, type], PubSubTransport[Any]] = {} + self._class_aliases: dict[type[ModuleBase], type[ModuleBase]] = {} + self._module_transports: dict[type[ModuleBase], dict[str, PubSubTransport[Any]]] = {} self._started = False def start(self) -> None: @@ -168,13 +173,20 @@ def start_all_modules(self) -> None: safe_thread_map(modules, lambda m: m.start()) + self._send_on_system_modules() + + def _resolve_class(self, cls: type[ModuleBase]) -> type[ModuleBase]: + return self._class_aliases.get(cls, cls) + + def get_instance(self, module: type[ModuleBase]) -> ModuleProxy: + return self._deployed_modules.get(self._resolve_class(module)) # type: ignore[return-value, no-any-return] + + def _send_on_system_modules(self) -> None: + modules = list(self._deployed_modules.values()) for module in modules: if hasattr(module, "on_system_modules"): module.on_system_modules(modules) - def get_instance(self, module: type[ModuleBase]) -> ModuleProxy: - return self._deployed_modules.get(module) # type: ignore[return-value, no-any-return] - def _connect_streams(self, blueprint: Blueprint) -> None: streams: dict[tuple[str, type], list[tuple[type, str]]] = defaultdict(list) @@ -194,6 +206,7 @@ def _connect_streams(self, blueprint: Blueprint) -> None: for module, original_name in streams[key]: instance = self.get_instance(module) # type: ignore[assignment] instance.set_transport(original_name, transport) # type: ignore[union-attr] + self._module_transports.setdefault(module, {})[original_name] = transport logger.info( "Transport", name=remapped_name, @@ -285,15 +298,153 @@ def load_blueprint( safe_thread_map(new_modules, lambda m: m.build()) safe_thread_map(new_modules, lambda m: m.start()) - # Re-notify all modules about the updated module list. - all_modules = list(self._deployed_modules.values()) - for module in all_modules: - if hasattr(module, "on_system_modules"): - module.on_system_modules(all_modules) + self._send_on_system_modules() def load_module(self, module_class: type[ModuleBase[Any]], **kwargs: Any) -> None: self.load_blueprint(module_class.blueprint(**kwargs)) + def unload_module(self, module_class: type[ModuleBase]) -> None: + """Stop and tear down a single deployed module. + + Removes the module from coordinator state, stops its worker-side + instance, and shuts down the worker process if it becomes empty. + Stream transports and other modules' references are left intact — + callers that expect the module to come back (e.g. ``restart_module``) + are responsible for rewiring. + """ + module_class = self._resolve_class(module_class) + if module_class not in self._deployed_modules: + raise ValueError(f"{module_class.__name__} is not deployed") + if module_class.deployment != "python": + raise NotImplementedError( + f"unload_module only supports python deployment, got {module_class.deployment!r}" + ) + + proxy = self._deployed_modules[module_class] + + try: + proxy.stop() + except Exception: + logger.error( + "Error stopping module during unload", + module=module_class.__name__, + exc_info=True, + ) + + python_wm = cast("WorkerManagerPython", self._managers["python"]) + try: + python_wm.undeploy(proxy) + except Exception: + logger.error( + "Error undeploying module from worker", + module=module_class.__name__, + exc_info=True, + ) + + del self._deployed_modules[module_class] + self._deployed_atoms.pop(module_class, None) + self._module_transports.pop(module_class, None) + self._class_aliases = { + k: v for k, v in self._class_aliases.items() if v is not module_class + } + self._resolved_module_refs = { + key: target + for key, target in self._resolved_module_refs.items() + if key[0] is not module_class and target is not module_class + } + + def restart_module( + self, + module_class: type[ModuleBase], + *, + reload_source: bool = True, + ) -> ModuleProxyProtocol: + """Restart a single deployed module in place. + + Unloads *module_class*, optionally reloads its source file via + ``importlib.reload`` so edited code is picked up, then redeploys it + onto a fresh worker process, reconnects its streams to the existing + transports, and re-injects the new proxy into every other module that + held a reference to it. + """ + module_class = self._resolve_class(module_class) + if module_class not in self._deployed_modules: + raise ValueError(f"{module_class.__name__} is not deployed") + if module_class.deployment != "python": + raise NotImplementedError( + f"restart_module only supports python deployment, got {module_class.deployment!r}" + ) + + old_atom = self._deployed_atoms[module_class] + kwargs = dict(old_atom.kwargs) + saved_transports = dict(self._module_transports.get(module_class, {})) + inbound_refs = [ + (consumer, ref_name) + for (consumer, ref_name), target in self._resolved_module_refs.items() + if target is module_class + ] + outbound_refs = [ + (ref_name, target) + for (consumer, ref_name), target in self._resolved_module_refs.items() + if consumer is module_class + ] + + self.unload_module(module_class) + + if reload_source: + source_mod = sys.modules.get(module_class.__module__) + if source_mod is None: + source_mod = importlib.import_module(module_class.__module__) + importlib.reload(source_mod) + new_class = cast("type[ModuleBase]", getattr(source_mod, module_class.__name__)) + else: + new_class = module_class + + if new_class is not module_class: + for old_cls in list(self._class_aliases): + if self._class_aliases[old_cls] is module_class: + self._class_aliases[old_cls] = new_class + self._class_aliases[module_class] = new_class + + python_wm = cast("WorkerManagerPython", self._managers["python"]) + new_proxy = python_wm.deploy_fresh(new_class, self._global_config, kwargs) + self._deployed_modules[new_class] = new_proxy + + new_bp = new_class.blueprint(**kwargs) + new_atom = new_bp.active_blueprints[0] + self._deployed_atoms[new_class] = new_atom + + for stream_ref in new_atom.streams: + transport = saved_transports.get(stream_ref.name) + if transport is not None: + new_proxy.set_transport(stream_ref.name, transport) + self._module_transports[new_class] = { + s.name: t for s in new_atom.streams if (t := saved_transports.get(s.name)) is not None + } + + for consumer_class, ref_name in inbound_refs: + consumer_proxy = self._deployed_modules.get(consumer_class) + if consumer_proxy is None: + continue + setattr(consumer_proxy, ref_name, new_proxy) + consumer_proxy.set_module_ref(ref_name, new_proxy) # type: ignore[attr-defined] + self._resolved_module_refs[consumer_class, ref_name] = new_class + + for ref_name, target_class in outbound_refs: + target_proxy = self._deployed_modules.get(target_class) + if target_proxy is None: + continue + setattr(new_proxy, ref_name, target_proxy) + new_proxy.set_module_ref(ref_name, target_proxy) # type: ignore[attr-defined] + self._resolved_module_refs[new_class, ref_name] = target_class + + new_proxy.build() + new_proxy.start() + + self._send_on_system_modules() + + return new_proxy + def loop(self) -> None: stop = threading.Event() try: @@ -433,6 +584,9 @@ def _deploy_all_modules( module_coordinator.deploy_parallel(module_specs) + for bp in blueprint.active_blueprints: + module_coordinator._deployed_atoms[bp.module] = bp + def _ref_msg(module_name: str, ref: object, spec_name: str, detail: str) -> str: return ( @@ -578,6 +732,9 @@ def _connect_module_refs( target_instance = module_coordinator.get_instance(target_module) # type: ignore[type-var,arg-type] setattr(base_instance, ref_name, target_instance) base_instance.set_module_ref(ref_name, target_instance) + module_coordinator._resolved_module_refs[base_module, ref_name] = cast( + "type[ModuleBase]", target_module + ) for (base_module, ref_name), proxy in disabled_ref_proxies.items(): base_instance = module_coordinator.get_instance(base_module) diff --git a/dimos/core/coordination/python_worker.py b/dimos/core/coordination/python_worker.py index 4d8445a590..4d51b6d774 100644 --- a/dimos/core/coordination/python_worker.py +++ b/dimos/core/coordination/python_worker.py @@ -237,6 +237,20 @@ def deploy_module( finally: self._reserved = max(0, self._reserved - 1) + def undeploy_module(self, module_id: int) -> None: + """Stop and remove a single module from the worker process.""" + if self._conn is None: + raise RuntimeError("Worker process not started") + + with self._lock: + self._conn.send({"type": "undeploy_module", "module_id": module_id}) + response = self._conn.recv() + + if response.get("error"): + raise RuntimeError(f"Failed to undeploy module: {response['error']}") + + self._modules.pop(module_id, None) + def suppress_console(self) -> None: if self._conn is None: return @@ -366,6 +380,13 @@ def _worker_loop(conn: Connection, instances: dict[int, Any], worker_id: int) -> result = method(*request.get("args", ()), **request.get("kwargs", {})) response["result"] = result + elif req_type == "undeploy_module": + module_id = request["module_id"] + instance = instances.pop(module_id, None) + if instance is not None: + instance.stop() + response["result"] = True + elif req_type == "suppress_console": _suppress_console_output() response["result"] = True diff --git a/dimos/core/coordination/test_module_coordinator.py b/dimos/core/coordination/test_module_coordinator.py index 73215c6726..028f85efc3 100644 --- a/dimos/core/coordination/test_module_coordinator.py +++ b/dimos/core/coordination/test_module_coordinator.py @@ -31,6 +31,7 @@ _verify_no_conflicts_with_existing, _verify_no_name_conflicts, ) +from dimos.core.coordination.worker_manager_python import WorkerManagerPython from dimos.core.core import rpc from dimos.core.global_config import GlobalConfig from dimos.core.module import Module @@ -601,3 +602,197 @@ def test_check_requirements_failure(mocker) -> None: with pytest.raises(SystemExit): _check_requirements(bp) + + +@pytest.mark.slow +def test_restart_module_basic(dynamic_coordinator) -> None: + """restart_module replaces the deployed proxy with a fresh one.""" + dynamic_coordinator.load_module(ModuleA) + old_proxy = dynamic_coordinator.get_instance(ModuleA) + assert old_proxy is not None + + new_proxy = dynamic_coordinator.restart_module(ModuleA, reload_source=False) + + assert new_proxy is not None + assert new_proxy is not old_proxy + assert dynamic_coordinator.get_instance(ModuleA) is new_proxy + assert new_proxy.get_name() == "A, Module A" + + +@pytest.mark.slow +def test_restart_module_preserves_stream_wiring(dynamic_coordinator) -> None: + """Streams stay on the same transport after restart so consumers keep receiving data.""" + dynamic_coordinator.load_blueprint(autoconnect(ModuleA.blueprint(), ModuleC.blueprint())) + + c = dynamic_coordinator.get_instance(ModuleC) + assert c is not None + topic_before = c.data3.transport.topic + registry_before = dynamic_coordinator._transport_registry[("data3", Data3)] + + dynamic_coordinator.restart_module(ModuleC, reload_source=False) + + # Transport in the registry is the same parent-side object. + assert dynamic_coordinator._transport_registry[("data3", Data3)] is registry_before + + c_after = dynamic_coordinator.get_instance(ModuleC) + assert c_after is not None + assert c_after is not c + # The restarted module's stream is wired to the same topic. + assert c_after.data3.transport.topic == topic_before + + +@pytest.mark.slow +def test_restart_module_rewires_module_refs(dynamic_coordinator) -> None: + """After restart, modules that reference the restarted class see the new proxy.""" + dynamic_coordinator.load_blueprint(autoconnect(ModuleA.blueprint(), ModuleB.blueprint())) + + b = dynamic_coordinator.get_instance(ModuleB) + assert b is not None + assert b.what_is_as_name() == "A, Module A" + + dynamic_coordinator.restart_module(ModuleA, reload_source=False) + + assert b.what_is_as_name() == "A, Module A" + + +@pytest.mark.slow +def test_restart_consumer_rewires_outbound_refs(dynamic_coordinator) -> None: + """Restarting a consumer re-injects its refs to existing target modules.""" + dynamic_coordinator.load_blueprint(autoconnect(ModuleA.blueprint(), ModuleB.blueprint())) + + dynamic_coordinator.restart_module(ModuleB, reload_source=False) + + b_after = dynamic_coordinator.get_instance(ModuleB) + assert b_after is not None + # The new ModuleB must still reach ModuleA through its outbound module_ref. + assert b_after.what_is_as_name() == "A, Module A" + + +@pytest.mark.slow +def test_restart_module_shuts_down_empty_worker(dynamic_coordinator) -> None: + """Restart shuts down the old worker (when empty) and spawns a new one.""" + + dynamic_coordinator.load_module(ModuleA) + python_wm = dynamic_coordinator._managers["python"] + assert isinstance(python_wm, WorkerManagerPython) + + old_worker_ids = {w.worker_id for w in python_wm.workers} + assert len(old_worker_ids) == 1 + + dynamic_coordinator.restart_module(ModuleA, reload_source=False) + + new_worker_ids = {w.worker_id for w in python_wm.workers} + assert len(new_worker_ids) == 1 + assert new_worker_ids.isdisjoint(old_worker_ids) + + +@pytest.mark.slow +def test_restart_module_calls_importlib_reload(dynamic_coordinator, mocker) -> None: + """reload_source=True invokes importlib.reload on the module's source file.""" + dynamic_coordinator.load_module(ModuleA) + + # Stub reload so it's a no-op. Actually reloading this test module would + # re-execute test definitions and corrupt later tests. + mock_reload = mocker.patch( + "dimos.core.coordination.module_coordinator.importlib.reload", + side_effect=lambda m: m, + ) + + dynamic_coordinator.restart_module(ModuleA, reload_source=True) + + mock_reload.assert_called_once() + reloaded_module = mock_reload.call_args.args[0] + assert reloaded_module.__name__ == ModuleA.__module__ + + +def _mock_reload_producing_new_class(original_class): + """Return a reload side-effect that replaces the original class with a fresh copy.""" + new_class = type( + original_class.__name__, original_class.__bases__, dict(original_class.__dict__) + ) + new_class.__module__ = original_class.__module__ + new_class.__qualname__ = original_class.__qualname__ + + def side_effect(mod): + setattr(mod, original_class.__name__, new_class) + return mod + + return side_effect, new_class + + +@pytest.mark.slow +def test_get_instance_after_reload_restart(dynamic_coordinator, mocker) -> None: + """get_instance with the original class still works after a reload restart.""" + dynamic_coordinator.load_module(ModuleA) + + side_effect, _new_class = _mock_reload_producing_new_class(ModuleA) + mocker.patch( + "dimos.core.coordination.module_coordinator.importlib.reload", + side_effect=side_effect, + ) + + new_proxy = dynamic_coordinator.restart_module(ModuleA, reload_source=True) + + assert dynamic_coordinator.get_instance(ModuleA) is new_proxy + + +@pytest.mark.slow +def test_double_restart_with_reload(dynamic_coordinator, mocker) -> None: + """A second restart via the original class works after a reload restart.""" + dynamic_coordinator.load_module(ModuleA) + + side_effect1, new_class1 = _mock_reload_producing_new_class(ModuleA) + mocker.patch( + "dimos.core.coordination.module_coordinator.importlib.reload", + side_effect=side_effect1, + ) + proxy1 = dynamic_coordinator.restart_module(ModuleA, reload_source=True) + + side_effect2, _new_class2 = _mock_reload_producing_new_class(new_class1) + mocker.patch( + "dimos.core.coordination.module_coordinator.importlib.reload", + side_effect=side_effect2, + ) + proxy2 = dynamic_coordinator.restart_module(ModuleA, reload_source=True) + + assert proxy2 is not proxy1 + assert dynamic_coordinator.get_instance(ModuleA) is proxy2 + + +@pytest.mark.slow +def test_unload_after_reload_restart(dynamic_coordinator, mocker) -> None: + """unload_module with the original class works after a reload restart.""" + dynamic_coordinator.load_module(ModuleA) + + side_effect, _new_class = _mock_reload_producing_new_class(ModuleA) + mocker.patch( + "dimos.core.coordination.module_coordinator.importlib.reload", + side_effect=side_effect, + ) + dynamic_coordinator.restart_module(ModuleA, reload_source=True) + + dynamic_coordinator.unload_module(ModuleA) + assert dynamic_coordinator.get_instance(ModuleA) is None + + +@pytest.mark.slow +def test_restart_preserves_remapped_streams(dynamic_coordinator) -> None: + """Restart reconnects streams that were remapped during initial load.""" + bp = autoconnect( + SourceModule.blueprint(), + TargetModule.blueprint(), + ).remappings( + [(SourceModule, "color_image", "remapped_data")], + ) + dynamic_coordinator.load_blueprint(bp) + + target = dynamic_coordinator.get_instance(TargetModule) + registry_before = dynamic_coordinator._transport_registry[("remapped_data", Data1)] + + dynamic_coordinator.restart_module(SourceModule, reload_source=False) + + # The coordinator-side transport object in the registry is unchanged. + assert dynamic_coordinator._transport_registry[("remapped_data", Data1)] is registry_before + # The restarted proxy sees the same topic as the target. + source_after = dynamic_coordinator.get_instance(SourceModule) + assert source_after.color_image.transport.topic == target.remapped_data.transport.topic diff --git a/dimos/core/coordination/test_module_reloading.py b/dimos/core/coordination/test_module_reloading.py new file mode 100644 index 0000000000..02e0b7b088 --- /dev/null +++ b/dimos/core/coordination/test_module_reloading.py @@ -0,0 +1,112 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path +import subprocess +import threading +import time + +import pytest + +from dimos.core.transport import pLCMTransport + +_TIMEOUT = 30 + + +@pytest.fixture +def repl(): + proc = subprocess.Popen( + ["python3", "-i"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + ) + + def _pump(): + for line in proc.stdout: + print(line, end="") + + t = threading.Thread(target=_pump, daemon=True) + t.start() + yield proc + proc.stdin.write("exit()\n") + proc.stdin.flush() + proc.wait(_TIMEOUT) + t.join(_TIMEOUT) + + +@pytest.fixture +def greeting(): + transport = pLCMTransport("/greetings") + transport.start() + yield transport + transport.stop() + + +@pytest.fixture +def response(): + transport = pLCMTransport("/response") + transport.start() + yield transport + transport.stop() + + +def _create_event(transport, expected) -> threading.Event: + event = threading.Event() + + def handler(msg): + if msg == expected: + event.set() + + transport.subscribe(handler) + + return event + + +def _wait_for_response(greeting, response, expected) -> None: + event = _create_event(response, expected) + deadline = time.time() + _TIMEOUT + while not event.is_set() and time.time() < deadline: + greeting.publish("John") + event.wait(0.5) + assert event.is_set(), f"Did not receive {expected!r} within timeout" + + +@pytest.fixture +def test_module_file(): + path = Path(__file__).parent / "_test_module.py" + original = path.read_text() + yield path + path.write_text(original) + + +@pytest.mark.slow +def test_module_reloading(repl, greeting, response, test_module_file): + repl.stdin.write(""" +from dimos.core.coordination.module_coordinator import ModuleCoordinator +from dimos.core.coordination import _test_module +mc = ModuleCoordinator.build(_test_module.AliceModule.blueprint()) +""") + repl.stdin.flush() + + _wait_for_response(greeting, response, "Hello John from Alice") + + test_module_file.write_text(test_module_file.read_text().replace("from Alice", "from Bob")) + + repl.stdin.write("mc.restart_module(_test_module.AliceModule)\n") + repl.stdin.flush() + + _wait_for_response(greeting, response, "Hello John from Bob") diff --git a/dimos/core/coordination/worker_manager_python.py b/dimos/core/coordination/worker_manager_python.py index a2aebcc0f0..0747997898 100644 --- a/dimos/core/coordination/worker_manager_python.py +++ b/dimos/core/coordination/worker_manager_python.py @@ -88,6 +88,52 @@ def deploy( actor = worker.deploy_module(module_class, global_config, kwargs=kwargs) return RPCClient(actor, module_class) + def deploy_fresh( + self, + module_class: type[ModuleBase], + global_config: GlobalConfig, + kwargs: dict[str, Any], + ) -> ModuleProxyProtocol: + """Spawn a brand-new worker process and deploy *module_class* on it. + + Used by restart so the new module is imported by a Python process with + a clean ``sys.modules`` — existing workers would reuse the old class + object even after ``importlib.reload`` in the parent. + """ + if self._closed: + raise RuntimeError("WorkerManager is closed") + if not self._started: + self.start() + + worker = PythonWorker() + worker.start_process() + self._workers.append(worker) + self._n_workers += 1 + actor = worker.deploy_module(module_class, global_config, kwargs=kwargs) + return RPCClient(actor, module_class) + + def undeploy(self, proxy: ModuleProxyProtocol) -> None: + """Undeploy a module and shut down its worker if it is now empty.""" + actor = getattr(proxy, "actor_instance", None) + if actor is None: + raise ValueError("Proxy has no actor_instance. Cannot undeploy.") + + module_id = actor._module_id + target: PythonWorker | None = None + for worker in self._workers: + if module_id in worker._modules: + target = worker + break + if target is None: + raise ValueError(f"No worker holds module_id={module_id}") + + target.undeploy_module(module_id) + + if not target._modules: + target.shutdown() + self._workers.remove(target) + self._n_workers = max(0, self._n_workers - 1) + def deploy_parallel(self, specs: list[ModuleSpec]) -> list[ModuleProxyProtocol]: if self._closed: raise RuntimeError("WorkerManager is closed") diff --git a/dimos/simulation/unity/test_unity_sim.py b/dimos/simulation/unity/test_unity_sim.py index 1e7dfea8b2..3227785e3c 100644 --- a/dimos/simulation/unity/test_unity_sim.py +++ b/dimos/simulation/unity/test_unity_sim.py @@ -45,9 +45,6 @@ _has_display = bool(os.environ.get("DISPLAY")) -# Helpers - - class _MockTransport: def __init__(self): self._messages = [] @@ -128,9 +125,6 @@ def _recv_tcp(sock) -> tuple[str, bytes]: return d, buf -# Config & Platform — fast, runs everywhere - - class TestConfig: def test_default_config(self): cfg = UnityBridgeConfig() @@ -159,9 +153,6 @@ def test_rejects_unsupported_platform(self): _validate_platform() -# ROS1 Deserialization — fast, runs everywhere - - class TestROS1Deserialization: def test_pointcloud2_round_trip(self): pts = np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]], dtype=np.float32) @@ -209,10 +200,8 @@ def test_serialize_pose_stamped_round_trip(self): assert r.f64() == pytest.approx(1.0) # qw -# TCP Bridge — needs sockets, ~1s, runs everywhere - - class TestTCPBridge: + @pytest.mark.slow def test_handshake_and_data_flow(self): """Mock Unity connects, sends a PointCloud2, verifies bridge publishes it.""" port = _find_free_port() @@ -245,9 +234,6 @@ def test_handshake_and_data_flow(self): np.testing.assert_allclose(received_pts, pts, atol=0.01) -# Kinematic Sim — needs threading, ~1s, runs everywhere - - class TestKinematicSim: def test_odometry_published(self): m = UnityBridgeModule(unity_binary="", sim_rate=100.0) @@ -276,9 +262,6 @@ def test_cmd_vel_moves_robot(self): assert last_odom.x == pytest.approx(1.0, abs=0.01) -# Rerun Config — fast, runs everywhere - - class TestRerunConfig: def test_static_pinhole_returns_list(self): import rerun as rr @@ -291,10 +274,6 @@ def test_suppress_returns_none(self): assert UnityBridgeModule.rerun_suppress_camera_info(None) is None -# Live Unity — slow, requires Linux x86_64 + DISPLAY -# These are skipped in CI and on unsupported platforms. - - @pytest.mark.slow @pytest.mark.skipif(not _is_linux_x86, reason="Unity binary requires Linux x86_64") @pytest.mark.skipif(not _has_display, reason="Unity requires DISPLAY (X11)") diff --git a/docs/usage/modules.md b/docs/usage/modules.md index 1ad4a3a9b9..8c16fa8561 100644 --- a/docs/usage/modules.md +++ b/docs/usage/modules.md @@ -165,6 +165,29 @@ For this, we use `dimos.core` and DimOS transport protocols. Defining message exchange protocols and message types also gives us the ability to write models in faster languages. +## Restarting a module + +While iterating on a module it's often convenient to edit its source file +and pick up the changes without tearing down the whole coordinator. The +`restart_module` call stops a single deployed module, reloads its source +via `importlib.reload`, then redeploys it onto a fresh worker process while +keeping its stream transports and reconnecting any other modules that held +a reference to it. + +```python +from dimos.core.coordination.module_coordinator import ModuleCoordinator +from dimos.core.global_config import GlobalConfig +from dimos.hardware.sensors.camera.module import CameraModule + +coordinator = ModuleCoordinator(g=GlobalConfig(n_workers=0, viewer="none")) +coordinator.start() +coordinator.load_module(CameraModule) + +# ... edit CameraModule source on disk ... + +coordinator.restart_module(CameraModule) +``` + ## Blueprints A blueprint is a predefined structure of interconnected modules. You can include blueprints or modules in new blueprints. diff --git a/pyproject.toml b/pyproject.toml index a6842bd10b..364fc6d22b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -423,6 +423,7 @@ module = [ "ultralytics.*", "unitree_webrtc_connect.*", "xarm.*", + "ament_index_python.*", ] ignore_missing_imports = true