Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pixi.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ lint = "pre-commit run --all-files"
test = "pytest utama_core/tests/"
main = "python -m main"
replay = "python -m utama_core.replay.replay_player"
debug-robots = "python -m utama_core.team_controller.src.debug_utils.telop_gui"
Comment thread
energy-in-joles marked this conversation as resolved.
Comment thread
energy-in-joles marked this conversation as resolved.
precommit-install = "pre-commit install && pixi run pre-commit install --hook-type pre-commit"
precommit-uninstall = "pre-commit uninstall"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,67 @@ def __init__(self, is_team_yellow: bool, n_friendly: int):
# track last kick time for each robot to transmit kick as HIGH for n timesteps after command
self._kicker_tracker: Dict[int, KickTrackerEntry] = {}

def get_robots_responses(self) -> Optional[List[RobotResponse]]:
### TODO: Not implemented yet
return None
def get_robots_responses(self) -> List[RobotResponse]:
HEADER = 0xAA
FOOTER = 0x55
MAX_PAYLOAD_LEN = 32 # Sanity limit to prevent buffer bloat
MIN_PAYLOAD_LEN = 1 # We expect at least 1 byte for ball status

if not hasattr(self, "_buffer"):
self._buffer = bytearray()

# 1. Non-blocking read: Only read if bytes are actually waiting
bytes_available = self._serial_port.in_waiting
if bytes_available > 0:
self._buffer.extend(self._serial_port.read(bytes_available))

responses = []

while True:
# 2. Look for header
if len(self._buffer) < 1:
break

if self._buffer[0] != HEADER:
self._buffer.pop(0)
continue
Comment on lines +76 to +83

# 3. Need at least header + id + length
if len(self._buffer) < 3:
break

robot_id = self._buffer[1]
length = self._buffer[2]

Comment thread
energy-in-joles marked this conversation as resolved.
# 4. Validate length byte before proceeding
if length < MIN_PAYLOAD_LEN or length > MAX_PAYLOAD_LEN:
# Malformed length; pop the header to resync
self._buffer.pop(0)
continue

packet_len = 1 + 1 + 1 + length + 1 # header + id + len + data + footer

# 5. Wait for the full packet to arrive
if len(self._buffer) < packet_len:
break

# 6. Validate footer
if self._buffer[packet_len - 1] != FOOTER:
# Corrupted packet or offset mismatch; resync
self._buffer.pop(0)
continue

# 7. Extract packet data safely
data = self._buffer[3 : 3 + length]

Comment thread
energy-in-joles marked this conversation as resolved.
# Guard against IndexError just in case, though validated by 'length' check
if len(data) >= 1:
responses.append(RobotResponse(robot_id, has_ball=(data[0] & 0x01) != 0))

Comment thread
energy-in-joles marked this conversation as resolved.
# 8. Clear parsed packet from buffer
del self._buffer[:packet_len]

return responses

def send_robot_commands(self) -> None:
"""Sends the robot commands to the appropriate team (yellow or blue)."""
Expand All @@ -68,13 +126,9 @@ def send_robot_commands(self) -> None:
# print(binary_representation)
if len(self._assigned_mapping) != self._n_friendly:
warnings.warn(
f"Only {len(self._assigned_mapping)} out of {self._n_friendly} robots have been assigned commands. Sending incomplete command packet."
f"Only {len(self._assigned_mapping)} out of {self._n_friendly} robots have been assigned commands. Sending empty commands for unassigned robots."
)
self._serial_port.write(self._out_packet)
self._serial_port.read_all()
# data_in = self._serial.read_all()
# print(data_in)
# TODO: add receiving feedback from the robots

### update kick and chip trackers. We persist the kick/chip command for KICKER_PERSIST_TIMESTEPS
### this feature is to combat packet loss and to ensure the robot does not kick within its cooldown period
Expand Down Expand Up @@ -195,20 +249,21 @@ def _generate_command_buffer(self, robot_id: int, c_command: RobotCommand) -> by
kicker_byte = 0
tracker = self._kicker_tracker.get(robot_id)

# If tracker_entry exists but persistence expired → send empty kicker byte and return
if tracker and tracker.remaining_persist <= 0:
packet.append(kicker_byte)
return packet

# Decide whether we're kicking or chipping
kick_active = c_command.kick or (tracker and tracker.remaining_persist > 0 and tracker.is_kick)

chip_active = c_command.chip or (tracker and tracker.remaining_persist > 0 and not tracker.is_kick)
if tracker:
# If a tracker exists, we are either PERSISTING or COOLING DOWN.
# We ignore raw c_command inputs until the tracker is removed.
kick_active = tracker.remaining_persist > 0 and tracker.is_kick
chip_active = tracker.remaining_persist > 0 and not tracker.is_kick
else:
# No active tracker: we are free to trigger a new action.
kick_active = c_command.kick
chip_active = c_command.chip

# Apply bitmasks based on the determined state
if kick_active:
kicker_byte |= 0xF0 # upper kicker full power
elif chip_active:
kicker_byte |= 0x0F
kicker_byte |= 0x0F # chip full power

packet.append(kicker_byte)

Expand Down
Loading
Loading