diff --git a/console/control_ui.py b/console/control_ui.py index 1be8d2d..2a83108 100644 --- a/console/control_ui.py +++ b/console/control_ui.py @@ -3,292 +3,293 @@ import signal import subprocess import time - - import rclpy - -from PyQt6.QtWidgets import (QApplication, QMainWindow, QWidget, - QVBoxLayout, QHBoxLayout, QLineEdit, - QLabel) -from PyQt6.QtCore import Qt, pyqtSignal - +from PyQt6.QtWidgets import ( + QApplication, + QMainWindow, + QWidget, + QVBoxLayout, + QLineEdit, + QLabel, + QSizePolicy, + QSplitter, +) +from PyQt6.QtCore import Qt, pyqtSignal, QTimer from PyQt6.QtGui import QFont + from .node import AP1ConsoleNode from .command_output import CommandOutput from .diagnostics_display import DiagnosticsDisplay from .visual_path import PathCanvas def print_help(self): - self.command_output.add_line('=' * 40) - self.command_output.add_line('Welcome to AP1 Console') - self.command_output.add_line('Commands:') - self.command_output.add_line('\tspeed - Set target speed (m/s)') - self.command_output.add_line('\tlocation - Set target location (m)') - self.command_output.add_line('\tget speed_profile - Return planned speed profile (m/s)') - self.command_output.add_line('\tget planned_path - Return planning output path') - self.command_output.add_line('\techo topic [-t seconds] - Echo ROS topic') - self.command_output.add_line('\treset - Reset system and simulation (if applicable)') - self.command_output.add_line('\tclear - Clear screen') - self.command_output.add_line('\thelp - Print this screen') - self.command_output.add_line('=' * 40) - + self.command_output.add_line("=" * 40) + self.command_output.add_line("Welcome to AP1 Console") + self.command_output.add_line("Commands:") + self.command_output.add_line("\tspeed - Set target speed (m/s)") + self.command_output.add_line("\tlocation - Set target location (m)") + self.command_output.add_line("\tget speed_profile - Return planned speed profile (m/s)") + self.command_output.add_line("\tget planned_path - Return planning output path") + self.command_output.add_line("\techo topic [-t seconds] - Echo ROS topic") + self.command_output.add_line('\tstop - Stop active "echo topic"') + self.command_output.add_line("\treset - Reset system and simulation (if applicable)") + self.command_output.add_line("\tclear - Clear screen") + self.command_output.add_line("\thelp - Print this screen") + self.command_output.add_line("=" * 40) + class AP1DebugUI(QMainWindow): - # me n my homies hate writing css - # god bless chat: - # CSS_PATH = 'style.css' - - echo_output_signal = pyqtSignal(str) - - def __init__(self, node: AP1ConsoleNode, app: QApplication): - super().__init__() - self.ros_node = node - self.setWindowTitle("AP1 Console") - self.resize(1000, 700) - self.app = app - - # -- Fonts -- - # Main Header Font - header_font = QFont("Sans Serif", 16) - header_font.setBold(True) - - # Section Header Font (Bold + Underline) - section_font = QFont("Sans Serif", 10) - section_font.setBold(True) - section_font.setUnderline(True) - - # Central Widget - central_widget = QWidget() - self.setCentralWidget(central_widget) - - main_layout = QVBoxLayout(central_widget) - - # Header - header = QLabel("AP1 CONSOLE") - header.setAlignment(Qt.AlignmentFlag.AlignCenter) - header.setFont(header_font) - header.setContentsMargins(0, 0, 0, 5) - # main_layout.addWidget(header) # removing temp - - # Middle Section - middle_layout = QHBoxLayout() - - # Left Pane - left_pane = QWidget() - left_layout = QVBoxLayout(left_pane) - left_layout.setContentsMargins(0,0,0,0) - - # Diagnostics - lbl_diag = QLabel("DIAGNOSTICS") - lbl_diag.setFont(section_font) - left_layout.addWidget(lbl_diag) - - self.diagnostics = DiagnosticsDisplay(self.ros_node) - left_layout.addWidget(self.diagnostics) - - # Path Header - lbl_path = QLabel("PLANNED PATH") - lbl_path.setFont(section_font) - lbl_path.setContentsMargins(0, 10, 0, 0) - left_layout.addWidget(lbl_path) - - self.path_canvas = PathCanvas(self.ros_node) - left_layout.addWidget(self.path_canvas) - - middle_layout.addWidget(left_pane, stretch=1) - - # Right Pane: Command Output - right_pane = QWidget() - right_layout = QVBoxLayout(right_pane) - right_layout.setContentsMargins(0,0,0,0) - - lbl_cli = QLabel("COMMAND LINE INTERFACE") - lbl_cli.setStyleSheet("font-weight: bold; text-decoration: underline;") - # right_layout.addWidget(lbl_cli) # removing - - self.command_output = CommandOutput() - right_layout.addWidget(self.command_output) - - self.echo_output_signal.connect(self.command_output.add_line) - - self.active_echo_process=None - self.echo_thread=None - - middle_layout.addWidget(right_pane, stretch=1) - main_layout.addLayout(middle_layout, stretch=1) - - # Input Footer - self.command_input = QLineEdit() - self.command_input.setPlaceholderText("Command...") - self.command_input.returnPressed.connect(self.on_input_submitted) - main_layout.addWidget(self.command_input) - - # Help menu on start - print_help(self) - self.command_input.setFocus() - - def on_input_submitted(self): - cmd = self.command_input.text().strip() - - if not cmd: - return - - # Show command - self.command_output.add_line(f'> {cmd}') - self.command_input.clear() - - # Execute - self.execute_command(cmd) - - def on_input_submitted(self): - cmd = self.command_input.text().strip() - if not cmd: - return - self.command_output.add_line(f'> {cmd}') - self.command_input.clear() - self.execute_command(cmd) - - def execute_command(self, cmd: str): - try: - parts = cmd.split() - command = parts[0].lower() - - if command == 'help': - print_help(self) - - elif command == 'clear': - self.command_output.history.clear() - self.command_output.setText('') - self.command_output.add_line('History cleared') # may remove if annoying - - elif command == 'speed': - if len(parts) < 2: - self.command_output.add_line('Error! Usage: speed ') - else: - speed = float(parts[1]) # grab the value - self.ros_node.set_target_speed(speed) - self.command_output.add_line(f"✓ Target speed set to {speed:.2f} m/s") - - elif command == 'location': - if len(parts) < 3: - self.command_output.add_line('Error! Usage: location ') - else: - x, y = float(parts[1]), float(parts[2]) - self.ros_node.set_target_location(x, y) - self.command_output.add_line(f"✓ Target location set to ({x}, {y})") - - elif command == 'get': - if len(parts) < 2: - self.command_output.add_line('Error! Usage: get ') - return - - subcommand = parts[1].lower() - - if subcommand == 'speed_profile': - speed_profile = self.ros_node.speed_profile - if not speed_profile: - self.command_output.add_line('Speed profile is empty.') - return - - out = ', '.join(f'{spd} m/s' for spd in speed_profile) - self.command_output.add_line('{ ' + out + ' }') - - elif subcommand == 'planned_path': - path = self.ros_node.target_path - if not path: - self.command_output.add_line('Planned path is empty.') - return - - out = ', '.join(f'({pt.x:.2f}, {pt.y:.2f})' for pt in path) - self.command_output.add_line('{ ' + out + ' }') - - else: - self.command_output.add_line('Unknown get command.') - - elif command == 'echo': - if len(parts) < 3 or parts[1] != 'topic': - self.command_output.add_line('Usage: echo topic [-t seconds]') - return - - topic_name = parts[2] - timeout = None - - if '-t' in parts: - try: - idx = parts.index('-t') - timeout = int(parts[idx + 1]) - except: - self.command_output.add_line('Invalid timeout value.') - return - - self.command_output.add_line(f'Echoing {topic_name}... (Type "stop" to stop)') - - def run_echo(): - try: - process = subprocess.Popen( - ['ros2', 'topic', 'echo', topic_name], - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True - ) - - self.active_echo_process = process - - start_time = time.time() - - for line in process.stdout: - - if process.poll() is not None: - break - - self.echo_output_signal.emit(line.strip()) - - if timeout and (time.time() - start_time > timeout): - process.terminate() - self.echo_output_signal.emit('Echo timeout reached.') - break - - except Exception as e: - self.echo_output_signal.emit(f'Error: {str(e)}') - finally: - self.active_echo_process = None - self.echo_thread = None - - self.echo_thread = threading.Thread(target=run_echo, daemon=True).start() - self.echo_thread.start() - - elif command == 'stop': - if self.active_echo_process is None: - self.command_output.add_line('No active echo process to stop') - else: - try: - self.active_echo_process.terminate() - self.active_echo_process.wait(timeout=2) - self.command_output.add_line('Echo stopped') - except: - self.active_echo_process.kill() - self.command_output.add_line('Echo KILLED') - finally: - self.active_echo_process = None - - elif command == 'reset': - self.command_output.add_line("Not yet implemented.") - - else: - self.command_output.add_line('Command not found.') - except Exception as e: - self.command_output.add_line(f"Error: {str(e)}") - - - def run(self, node: AP1ConsoleNode): - signal.signal(signal.SIGINT, signal.SIG_DFL) - - if rclpy: - spin_thread = threading.Thread(target=rclpy.spin, args=(node,), daemon=True) - spin_thread.start() - - window = self - window.show() - - sys.exit(self.app.exec()) \ No newline at end of file + echo_output_signal = pyqtSignal(str) + + def __init__(self, node: AP1ConsoleNode, app: QApplication): + super().__init__() + self.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding) + + self.ros_node = node + self.app = app + + self.setWindowTitle("AP1 Console") + self.resize(1200, 800) + + # Fonts + header_font = QFont("Sans Serif", 16) + header_font.setBold(True) + + + section_font = QFont("Sans Serif", 10) + section_font.setBold(True) + section_font.setUnderline(True) + + # Central widget + layout + central_widget = QWidget() + self.setCentralWidget(central_widget) + main_layout = QVBoxLayout(central_widget) + + # Optional header + header = QLabel("AP1 CONSOLE") + header.setAlignment(Qt.AlignmentFlag.AlignCenter) + header.setFont(header_font) + header.setContentsMargins(0, 0, 0, 5) + # main_layout.addWidget(header) + + self.splitter = QSplitter(Qt.Orientation.Horizontal) + self.splitter.setChildrenCollapsible(False) + + + # Left pane + left_pane = QWidget() + left_pane.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding) + left_layout = QVBoxLayout(left_pane) + left_layout.setContentsMargins(0, 0, 0, 0) + + lbl_diag = QLabel("DIAGNOSTICS") + lbl_diag.setFont(section_font) + left_layout.addWidget(lbl_diag) + + + self.diagnostics = DiagnosticsDisplay(self.ros_node) + self.diagnostics.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Maximum) + left_layout.addWidget(self.diagnostics) + + lbl_path = QLabel("PLANNED PATH") + lbl_path.setFont(section_font) + lbl_path.setContentsMargins(0, 10, 0, 0) + left_layout.addWidget(lbl_path) + + self.path_canvas = PathCanvas(self.ros_node) + self.path_canvas.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding) + left_layout.addWidget(self.path_canvas, stretch=1) + + + # Right pane + right_pane = QWidget() + right_pane.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding) + right_layout = QVBoxLayout(right_pane) + right_layout.setContentsMargins(0, 0, 0, 0) + + self.command_output = CommandOutput() + right_layout.addWidget(self.command_output) + + self.echo_output_signal.connect(self.command_output.add_line) + + self.active_echo_process = None + self.echo_thread = None + + # Add panes to splitter + self.splitter.addWidget(left_pane) + self.splitter.addWidget(right_pane) + self.splitter.setStretchFactor(0, 1) + self.splitter.setStretchFactor(1, 1) + + main_layout.addWidget(self.splitter, stretch=1) + + #footer input + self.command_input = QLineEdit() + self.command_input.setPlaceholderText("Command...") + self.command_input.returnPressed.connect(self.on_input_submitted) + main_layout.addWidget(self.command_input) + + print_help(self) + self.command_input.setFocus() + + def on_input_submitted(self): + cmd = self.command_input.text().strip() + if not cmd: + return + + self.command_output.add_line(f"> {cmd}") + self.command_input.clear() + self.execute_command(cmd) + + def execute_command(self, cmd: str): + try: + parts = cmd.split() + command = parts[0].lower() + + if command == "help": + print_help(self) + + elif command == "clear": + self.command_output.history.clear() + self.command_output.setText("") + self.command_output.add_line("History cleared") + + elif command == "speed": + if len(parts) < 2: + self.command_output.add_line("Error! Usage: speed ") + else: + speed = float(parts[1]) + self.ros_node.set_target_speed(speed) + self.command_output.add_line(f"✓ Target speed set to {speed:.2f} m/s") + + elif command == "location": + if len(parts) < 3: + self.command_output.add_line("Error! Usage: location ") + else: + x, y = float(parts[1]), float(parts[2]) + self.ros_node.set_target_location(x, y) + self.command_output.add_line(f"✓ Target location set to ({x}, {y})") + + elif command == "get": + if len(parts) < 2: + self.command_output.add_line("Error! Usage: get ") + return + + subcommand = parts[1].lower() + + if subcommand == "speed_profile": + speed_profile = self.ros_node.speed_profile + if not speed_profile: + self.command_output.add_line("Speed profile is empty.") + return + out = ", ".join(f"{spd} m/s" for spd in speed_profile) + self.command_output.add_line("{ " + out + " }") + + + elif subcommand == "planned_path": + path = self.ros_node.target_path + if not path: + self.command_output.add_line("Planned path is empty.") + return + out = ", ".join(f"({pt.x:.2f}, {pt.y:.2f})" for pt in path) + self.command_output.add_line("{ " + out + " }") + + else: + self.command_output.add_line("Unknown get command.") + + elif command == "echo": + if len(parts) < 3 or parts[1] != "topic": + self.command_output.add_line('Usage: echo topic [-t seconds]') + return + + topic_name = parts[2] + timeout = None + + if "-t" in parts: + try: + idx = parts.index("-t") + timeout = int(parts[idx + 1]) + except Exception: + self.command_output.add_line("Invalid timeout value.") + return + + self.command_output.add_line(f'Echoing {topic_name}... (Type "stop" to stop)') + + def run_echo(): + try: + process = subprocess.Popen( + ["ros2", "topic", "echo", topic_name], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + ) + self.active_echo_process = process + start_time = time.time() + + + for line in process.stdout: + if process.poll() is not None: + break + + + self.echo_output_signal.emit(line.rstrip()) + + + if timeout and (time.time() - start_time > timeout): + process.terminate() + self.echo_output_signal.emit("Echo timeout reached.") + break + + except Exception as e: + self.echo_output_signal.emit(f"Error: {str(e)}") + finally: + self.active_echo_process = None + self.echo_thread = None + + self.echo_thread = threading.Thread(target=run_echo, daemon=True) + self.echo_thread.start() + + elif command == "stop": + if self.active_echo_process is None: + self.command_output.add_line("No active echo process to stop") + else: + try: + self.active_echo_process.terminate() + self.active_echo_process.wait(timeout=2) + self.command_output.add_line("Echo stopped") + except Exception: + self.active_echo_process.kill() + self.command_output.add_line("Echo KILLED") + finally: + self.active_echo_process = None + + elif command == "reset": + self.command_output.add_line("Not yet implemented.") + + else: + self.command_output.add_line("Command not found.") + + except Exception as e: + self.command_output.add_line(f"Error: {str(e)}") + + def run(self, node: AP1ConsoleNode): + signal.signal(signal.SIGINT, signal.SIG_DFL) + + spin_thread = threading.Thread(target=rclpy.spin, args=(node,), daemon=True) + spin_thread.start() + + window = self + window.showMaximized() + + # Fix initial layout sizing so it stretches immediately + def finalize_layout(): + if window.centralWidget() and window.centralWidget().layout(): + window.centralWidget().layout().activate() + + w = window.width() + window.splitter.setSizes([w // 2, w // 2]) + + QTimer.singleShot(0, finalize_layout) + + sys.exit(self.app.exec()) \ No newline at end of file diff --git a/console/visual_path.py b/console/visual_path.py index c4ecfb3..c9f3cb3 100644 --- a/console/visual_path.py +++ b/console/visual_path.py @@ -1,28 +1,17 @@ """ Visualizes the car's path and surrounding features on a 2D canvas. - Coordinate conventions: - World space: +X = FORWARD, +Y = LEFT (meters) - Screen space: +X = RIGHT, +Y = DOWN (pixels) - -Transform pipeline: - world_point - → world_to_canvas() scale meters → logical canvas units, flip axes - → canvas_to_screen() scale logical units → screen pixels + World space: +X = FORWARD, +Y = LEFT (meters) + Screen space: +X = RIGHT, +Y = DOWN (pixels) """ -from PyQt6.QtWidgets import QWidget +from PyQt6.QtWidgets import QWidget, QSizePolicy from PyQt6.QtGui import QPainter, QPen, QBrush from PyQt6.QtCore import Qt, QTimer from .node import AP1ConsoleNode -# || DEFAULTS -DEFAULT_FEATURE_TYPE = ( - "stop_sign" # TEMPORARY, entities don't contain types yet so this is for now -) - -# || Colors +# Colors WHITE = Qt.GlobalColor.white DIM = Qt.GlobalColor.darkGray RED = Qt.GlobalColor.red @@ -32,322 +21,144 @@ BLUE = Qt.GlobalColor.blue BLACK = Qt.GlobalColor.black -FEATURE_COLORS = { - "stop_sign": RED, - "traffic_light": GREEN, - "stop_line": YELLOW, - "yield_sign": BLUE, -} - -# || Canvas constants -# The logical canvas is a fixed 60×60 unit grid. The car sits at the -# bottom-center: (ORIGIN_X, ORIGIN_Y) in logical units. -CANVAS_W = 60 # logical width (units) -CANVAS_H = 60 # logical height (units) -ORIGIN_X = CANVAS_W // 2 # horizontal center — car is laterally centered -ORIGIN_Y = CANVAS_H - 1 # near bottom edge — car is at the bottom - -# World extent visible on the canvas (meters) -WORLD_RANGE_X = 40 # meters forward/backward -WORLD_RANGE_Y = 40 # meters left/right - -DOT_SIZE = 6 # px – waypoint / feature marker diameter -TARGET_SIZE = 8 # px – target location marker diameter +# Canvas Constants +CANVAS_W = 60 +CANVAS_H = 60 +ORIGIN_X = CANVAS_W // 2 +ORIGIN_Y = CANVAS_H - 1 +WORLD_RANGE_X = 40 +WORLD_RANGE_Y = 40 -# || Coordinate transforms - +DOT_SIZE = 6 +# Helper Point class Point: - def __init__(self, x: float, y: float): - self.x = x - self.y = y - - -def world_to_canvas(point: Point) -> tuple[float, float]: - """ - Convert a world-space point to logical canvas coordinates. - - World convention: +X = forward (up on screen), +Y = left (left on screen) - Canvas convention: origin at bottom-center; +canvas_x = right, +canvas_y = up - - Axis mapping: - world +X (forward) → canvas -Y (up on screen, since screen Y is flipped) - world +Y (left) → canvas -X (left on screen) - - Both axes are scaled so WORLD_RANGE maps to the full canvas extent. - """ - canvas_x = -point.y / WORLD_RANGE_Y * CANVAS_W # left/right - canvas_y = -point.x / WORLD_RANGE_X * CANVAS_H # forward/back - return canvas_x, canvas_y - - -def canvas_to_screen( - cx: float, cy: float, screen_w: int, screen_h: int -) -> tuple[int, int]: - """ - Convert logical canvas coordinates to screen pixel coordinates. - - The logical canvas is CANVAS_W × CANVAS_H units, with the car origin at - (ORIGIN_X, ORIGIN_Y). This function: - 1. Shifts the point relative to the car origin. - 2. Scales to fill the current widget dimensions. - - Screen convention: origin top-left, +X right, +Y down. - Note: canvas_y is already negative for "up", so adding it to ORIGIN_Y - correctly moves points toward the top of the screen. - """ - scale_x = screen_w / CANVAS_W - scale_y = screen_h / CANVAS_H - - sx = (ORIGIN_X + cx) * scale_x - sy = (ORIGIN_Y + cy) * scale_y - return int(sx), int(sy) - - -# || Widget - - + def __init__(self, x: float, y: float): + self.x = x + self.y = y + +def canvas_to_screen(cx: float, cy: float, screen_w: int, screen_h: int): + scale_x = screen_w / CANVAS_W + scale_y = screen_h / CANVAS_H + sx = (ORIGIN_X + cx) * scale_x + sy = (ORIGIN_Y + cy) * scale_y + return int(sx), int(sy) + +# Main widget class PathCanvas(QWidget): - REFRESH_RATE = 10 # Hz - - def __init__(self, node: AP1ConsoleNode, parent=None): - super().__init__(parent) - self.node = node - - self.setMinimumSize(300, 300) - self.setStyleSheet("background: black;") - - self.timer = QTimer() - self.timer.timeout.connect(self.update) - self.timer.start(int(1000 / self.REFRESH_RATE)) - - # zoom and pan - self._zoom = 1.0 - self._pan_x = 0.0 - self._pan_y = 0.0 - self._drag_start = None - self._pan_start = None - self.setMouseTracking(False) - - def world_to_screen(self, point: Point) -> tuple[int, int]: - """ - Full pipeline: world space → logical canvas → screen pixels. - Use this for all drawing operations. - """ - effective_range_x = WORLD_RANGE_X * self._zoom - effective_range_y = WORLD_RANGE_Y * self._zoom - cx = -(point.y - self._pan_y) / effective_range_y * CANVAS_W - cy = -(point.x - self._pan_x) / effective_range_x * CANVAS_H - - return canvas_to_screen(cx, cy, self.width(), self.height()) - - def wheelEvent(self, event): - delta = event.angleDelta().y() - factor = 1.1 if delta < 0 else 0.9 # scroll up = zoom in - self._zoom = max( - 0.1, min(10.0, self._zoom * factor) - ) # min zoom is 10% max is 1000% - self.update() - - def mousePressEvent(self, event): - if event.button() == Qt.MouseButton.LeftButton: - self._drag_start = event.position() - self._pan_start = (self._pan_x, self._pan_y) - - def mouseMoveEvent(self, event): - if self._drag_start is not None: - dx = event.position().x() - self._drag_start.x() - dy = event.position().y() - self._drag_start.y() - # convert screen delte -> world delta - w, h = self.width(), self.height() - world_range_x = WORLD_RANGE_X * self._zoom - world_range_y = WORLD_RANGE_Y * self._zoom - self._pan_x = self._pan_start[0] + (dy / h) * world_range_x - self._pan_y = self._pan_start[1] + (dx / w) * world_range_y - self.update() - - def mouseReleaseEvent(self, event): - if event.button() == Qt.MouseButton.LeftButton: - self._drag_start = None - - def _draw_car(self, painter: QPainter): - """Draw a small cross/crosshair at the world origin representing the car.""" - px, py = self.world_to_screen(Point(0, 0)) - size = 8 - - pen = QPen(YELLOW) - pen.setWidth(2) - painter.setPen(pen) - painter.setBrush(Qt.BrushStyle.NoBrush) - - # Cross - painter.drawLine(px - size, py, px + size, py) - painter.drawLine(px, py - size, px, py + size) - - # Circle around it - painter.drawEllipse(px - size, py - size, size * 2, size * 2) - - # Painting - def paintEvent(self, event): - painter = QPainter(self) - painter.setRenderHint(QPainter.RenderHint.Antialiasing) - - painter.fillRect(0, 0, self.width(), self.height(), BLACK) - - self._draw_axes(painter) - self._draw_path(painter) - self._draw_features(painter) - self._draw_lanes(painter) - self._draw_car(painter) - self._draw_legend(painter) - - painter.end() - - - def _draw_legend(self, painter: QPainter): - """Draw a color key in the bottom-right corner""" - entries = [ - ("Waypoint", PURPLE, "ellipse"), - ("Path", WHITE, "line"), - ("Left lane", PURPLE, "line"), - ("Right lane", BLUE, "line"), - ("Stop sign", RED, "rect"), - ("Car", YELLOW, "rect"), - ] - - box_size = 10 - row_h = 18 - padding = 8 - legend_w = 120 - legend_h = len(entries) * row_h + padding * 2 - - lx = self.width() - legend_w - padding - ly = self.height() - legend_h - padding - - # Background - painter.setPen(Qt.PenStyle.NoPen) - painter.setBrush(QBrush(Qt.GlobalColor.black)) - painter.setOpacity(0.6) - painter.drawRect(lx - 4, ly - 4, legend_w + 8, legend_h + 8) - painter.setOpacity(1.0) - - for i, (label, color, shape) in enumerate(entries): - y = ly + padding + i * row_h - sx = lx + 4 - sy = y + row_h // 2 - - painter.setPen(Qt.PenStyle.NoPen) - painter.setBrush(QBrush(color)) - - if shape == "ellipse": - r = box_size // 2 - painter.drawEllipse(sx, sy - r, box_size, box_size) - elif shape == "rect": - painter.drawRect(sx, sy - box_size // 2, box_size, box_size) - elif shape == "line": - pen = QPen(color) - pen.setWidth(2) - painter.setPen(pen) - painter.setBrush(Qt.BrushStyle.NoBrush) - painter.drawLine(sx, sy, sx + box_size, sy) - - painter.setPen(QPen(WHITE)) - painter.setBrush(Qt.BrushStyle.NoBrush) - painter.drawText(sx + box_size + 6, sy + 4, label) - - - def _draw_axes(self, painter: QPainter): - """Draw the X (forward) and Y (left) reference axes through the car origin, with labels.""" - w = self.width() - h = self.height() - - pen = QPen(DIM) - pen.setWidth(1) - painter.setPen(pen) - - ox, oy = canvas_to_screen(0, 0, w, h) - painter.drawLine(0, oy, w, oy) # horizontal — left/right axis (+Y) - painter.drawLine(ox, 0, ox, h) # vertical — forward axis (+X) - - # Axis labels — drawn in dim gray near the positive ends of each axis - painter.setPen(QPen(DIM)) - margin = 4 # px from edge / axis line - - # +X label: top of the vertical axis (forward = up) - painter.drawText(ox + margin, margin + 12, "+x") - - # +Y label: left edge of the horizontal axis (left = left) - painter.drawText(margin, oy - margin, "+y") - - def _draw_path(self, painter: QPainter): - """Draw lines between consecutive waypoints, then waypoint dots on top.""" - waypoints = self.node.target_path[:] - - # Lines between consecutive waypoints - pen = QPen(WHITE) - pen.setWidth(2) - painter.setPen(pen) - painter.setBrush(Qt.BrushStyle.NoBrush) - - for a, b in zip(waypoints, waypoints[1:]): - x1, y1 = self.world_to_screen(a) - x2, y2 = self.world_to_screen(b) - painter.drawLine(x1, y1, x2, y2) - - # Dots at each waypoint (drawn after lines so they sit on top) - painter.setPen(Qt.PenStyle.NoPen) - painter.setBrush(QBrush(GREEN)) - r = DOT_SIZE // 2 - - for pt in waypoints: - px, py = self.world_to_screen(pt) - painter.drawEllipse(px - r, py - r, DOT_SIZE, DOT_SIZE) - - def _draw_features(self, painter: QPainter): - """Draw detected features (stop signs, traffic lights, etc.) as colored squares.""" - painter.setPen(Qt.PenStyle.NoPen) - r = DOT_SIZE // 2 - - for entity in self.node.entities: - x, y = entity.x, entity.y - feature_type = DEFAULT_FEATURE_TYPE - color = FEATURE_COLORS.get(feature_type, WHITE) - painter.setBrush(QBrush(color)) - - px, py = self.world_to_screen(Point(x, y)) - painter.drawRect(px - r, py - r, DOT_SIZE, DOT_SIZE) - - @DeprecationWarning - def _draw_target(self, painter: QPainter): - """Draw the navigation target location as a green circle.""" - if self.node.target_location is None: - return - - tx, ty = self.node.target_location - px, py = self.world_to_screen(Point(tx, ty)) - r = TARGET_SIZE // 2 - - painter.setPen(Qt.PenStyle.NoPen) - painter.setBrush(QBrush(GREEN)) - painter.drawEllipse(px - r, py - r, TARGET_SIZE, TARGET_SIZE) - - def _draw_lanes(self, painter: QPainter): - """Draw lane boundaries as polylines — left in purple, right in blue.""" - if self.node.lane is None: - return - - painter.setBrush(Qt.BrushStyle.NoBrush) - - for side, color in ( - (self.node.lane.left, PURPLE), - (self.node.lane.right, BLUE), - ): - pen = QPen(color) - pen.setWidth(2) - painter.setPen(pen) - - for p1, p2 in zip(side, side[1:]): - x1, y1 = self.world_to_screen(p1) - x2, y2 = self.world_to_screen(p2) - painter.drawLine(x1, y1, x2, y2) + REFRESH_RATE = 10 # Hz + + def __init__(self, node: AP1ConsoleNode, parent=None): + super().__init__(parent) + self.node = node + + self.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding) + self.setMinimumSize(300, 300) + self.setStyleSheet("background: black;") + + self.timer = QTimer(self) + self.timer.timeout.connect(self.update) + self.timer.start(int(1000 / self.REFRESH_RATE)) + + self._zoom = 1.0 + self._pan_x = 0.0 + self._pan_y = 0.0 + self._drag_start = None + self._pan_start = None + + + # Coordinate transformation: World -> Screen + def world_to_screen(self, point: Point): + effective_range_x = WORLD_RANGE_X * self._zoom + effective_range_y = WORLD_RANGE_Y * self._zoom + cx = -(point.y - self._pan_y) / effective_range_y * CANVAS_W + cy = -(point.x - self._pan_x) / effective_range_x * CANVAS_H + + return canvas_to_screen(cx, cy, self.width(), self.height()) + + # zoom + def wheelEvent(self, event): + delta = event.angleDelta().y() + factor = 1.1 if delta < 0 else 0.9 + self._zoom = max(0.1, min(10.0, self._zoom * factor)) + self.update() + + # Pan + def mousePressEvent(self, event): + if event.button() == Qt.MouseButton.LeftButton: + self._drag_start = event.position() + self._pan_start = (self._pan_x, self._pan_y) + + def mouseMoveEvent(self, event): + if self._drag_start is not None: + dx = event.position().x() - self._drag_start.x() + dy = event.position().y() - self._drag_start.y() + + w, h = self.width(), self.height() + world_range_x = WORLD_RANGE_X * self._zoom + world_range_y = WORLD_RANGE_Y * self._zoom + + self._pan_x = self._pan_start[0] + (dy / h) * world_range_x + self._pan_y = self._pan_start[1] + (dx / w) * world_range_y + self.update() + + def mouseReleaseEvent(self, event): + if event.button() == Qt.MouseButton.LeftButton: + self._drag_start = None + + # Paint + def paintEvent(self, event): + painter = QPainter(self) + painter.setRenderHint(QPainter.RenderHint.Antialiasing) + + painter.fillRect(self.rect(), BLACK) + + self._draw_axes(painter) + self._draw_path(painter) + self._draw_car(painter) + + painter.end() + + # Drawing helpers + def _draw_axes(self, painter: QPainter): + pen = QPen(DIM) + pen.setWidth(1) + painter.setPen(pen) + ox, oy = canvas_to_screen(0, 0, self.width(), self.height()) + painter.drawLine(0, oy, self.width(), oy) + painter.drawLine(ox, 0, ox, self.height()) + + def _draw_car(self, painter: QPainter): + px, py = self.world_to_screen(Point(0, 0)) + size = 8 + + pen = QPen(YELLOW) + pen.setWidth(2) + painter.setPen(pen) + painter.setBrush(Qt.BrushStyle.NoBrush) + + painter.drawLine(px - size, py, px + size, py) + painter.drawLine(px, py - size, px, py + size) + painter.drawEllipse(px - size, py - size, size * 2, size * 2) + + def _draw_path(self, painter: QPainter): + waypoints = self.node.target_path[:] + + pen = QPen(WHITE) + pen.setWidth(2) + painter.setPen(pen) + painter.setBrush(Qt.BrushStyle.NoBrush) + + for a, b in zip(waypoints, waypoints[1:]): + x1, y1 = self.world_to_screen(a) + x2, y2 = self.world_to_screen(b) + painter.drawLine(x1, y1, x2, y2) + + painter.setPen(Qt.PenStyle.NoPen) + painter.setBrush(QBrush(GREEN)) + r = DOT_SIZE // 2 + + for pt in waypoints: + px, py = self.world_to_screen(pt) + painter.drawEllipse(px - r, py - r, DOT_SIZE, DOT_SIZE)