From 9a72e36d298acc64dba972504dceebdb01513398 Mon Sep 17 00:00:00 2001 From: Alexandre Abadie Date: Tue, 10 Mar 2026 10:26:38 +0100 Subject: [PATCH 1/4] dotbot/dotbot_simulator: rework simulator threads and wheel speed model --- dotbot/dotbot_simulator.py | 240 +++++++++++++++++++++++-------------- 1 file changed, 147 insertions(+), 93 deletions(-) diff --git a/dotbot/dotbot_simulator.py b/dotbot/dotbot_simulator.py index 161ac23b..2726792c 100644 --- a/dotbot/dotbot_simulator.py +++ b/dotbot/dotbot_simulator.py @@ -23,11 +23,21 @@ from dotbot.logger import LOGGER from dotbot.protocol import PayloadDotBotAdvertisement, PayloadType -R = 25 -L = 70 +Kv = 400 # motor speed constant in RPM +r = 50 # motor reduction ratio +R = 25 # wheel radius in mm +L = 70 # distance between the two wheels in mm +MIN_PWM_TO_MOVE = 30 # minimum PWM value to overcome static friction and start moving + +# Control parameters for the automatic mode MOTOR_SPEED = 60 +ANGULAR_SPEED_GAIN = 1.2 +REDUCE_SPEED_FACTOR = 0.7 +REDUCE_SPEED_ANGLE = 25 + SIMULATOR_STEP_DELTA_T = 0.02 # 20 ms +# Battery model parameters INITIAL_BATTERY_VOLTAGE = 3000 # mV MAX_BATTERY_DURATION = 300 # 5 minutes @@ -45,6 +55,13 @@ def battery_discharge_model(time_elapsed_s: float) -> int: return voltage +def wheel_speed_from_pwm(pwm: float) -> float: + """Convert a PWM value to a wheel speed in mm/s.""" + if pwm < MIN_PWM_TO_MOVE: + return 0.0 + return pwm * R * Kv / (r * 127) + + class DotBotSimulatorMode(Enum): """Operation mode of the dotbot simulator.""" @@ -91,8 +108,8 @@ def __init__(self, settings: SimulatedDotBotSettings, tx_queue: queue.Queue): self.motor_right_error = settings.motor_right_error self.time_elapsed_s = 0 - self.v_left = 0 - self.v_right = 0 + self.pwm_left = 0 + self.pwm_right = 0 self.calibrated = settings.calibrated self.waypoint_threshold = 0 @@ -103,13 +120,15 @@ def __init__(self, settings: SimulatedDotBotSettings, tx_queue: queue.Queue): self.tx_queue = tx_queue self.queue = queue.Queue() self.advertise_thread = threading.Thread(target=self.advertise, daemon=True) + self.control_thread = threading.Thread(target=self.control_thread, daemon=True) self.rx_thread = threading.Thread(target=self.rx_frame, daemon=True) - self.main_thread = threading.Thread(target=self.run, daemon=True) + self.main_thread = threading.Thread(target=self.update_state, daemon=True) self.controller_mode: DotBotSimulatorMode = DotBotSimulatorMode.MANUAL self.logger = LOGGER.bind(context=__name__, address=self.address) self._stop_event = threading.Event() self.rx_thread.start() self.advertise_thread.start() + self.control_thread.start() self.main_thread.start() @property @@ -119,86 +138,119 @@ def header(self): source=int(self.address, 16), ) - def _diff_drive_bot(self, x_pos_old, y_pos_old, theta_old, v_right, v_left): - """Execute state space model of a rigid differential drive robot.""" - v_right_real = v_right * (1 - self.motor_right_error) - v_left_real = v_left * (1 - self.motor_left_error) - w = R * (v_left_real - v_right_real) / L - V = R * (v_left_real + v_right_real) / 2 - x_dot = V * cos(theta_old - pi) - y_dot = V * sin(theta_old - pi) - - x_pos = x_pos_old + x_dot * SIMULATOR_STEP_DELTA_T - y_pos = y_pos_old + y_dot * SIMULATOR_STEP_DELTA_T - theta = (theta_old + w * SIMULATOR_STEP_DELTA_T) % (2 * pi) - - return x_pos, y_pos, theta - - def update(self, dt: float): + def _diff_drive_model_update(self, dt: float): """State space model update.""" pos_x_old = self.pos_x pos_y_old = self.pos_y theta_old = self.theta - if self.controller_mode == DotBotSimulatorMode.MANUAL: - self.pos_x, self.pos_y, self.theta = self._diff_drive_bot( - pos_x_old, pos_y_old, theta_old, self.v_right, self.v_left - ) - elif self.controller_mode == DotBotSimulatorMode.AUTOMATIC: - delta_x = self.pos_x - self.waypoints[self.waypoint_index].pos_x - delta_y = self.pos_y - self.waypoints[self.waypoint_index].pos_y - distance_to_target = sqrt(delta_x**2 + delta_y**2) - - # check if we are close enough to the "next" waypoint - if distance_to_target < self.waypoint_threshold: - self.logger.info("Waypoint reached", waypoint_index=self.waypoint_index) - self.waypoint_index += 1 - # check if there are no more waypoints: - if self.waypoint_index >= len(self.waypoints): - self.logger.info( - "Last waypoint reached", waypoint_index=self.waypoint_index - ) - self.v_left = 0 - self.v_right = 0 - self.waypoint_index = 0 - self.controller_mode = DotBotSimulatorMode.MANUAL - else: - robot_angle = self.theta - angle_to_target = atan2(delta_y, delta_x) - if robot_angle >= pi: - robot_angle = robot_angle - 2 * pi - # if (angle_to_target < 0): - # angle_to_target = 2*pi + angle_to_target - - error_angle = ((angle_to_target - robot_angle + pi) % (2 * pi)) - pi - self.logger.debug( - "Moving to waypoint", - robot_angle=robot_angle, - angle_to_target=angle_to_target, - error_angle=error_angle, - ) + if self.pwm_left > 100: + self.pwm_left = 100 + if self.pwm_right > 100: + self.pwm_right = 100 + if self.pwm_left < 0: + self.pwm_left = 0 + if self.pwm_right < 0: + self.pwm_right = 0 + + # Compute each wheel's real speed considering the motor error and the minimum PWM to move + v_left_real = wheel_speed_from_pwm(self.pwm_left) * (1 - self.motor_left_error) + v_right_real = wheel_speed_from_pwm(self.pwm_right) * ( + 1 - self.motor_right_error + ) + + V = (v_left_real + v_right_real) / 2 + w = (v_left_real - v_right_real) / L + x_dot = V * cos(theta_old - pi) + y_dot = V * sin(theta_old - pi) + + self.pos_x = pos_x_old + x_dot * SIMULATOR_STEP_DELTA_T + self.pos_y = pos_y_old + y_dot * SIMULATOR_STEP_DELTA_T + self.theta = (theta_old + w * SIMULATOR_STEP_DELTA_T) % (2 * pi) - angular_speed = error_angle * MOTOR_SPEED - self.v_left = MOTOR_SPEED + angular_speed - self.v_right = MOTOR_SPEED - angular_speed - - if self.v_left > 100: - self.v_left = 100 - if self.v_right > 100: - self.v_right = 100 - if self.v_left < 0: - self.v_left = 0 - if self.v_right < 0: - self.v_right = 0 - - self.pos_x, self.pos_y, self.theta = self._diff_drive_bot( - self.pos_x, self.pos_y, self.theta, self.v_right, self.v_left - ) self.logger.debug( - "DotBot simulator update", x=self.pos_x, y=self.pos_y, theta=self.theta + "State updated", + pos_x=self.pos_x, + pos_y=self.pos_y, + theta=self.theta, + pwm_left=self.pwm_left, + pwm_right=self.pwm_right, + v_left_real=v_left_real, + v_right_real=v_right_real, ) self.time_elapsed_s += dt + def update_state(self): + """Update the state of the dotbot simulator.""" + while True: + with self._lock: + self._diff_drive_model_update(SIMULATOR_STEP_DELTA_T) + is_stopped = self._stop_event.wait(SIMULATOR_STEP_DELTA_T) + if is_stopped: + break + + def _compute_automatic_control(self): + if self.controller_mode != DotBotSimulatorMode.AUTOMATIC: + return + + delta_x = self.pos_x - self.waypoints[self.waypoint_index].pos_x + delta_y = self.pos_y - self.waypoints[self.waypoint_index].pos_y + distance_to_target = sqrt(delta_x**2 + delta_y**2) + + # check if we are close enough to the "next" waypoint + if distance_to_target < self.waypoint_threshold: + self.logger.info("Waypoint reached", waypoint_index=self.waypoint_index) + self.waypoint_index += 1 + # check if there are no more waypoints: + if self.waypoint_index >= len(self.waypoints): + self.logger.info( + "Last waypoint reached", waypoint_index=self.waypoint_index + ) + self.pwm_left = 0 + self.pwm_right = 0 + self.waypoint_index = 0 + self.controller_mode = DotBotSimulatorMode.MANUAL + else: + robot_angle = self.theta + angle_to_target = atan2(delta_y, delta_x) + if robot_angle >= pi: + robot_angle = robot_angle - 2 * pi + + error_angle = ((angle_to_target - robot_angle + pi) % (2 * pi)) - pi + self.logger.info( + "Moving to waypoint", + robot_angle=robot_angle, + angle_to_target=angle_to_target, + error_angle=error_angle, + ) + + speed_reduction_factor: float = 1.0 + if distance_to_target < self.waypoint_threshold * 2: + speed_reduction_factor = REDUCE_SPEED_FACTOR + + if error_angle > REDUCE_SPEED_ANGLE or error_angle < -REDUCE_SPEED_ANGLE: + speed_reduction_factor = REDUCE_SPEED_FACTOR + + angular_speed = error_angle * MOTOR_SPEED * ANGULAR_SPEED_GAIN + self.pwm_left = MOTOR_SPEED * speed_reduction_factor + angular_speed + self.pwm_right = MOTOR_SPEED * speed_reduction_factor - angular_speed + + self.logger.info( + "Control loop update", + v_left=self.pwm_left, + v_right=self.pwm_right, + theta=self.theta, + ) + + def control_thread(self): + """Control thread to update the state of the dotbot simulator.""" + while self._stop_event.is_set() is False: + with self._lock: + self._compute_automatic_control() + is_stopped = self._stop_event.wait(SIMULATOR_UPDATE_INTERVAL_S) + if is_stopped: + break + def advertise(self): """Send an advertisement message to the gateway.""" while self._stop_event.is_set() is False: @@ -231,37 +283,39 @@ def rx_frame(self): if self.address == hex(frame.header.destination)[2:]: if frame.payload_type == PayloadType.CMD_MOVE_RAW: self.controller_mode = DotBotSimulatorMode.MANUAL - self.v_left = frame.packet.payload.left_y - self.v_right = frame.packet.payload.right_y - - if self.v_left > 127: - self.v_left = self.v_left - 256 - if self.v_right > 127: - self.v_right = self.v_right - 256 + self.pwm_left = frame.packet.payload.left_y + self.pwm_right = frame.packet.payload.right_y + self.logger.info( + "RAW command received", + v_left=self.pwm_left, + v_right=self.pwm_right, + ) + + if self.pwm_left > 127: + self.pwm_left = self.pwm_left - 256 + if self.pwm_right > 127: + self.pwm_right = self.pwm_right - 256 elif frame.payload_type == PayloadType.LH2_WAYPOINTS: - self.v_left = 0 - self.v_right = 0 + self.pwm_left = 0 + self.pwm_right = 0 self.controller_mode = DotBotSimulatorMode.MANUAL self.waypoint_threshold = frame.packet.payload.threshold self.waypoints = frame.packet.payload.waypoints + self.logger.info( + "Waypoints received", + threshold=self.waypoint_threshold, + waypoints=self.waypoints, + ) if self.waypoints: self.controller_mode = DotBotSimulatorMode.AUTOMATIC - def run(self): - """Update the state of the dotbot simulator.""" - while True: - with self._lock: - self.update(SIMULATOR_UPDATE_INTERVAL_S) - is_stopped = self._stop_event.wait(SIMULATOR_UPDATE_INTERVAL_S) - if is_stopped: - break - def stop(self): self.logger.info(f"Stopping DotBot {self.address} simulator...") self._stop_event.set() self.queue.put_nowait(None) # unblock the rx_thread if waiting on the queue self.advertise_thread.join() + self.control_thread.join() self.rx_thread.join() self.main_thread.join() From a6598e26cecbe3ebd4bd1f6ce85e47a61c68992d Mon Sep 17 00:00:00 2001 From: Alexandre Abadie Date: Tue, 10 Mar 2026 13:19:38 +0100 Subject: [PATCH 2/4] dotbot/dotbot_simulator: fix pwm limit to allow negative values --- dotbot/dotbot_simulator.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dotbot/dotbot_simulator.py b/dotbot/dotbot_simulator.py index 2726792c..17860b81 100644 --- a/dotbot/dotbot_simulator.py +++ b/dotbot/dotbot_simulator.py @@ -57,7 +57,7 @@ def battery_discharge_model(time_elapsed_s: float) -> int: def wheel_speed_from_pwm(pwm: float) -> float: """Convert a PWM value to a wheel speed in mm/s.""" - if pwm < MIN_PWM_TO_MOVE: + if abs(pwm) < MIN_PWM_TO_MOVE: return 0.0 return pwm * R * Kv / (r * 127) @@ -148,10 +148,10 @@ def _diff_drive_model_update(self, dt: float): self.pwm_left = 100 if self.pwm_right > 100: self.pwm_right = 100 - if self.pwm_left < 0: - self.pwm_left = 0 - if self.pwm_right < 0: - self.pwm_right = 0 + if self.pwm_left < -100: + self.pwm_left = -100 + if self.pwm_right < -100: + self.pwm_right = -100 # Compute each wheel's real speed considering the motor error and the minimum PWM to move v_left_real = wheel_speed_from_pwm(self.pwm_left) * (1 - self.motor_left_error) From b596e1323805c4432970f7d76c6f51011a009d14 Mon Sep 17 00:00:00 2001 From: Alexandre Abadie Date: Tue, 10 Mar 2026 13:51:36 +0100 Subject: [PATCH 3/4] dotbot/dotbot_simulator: fix dt in state update and release lock after state update --- dotbot/dotbot_simulator.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dotbot/dotbot_simulator.py b/dotbot/dotbot_simulator.py index 17860b81..7f2444fe 100644 --- a/dotbot/dotbot_simulator.py +++ b/dotbot/dotbot_simulator.py @@ -138,7 +138,7 @@ def header(self): source=int(self.address, 16), ) - def _diff_drive_model_update(self, dt: float): + def _diff_drive_model_update(self): """State space model update.""" pos_x_old = self.pos_x pos_y_old = self.pos_y @@ -178,16 +178,16 @@ def _diff_drive_model_update(self, dt: float): v_left_real=v_left_real, v_right_real=v_right_real, ) - self.time_elapsed_s += dt + self.time_elapsed_s += SIMULATOR_STEP_DELTA_T def update_state(self): """Update the state of the dotbot simulator.""" while True: with self._lock: - self._diff_drive_model_update(SIMULATOR_STEP_DELTA_T) - is_stopped = self._stop_event.wait(SIMULATOR_STEP_DELTA_T) - if is_stopped: - break + self._diff_drive_model_update() + is_stopped = self._stop_event.wait(SIMULATOR_STEP_DELTA_T) + if is_stopped: + break def _compute_automatic_control(self): if self.controller_mode != DotBotSimulatorMode.AUTOMATIC: From a22c58ea11ba0b40ef2eedb0ce785381b264cb3d Mon Sep 17 00:00:00 2001 From: Alexandre Abadie Date: Tue, 10 Mar 2026 13:52:18 +0100 Subject: [PATCH 4/4] dotbot/dotbot_simulator: convert error angle to degrees before applying the speed reduction factor --- dotbot/dotbot_simulator.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/dotbot/dotbot_simulator.py b/dotbot/dotbot_simulator.py index 7f2444fe..3cfb1c7f 100644 --- a/dotbot/dotbot_simulator.py +++ b/dotbot/dotbot_simulator.py @@ -228,9 +228,12 @@ def _compute_automatic_control(self): if distance_to_target < self.waypoint_threshold * 2: speed_reduction_factor = REDUCE_SPEED_FACTOR - if error_angle > REDUCE_SPEED_ANGLE or error_angle < -REDUCE_SPEED_ANGLE: + error_angle_degrees = error_angle * 180 / pi + if ( + error_angle_degrees > REDUCE_SPEED_ANGLE + or error_angle_degrees < -REDUCE_SPEED_ANGLE + ): speed_reduction_factor = REDUCE_SPEED_FACTOR - angular_speed = error_angle * MOTOR_SPEED * ANGULAR_SPEED_GAIN self.pwm_left = MOTOR_SPEED * speed_reduction_factor + angular_speed self.pwm_right = MOTOR_SPEED * speed_reduction_factor - angular_speed