Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions src/epomakercontroller/commands/EpomakerWirelessInitCommand.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""
Todo: write docstrings for this class
"""

from .EpomakerCommand import EpomakerCommand, CommandStructure
from .reports.Report import Report


class EpomakerWirelessInitCommand(EpomakerCommand):
CHUNKS = [
"f60a",
"8f00000000000070",
"fc",
"8700000000000078",
"fc",
"800000000000007f",
"fc",
"ad00000000000052",
"fc",
"840000000000007b",
"fc",
"850000000000007a",
"fc",
"8700000000000078",
"fc",
"8600000000000079",
"fc",
"910000000000006e",
"fc",
"920000000000006d",
"fc",
"9700000000000068"
"fc",
]

def __init__(self):
initialization_data = "fe40"
initial_report = Report(initialization_data, index=0, checksum_index=None)
structure = CommandStructure(
number_of_starter_reports=1,
number_of_data_reports=len(self.CHUNKS),
number_of_footer_reports=0,
)
super().__init__(initial_report, structure)

def prepare_from_sequence(self) -> None:
for report_index in range(0, self.structure.number_of_data_reports):
chunk = self.CHUNKS[report_index]
report = Report(chunk, index=report_index + self.structure.number_of_starter_reports, checksum_index=None)
self._insert_report(report)

self.report_data_prepared = True
6 changes: 3 additions & 3 deletions src/epomakercontroller/configs/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
if not os.path.exists(ROOT_FOLDER):
os.mkdir(ROOT_FOLDER)

TMP_FOLDER = os.path.abspath(ROOT_FOLDER + "tmp/")
TMP_FOLDER = os.path.abspath("./.epomaker_controller")
ETC_FOLDER = os.path.abspath(ROOT_FOLDER + "etc/")

# Create folder on Windows
Expand All @@ -26,8 +26,8 @@
if not os.path.exists(ETC_FOLDER):
os.mkdir(ETC_FOLDER)

RULE_FILE_PATH = ETC_FOLDER + "udev/rules.d/99-epomaker-rt100.rules"
TMP_FILE_PATH = TMP_FOLDER + "99-epomaker-rt100.rules"
RULE_FILE_PATH = ETC_FOLDER + "/udev/rules.d/99-epomaker-rt100.rules"
TMP_FILE_PATH = TMP_FOLDER + "/99-epomaker-rt100.rules"
PATH_TO_DEFAULT_CONFIG = "src/epomakercontroller/configs/default.json"

DAEMON_TIME_DELAY = 1.6
50 changes: 19 additions & 31 deletions src/epomakercontroller/epomakercontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from datetime import datetime
from json import dumps

from .commands.EpomakerWirelessInitCommand import EpomakerWirelessInitCommand
from .configs.constants import TMP_FILE_PATH, RULE_FILE_PATH
from .logger.logger import Logger
from .utils.sensors import get_cpu_usage, get_device_temp
Expand Down Expand Up @@ -132,13 +133,12 @@ def open_device(self, only_info: bool = False) -> bool:
if only_info:
return True

# Find the device with the specified interface number so we can open by path
# This way we don't block usage of the keyboard whilst the device is open
device_path = self._find_device_path()
if device_path is None:
return False
self._open_device(product_id)

if self.config.use_wireless and self.device:
Logger.log_info("Sending wireless initialization sequence command")
self.send_wireless_init()

self._open_device(device_path)
return self.device is not None

@override
Expand All @@ -165,14 +165,14 @@ def _find_product_id(self) -> Optional[int]:

return None

def _open_device(self, device_path: bytes) -> None:
def _open_device(self, product_id: int) -> None:
"""Opens the USB HID device.

Args:
device_path (bytes): The path to the device.
product_id (int): The product ID.
"""
try:
self.device.open_path(device_path)
self.device.open(self.config.vendor_id, product_id)
except IOError as e:
Logger.log_error(
f"Failed to open device: {e}\n"
Expand Down Expand Up @@ -236,25 +236,6 @@ def print_device_info(self) -> None:
)
)

def _find_device_path(self) -> Optional[bytes]:
"""Finds the device path with the specified interface number.

