diff --git a/src/epomakercontroller/commands/EpomakerWirelessInitCommand.py b/src/epomakercontroller/commands/EpomakerWirelessInitCommand.py new file mode 100644 index 0000000..be83aa2 --- /dev/null +++ b/src/epomakercontroller/commands/EpomakerWirelessInitCommand.py @@ -0,0 +1,52 @@ +""" +Todo: write docstrings for this class +""" + +from .EpomakerCommand import EpomakerCommand, CommandStructure +from .reports.Report import Report + + +class EpomakerWirelessInitCommand(EpomakerCommand): + CHUNKS = [ + "f60a", + "8f00000000000070", + "fc", + "8700000000000078", + "fc", + "800000000000007f", + "fc", + "ad00000000000052", + "fc", + "840000000000007b", + "fc", + "850000000000007a", + "fc", + "8700000000000078", + "fc", + "8600000000000079", + "fc", + "910000000000006e", + "fc", + "920000000000006d", + "fc", + "9700000000000068" + "fc", + ] + + def __init__(self): + initialization_data = "fe40" + initial_report = Report(initialization_data, index=0, checksum_index=None) + structure = CommandStructure( + number_of_starter_reports=1, + number_of_data_reports=len(self.CHUNKS), + number_of_footer_reports=0, + ) + super().__init__(initial_report, structure) + + def prepare_from_sequence(self) -> None: + for report_index in range(0, self.structure.number_of_data_reports): + chunk = self.CHUNKS[report_index] + report = Report(chunk, index=report_index + self.structure.number_of_starter_reports, checksum_index=None) + self._insert_report(report) + + self.report_data_prepared = True diff --git a/src/epomakercontroller/configs/constants.py b/src/epomakercontroller/configs/constants.py index 046d1b0..0f9fe02 100644 --- a/src/epomakercontroller/configs/constants.py +++ b/src/epomakercontroller/configs/constants.py @@ -16,7 +16,7 @@ if not os.path.exists(ROOT_FOLDER): os.mkdir(ROOT_FOLDER) -TMP_FOLDER = os.path.abspath(ROOT_FOLDER + "tmp/") +TMP_FOLDER = os.path.abspath("./.epomaker_controller") ETC_FOLDER = os.path.abspath(ROOT_FOLDER + "etc/") # Create folder on Windows @@ -26,8 +26,8 @@ if not os.path.exists(ETC_FOLDER): os.mkdir(ETC_FOLDER) -RULE_FILE_PATH = ETC_FOLDER + "udev/rules.d/99-epomaker-rt100.rules" -TMP_FILE_PATH = TMP_FOLDER + "99-epomaker-rt100.rules" +RULE_FILE_PATH = ETC_FOLDER + "/udev/rules.d/99-epomaker-rt100.rules" +TMP_FILE_PATH = TMP_FOLDER + "/99-epomaker-rt100.rules" PATH_TO_DEFAULT_CONFIG = "src/epomakercontroller/configs/default.json" DAEMON_TIME_DELAY = 1.6 diff --git a/src/epomakercontroller/epomakercontroller.py b/src/epomakercontroller/epomakercontroller.py index 7347f11..1d3b357 100644 --- a/src/epomakercontroller/epomakercontroller.py +++ b/src/epomakercontroller/epomakercontroller.py @@ -16,6 +16,7 @@ from datetime import datetime from json import dumps +from .commands.EpomakerWirelessInitCommand import EpomakerWirelessInitCommand from .configs.constants import TMP_FILE_PATH, RULE_FILE_PATH from .logger.logger import Logger from .utils.sensors import get_cpu_usage, get_device_temp @@ -132,13 +133,12 @@ def open_device(self, only_info: bool = False) -> bool: if only_info: return True - # Find the device with the specified interface number so we can open by path - # This way we don't block usage of the keyboard whilst the device is open - device_path = self._find_device_path() - if device_path is None: - return False + self._open_device(product_id) + + if self.config.use_wireless and self.device: + Logger.log_info("Sending wireless initialization sequence command") + self.send_wireless_init() - self._open_device(device_path) return self.device is not None @override @@ -165,14 +165,14 @@ def _find_product_id(self) -> Optional[int]: return None - def _open_device(self, device_path: bytes) -> None: + def _open_device(self, product_id: int) -> None: """Opens the USB HID device. Args: - device_path (bytes): The path to the device. + product_id (int): The product ID. """ try: - self.device.open_path(device_path) + self.device.open(self.config.vendor_id, product_id) except IOError as e: Logger.log_error( f"Failed to open device: {e}\n" @@ -236,25 +236,6 @@ def print_device_info(self) -> None: ) ) - def _find_device_path(self) -> Optional[bytes]: - """Finds the device path with the specified interface number. - - Returns: - Optional[bytes]: The device path if found, None otherwise. - """ - input_dir = "/sys/class/input" - hid_infos = EpomakerController._get_hid_infos( - input_dir, self.config.device_description - ) - - if not hid_infos: - Logger.log_warning(f"No events found with description: '{self.config.device_description}'") - return None - - EpomakerController._populate_hid_paths(hid_infos) - - return self._select_device_path(hid_infos) - @staticmethod def _get_hid_infos(input_dir: str, description: str) -> list[HIDInfo]: """Retrieve HID information based on the given description.""" @@ -304,7 +285,7 @@ def _select_device_path(self, hid_infos: list[HIDInfo]) -> Optional[bytes]: ) def _send_command( - self, command: EpomakerCommand.EpomakerCommand, sleep_time: float = 0.1, + self, command: EpomakerCommand.EpomakerCommand, sleep_time: float = 1 / 1000, poll_first: bool = False ) -> None: """Sends a command to the HID device. @@ -338,7 +319,7 @@ def _send_command( # We need to give some time for the screen to process out report # Otherwise it will hang processing queue # Not the best way to do it tho, but at least it works... - with TimeHelper(min_duration=EpomakerController.COMMAND_MIN_DELAY): + with TimeHelper(min_duration=sleep_time): self.device.send_feature_report(packet.get_all_bytes()) @staticmethod @@ -356,6 +337,14 @@ def _check_range(value: int, r: range | None = None) -> bool: r = range(0, 100) # 0 to 99 return value in r + def send_wireless_init(self): + """ + Sends wireless init command to the HID device. Required before 2.4GHz mode usage + """ + command = EpomakerWirelessInitCommand() + command.prepare_from_sequence() + self._send_command(command, poll_first=True) + def send_image(self, image_path: str) -> None: """Sends an image to the HID device. @@ -407,7 +396,6 @@ def send_temperature(self, temperature: int | None) -> None: Logger.log_info(f"Sending temperature {temperature}C") self._send_command(temperature_command) - def send_cpu(self, cpu: int) -> None: """Sends the CPU percentage to the HID device. diff --git a/tests/fake/fake_controller.py b/tests/fake/fake_controller.py index c9821e4..a7bc050 100644 --- a/tests/fake/fake_controller.py +++ b/tests/fake/fake_controller.py @@ -1,7 +1,8 @@ from __future__ import annotations + import typing -from epomakercontroller.commands.EpomakerCommand import EpomakerCommand +from epomakercontroller.commands.EpomakerPollCommand import EpomakerPollCommand from epomakercontroller.epomakercontroller import EpomakerConfig, EpomakerController if typing.TYPE_CHECKING: @@ -31,5 +32,16 @@ def open_device(self, only_info: bool = False): def close_device(self): self.is_ready = False - def _send_command(self, command: EpomakerCommand): + def poll(self): + poll_command = EpomakerPollCommand() + self._send_command(poll_command, poll_first=False) + + def _send_command( + self, + command, + sleep_time: float = 1 / 1000, + poll_first: bool = False + ) -> None: + if poll_first: + self.poll() self.commands.append(command) diff --git a/tests/test_wireless_init_command.py b/tests/test_wireless_init_command.py new file mode 100644 index 0000000..23799f2 --- /dev/null +++ b/tests/test_wireless_init_command.py @@ -0,0 +1,29 @@ +from epomakercontroller.commands.EpomakerPollCommand import EpomakerPollCommand +from epomakercontroller.commands.EpomakerWirelessInitCommand import EpomakerWirelessInitCommand +from fake.fake_controller import FakeEpomakerController +import epomakercontroller.cli as cli + + +def test_wireless_command_init_data(): + controller = FakeEpomakerController(cli.CONFIG_MAIN) + controller.send_wireless_init() + + assert controller.commands + reports = controller.commands[1].reports # First will be poll command + assert reports + assert reports[0].header_format_string == "fe40" + + for index in range(1, len(reports)): + assert EpomakerWirelessInitCommand.CHUNKS[index - 1] == reports[index].header_format_string + + +def test_wireless_poll(): + controller = FakeEpomakerController(cli.CONFIG_MAIN) + controller.send_wireless_init() + + assert controller.commands + controller.commands.pop(1) + + for command in controller.commands: + assert isinstance(command, EpomakerPollCommand) + assert command.reports[0].header_format_string == "f7" # Poll command