Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/epomakercontroller/commands/EpomakerPollCommand.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""Command for polling the keyboard on 2.4Ghz wireless."""

from .EpomakerCommand import EpomakerCommand
from .reports.Report import Report


class EpomakerPollCommand(EpomakerCommand):
"""A command for polling the keyboard on 2.4Ghz wireless."""

def __init__(self) -> None:
"""Initializes the poll.
"""
initialization_data = "f7"
initial_report = Report(initialization_data, index=0, checksum_index=None)
super().__init__(initial_report)
52 changes: 52 additions & 0 deletions src/epomakercontroller/commands/EpomakerWirelessInitCommand.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""
Todo: write docstrings for this class
"""

from .EpomakerCommand import EpomakerCommand, CommandStructure
from .reports.Report import Report


class EpomakerWirelessInitCommand(EpomakerCommand):
CHUNKS = [
"f60a",
"8f00000000000070",
"fc",
"8700000000000078",
"fc",
"800000000000007f",
"fc",
"ad00000000000052",
"fc",
"840000000000007b",
"fc",
"850000000000007a",
"fc",
"8700000000000078",
"fc",
"8600000000000079",
"fc",
"910000000000006e",
"fc",
"920000000000006d",
"fc",
"9700000000000068",
"fc",
]

def __init__(self):
initialization_data = "fe40"
initial_report = Report(initialization_data, index=0, checksum_index=None)
structure = CommandStructure(
number_of_starter_reports=1,
number_of_data_reports=len(self.CHUNKS),
number_of_footer_reports=0,
)
super().__init__(initial_report, structure)

def prepare_from_sequence(self) -> None:
for report_index in range(0, self.structure.number_of_data_reports):
chunk = self.CHUNKS[report_index]
report = Report(chunk, index=report_index + self.structure.number_of_starter_reports, checksum_index=None)
self._insert_report(report)

self.report_data_prepared = True
12 changes: 7 additions & 5 deletions src/epomakercontroller/configs/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@
if not os.path.exists(ROOT_FOLDER):
os.mkdir(ROOT_FOLDER)

TMP_FOLDER = os.path.abspath(ROOT_FOLDER + "tmp/")
TMP_FOLDER = os.path.abspath("./.epomaker_controller")
ETC_FOLDER = os.path.abspath(ROOT_FOLDER + "etc/")

# Create folder on Windows
if os.name == "nt":
if not os.path.exists(TMP_FOLDER):
os.mkdir(TMP_FOLDER)
if not os.path.exists(ETC_FOLDER):
os.mkdir(ETC_FOLDER)

RULE_FILE_PATH = ETC_FOLDER + "udev/rules.d/99-epomaker-rt100.rules"
TMP_FILE_PATH = TMP_FOLDER + "99-epomaker-rt100.rules"
# Create temp folder in project path on Linux as well
if not os.path.exists(TMP_FOLDER):
os.mkdir(TMP_FOLDER)

RULE_FILE_PATH = ETC_FOLDER + "/udev/rules.d/99-epomaker-rt100.rules"
TMP_FILE_PATH = TMP_FOLDER + "/99-epomaker-rt100.rules"
PATH_TO_DEFAULT_CONFIG = "src/epomakercontroller/configs/default.json"

DAEMON_TIME_DELAY = 1.6
126 changes: 44 additions & 82 deletions src/epomakercontroller/epomakercontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
import os
import time
import subprocess
import re

from typing import override
from datetime import datetime
from json import dumps

import hid # type: ignore[import-not-found]

from .commands.EpomakerWirelessInitCommand import EpomakerWirelessInitCommand
from .configs.constants import TMP_FILE_PATH, RULE_FILE_PATH
from .logger.logger import Logger
from .utils.sensors import get_cpu_usage, get_device_temp
Expand All @@ -33,7 +33,8 @@
EpomakerCpuCommand,
EpomakerKeyRGBCommand,
EpomakerProfileCommand,
EpomakerClearScreenCommand
EpomakerClearScreenCommand,
EpomakerPollCommand,
)