Returns:
Optional[bytes]: The device path if found, None otherwise.
"""
input_dir = "/sys/class/input"
hid_infos = EpomakerController._get_hid_infos(
input_dir, self.config.device_description
)

if not hid_infos:
Logger.log_warning(f"No events found with description: '{self.config.device_description}'")
return None

EpomakerController._populate_hid_paths(hid_infos)

return self._select_device_path(hid_infos)

@staticmethod
def _get_hid_infos(input_dir: str, description: str) -> list[HIDInfo]:
"""Retrieve HID information based on the given description."""
Expand Down Expand Up @@ -304,7 +285,7 @@ def _select_device_path(self, hid_infos: list[HIDInfo]) -> Optional[bytes]:
)

def _send_command(
self, command: EpomakerCommand.EpomakerCommand, sleep_time: float = 0.1,
self, command: EpomakerCommand.EpomakerCommand, sleep_time: float = 1 / 1000,
poll_first: bool = False
) -> None:
"""Sends a command to the HID device.
Expand Down Expand Up @@ -338,7 +319,7 @@ def _send_command(
# We need to give some time for the screen to process out report
# Otherwise it will hang processing queue
# Not the best way to do it tho, but at least it works...
with TimeHelper(min_duration=EpomakerController.COMMAND_MIN_DELAY):
with TimeHelper(min_duration=sleep_time):
self.device.send_feature_report(packet.get_all_bytes())

@staticmethod
Expand All @@ -356,6 +337,14 @@ def _check_range(value: int, r: range | None = None) -> bool:
r = range(0, 100) # 0 to 99
return value in r

def send_wireless_init(self):
"""
Sends wireless init command to the HID device. Required before 2.4GHz mode usage
"""
command = EpomakerWirelessInitCommand()
command.prepare_from_sequence()
self._send_command(command, poll_first=True)

def send_image(self, image_path: str) -> None:
"""Sends an image to the HID device.

Expand Down Expand Up @@ -407,7 +396,6 @@ def send_temperature(self, temperature: int | None) -> None:
Logger.log_info(f"Sending temperature {temperature}C")
self._send_command(temperature_command)


def send_cpu(self, cpu: int) -> None:
"""Sends the CPU percentage to the HID device.

Expand Down
16 changes: 14 additions & 2 deletions tests/fake/fake_controller.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from __future__ import annotations

import typing

from epomakercontroller.commands.EpomakerCommand import EpomakerCommand
from epomakercontroller.commands.EpomakerPollCommand import EpomakerPollCommand
from epomakercontroller.epomakercontroller import EpomakerConfig, EpomakerController

if typing.TYPE_CHECKING:
Expand Down Expand Up @@ -31,5 +32,16 @@ def open_device(self, only_info: bool = False):
def close_device(self):
self.is_ready = False

def _send_command(self, command: EpomakerCommand):
def poll(self):
poll_command = EpomakerPollCommand()
self._send_command(poll_command, poll_first=False)

def _send_command(
self,
command,
sleep_time: float = 1 / 1000,
poll_first: bool = False
) -> None:
if poll_first:
self.poll()
self.commands.append(command)
29 changes: 29 additions & 0 deletions tests/test_wireless_init_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from epomakercontroller.commands.EpomakerPollCommand import EpomakerPollCommand
from epomakercontroller.commands.EpomakerWirelessInitCommand import EpomakerWirelessInitCommand
from fake.fake_controller import FakeEpomakerController
import epomakercontroller.cli as cli


def test_wireless_command_init_data():
controller = FakeEpomakerController(cli.CONFIG_MAIN)
controller.send_wireless_init()

assert controller.commands
reports = controller.commands[1].reports # First will be poll command
assert reports
assert reports[0].header_format_string == "fe40"

for index in range(1, len(reports)):
assert EpomakerWirelessInitCommand.CHUNKS[index - 1] == reports[index].header_format_string


def test_wireless_poll():
controller = FakeEpomakerController(cli.CONFIG_MAIN)
controller.send_wireless_init()

assert controller.commands
controller.commands.pop(1)

for command in controller.commands:
assert isinstance(command, EpomakerPollCommand)
assert command.reports[0].header_format_string == "f7" # Poll command