from .commands.data.constants import BUFF_LENGTH, Profile
Expand Down Expand Up @@ -138,13 +139,12 @@ def open_device(self, only_info: bool = False) -> bool:
if only_info:
return True

# Find the device with the specified interface number so we can open by path
# This way we don't block usage of the keyboard whilst the device is open
device_path = self._find_device_path()
if device_path is None:
return False
self._open_device(product_id)

if self.config.use_wireless and self.device:
Logger.log_info("Sending wireless initialization sequence command")
return self.device and self.send_wireless_init()

self._open_device(device_path)
return self.device is not None

@override
Expand All @@ -163,7 +163,6 @@ def _find_product_id(self) -> Optional[int]:
int | None: The product ID if found, None otherwise.
"""

# pylint: disable=W0511
# Todo: optimization
for pid in self.config.product_ids:
self.device_list = hid.enumerate(self.config.vendor_id, pid)
Expand All @@ -172,14 +171,14 @@ def _find_product_id(self) -> Optional[int]:

return None

def _open_device(self, device_path: bytes) -> None:
def _open_device(self, product_id: int) -> None:
"""Opens the USB HID device.

Args:
device_path (bytes): The path to the device.
product_id (int): The product ID.
"""
try:
self.device.open_path(device_path)
self.device.open(self.config.vendor_id, product_id)
except IOError as e:
Logger.log_error(
f"Failed to open device: {e}\n"
Expand Down Expand Up @@ -244,78 +243,17 @@ def print_device_info(self) -> None:
)
)

def _find_device_path(self) -> Optional[bytes]:
"""Finds the device path with the specified interface number.

Returns:
Optional[bytes]: The device path if found, None otherwise.
"""
input_dir = "/sys/class/input"
hid_infos = EpomakerController._get_hid_infos(
input_dir, self.config.device_description
)

if not hid_infos:
Logger.log_warning(f"No events found with description: '{self.config.device_description}'")
return None

EpomakerController._populate_hid_paths(hid_infos)

return self._select_device_path(hid_infos)

@staticmethod
def _get_hid_infos(input_dir: str, description: str) -> list[HIDInfo]:
"""Retrieve HID information based on the given description."""
hid_infos = []
for event in os.listdir(input_dir):
if event.startswith("event"):
device_name_path = os.path.join(input_dir, event, "device", "name")
try:
with open(device_name_path, "r", encoding="utf-8") as f:
device_name = f.read().strip()
if re.search(description, device_name):
event_path = os.path.join(input_dir, event)
hid_infos.append(
HIDInfo(device_name, event_path)
)
except FileNotFoundError:
continue
return hid_infos

@staticmethod
def _populate_hid_paths(hid_infos: list[HIDInfo]) -> None:
"""Populate the HID paths for each HIDInfo object in the list."""
for hi in hid_infos:
device_symlink = os.path.join(hi.event_path, "device")
if not os.path.islink(device_symlink):
Logger.log_warning(f"No 'device' symlink found in {hi.event_path}")
continue

hid_device_path = os.path.realpath(device_symlink)
match = re.search(r"\b\d+-[\d.]+:\d+\.\d+\b", hid_device_path)
hi.hid_path = match.group(0) if match else None

def _select_device_path(self, hid_infos: list[HIDInfo]) -> Optional[bytes]:
"""Select the appropriate device path based on interface preference."""
device_name_filter = "Wireless" if self.config.use_wireless else "Wired"
filtered_devices = [h for h in hid_infos if device_name_filter in h.device_name]

if not filtered_devices:
Logger.log_warning(f"Could not find {device_name_filter} interface")
return None

selected_device = filtered_devices[0]
return (
selected_device.hid_path.encode("utf-8")
if selected_device.hid_path
else None
)

def _send_command(self, command: EpomakerCommand.EpomakerCommand) -> None:
def _send_command(
self, command: EpomakerCommand.EpomakerCommand, sleep_time: float = 1 / 1000,
poll_first: bool = False
) -> None:
"""Sends a command to the HID device.

Args:
command (EpomakerCommand): The command to send.
sleep_time (float): The time to sleep between sending packets
(default: 0.1).
poll_first(bool): Whether or not to poll the device first
"""
# Make sure device is opened and connected
if not self.device:
Expand All @@ -334,11 +272,13 @@ def _send_command(self, command: EpomakerCommand.EpomakerCommand) -> None:
assert len(packet) == BUFF_LENGTH
if self.dry_run:
Logger.log_info(f"Dry run: skipping command send: {packet!r}")
else:
elif self.device:
if poll_first:
self.poll()
# We need to give some time for the screen to process out report
# Otherwise it will hang processing queue
# Not the best way to do it tho, but at least it works...
with TimeHelper(min_duration=EpomakerController.COMMAND_MIN_DELAY):
with TimeHelper(min_duration=sleep_time):
self.device.send_feature_report(packet.get_all_bytes())

@staticmethod
Expand All @@ -356,6 +296,23 @@ def _check_range(value: int, r: range | None = None) -> bool:
r = range(0, 100) # 0 to 99
return value in r

@staticmethod
def __check_whistle_response(response: bytes):
return "01010168" in response.hex()

def send_wireless_init(self) -> bool:
"""
Sends wireless init command to the HID device. Required before 2.4GHz mode usage
"""

if self.__check_whistle_response(bytes(self.poll())):
return True

command = EpomakerWirelessInitCommand()
command.prepare_from_sequence()
self._send_command(command, poll_first=True)
return self.__check_whistle_response(bytes(self.poll()))

def send_image(self, image_path: str) -> None:
"""Sends an image to the HID device.

Expand Down Expand Up @@ -475,6 +432,11 @@ def cycle_light_modes(self, sleep_seconds: int = 5) -> None:
time.sleep(sleep_seconds)
counter += 1

def poll(self) -> Any:
poll_command = EpomakerPollCommand.EpomakerPollCommand()
self._send_command(poll_command, poll_first=False)
return self.device.get_feature_report(0x00, 128)

def set_profile(self, profile: Profile) -> None:
"""Set the keyboard profile."""
profile_command = EpomakerProfileCommand.EpomakerProfileCommand(profile)
Expand Down
17 changes: 15 additions & 2 deletions tests/fake/fake_controller.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from __future__ import annotations

import typing

from epomakercontroller.commands.EpomakerCommand import EpomakerCommand
from epomakercontroller.commands.EpomakerPollCommand import EpomakerPollCommand
from epomakercontroller.epomakercontroller import EpomakerConfig, EpomakerController

if typing.TYPE_CHECKING:
Expand Down Expand Up @@ -31,5 +32,17 @@ def open_device(self, only_info: bool = False):
def close_device(self):
self.is_ready = False

def _send_command(self, command: EpomakerCommand):
def poll(self):
poll_command = EpomakerPollCommand()
self._send_command(poll_command, poll_first=False)
return b"01010168"

def _send_command(
self,
command,
sleep_time: float = 1 / 1000,
poll_first: bool = False
) -> None:
if poll_first:
self.poll()
self.commands.append(command)
29 changes: 29 additions & 0 deletions tests/test_wireless_init_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from epomakercontroller.commands.EpomakerPollCommand import EpomakerPollCommand
from epomakercontroller.commands.EpomakerWirelessInitCommand import EpomakerWirelessInitCommand
from fake.fake_controller import FakeEpomakerController
import epomakercontroller.cli as cli


def test_wireless_command_init_data():
controller = FakeEpomakerController(cli.CONFIG_MAIN)
controller.send_wireless_init()

assert controller.commands
reports = controller.commands[2].reports # First will be poll command
assert reports
assert reports[0].header_format_string == "fe40"

for index in range(1, len(reports)):
assert EpomakerWirelessInitCommand.CHUNKS[index - 1] == reports[index].header_format_string


def test_wireless_poll():
controller = FakeEpomakerController(cli.CONFIG_MAIN)
controller.send_wireless_init()

assert controller.commands
controller.commands.pop(2)

for command in controller.commands:
assert isinstance(command, EpomakerPollCommand)
assert command.reports[0].header_format_string == "f7" # Poll command
Loading