Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions src/epomakercontroller/epomakercontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class HIDInfo:


class EpomakerController(ControllerBase):
COMMAND_MIN_DELAY = 1 / 1000 # ms.

"""EpomakerController class represents a controller for an Epomaker USB HID device.

Attributes:
Expand Down Expand Up @@ -245,7 +247,7 @@ def _find_device_path(self) -> Optional[bytes]:
)

if not hid_infos:
print(f"No events found with description: '{self.config.device_description}'")
Logger.log_warning(f"No events found with description: '{self.config.device_description}'")
return None

EpomakerController._populate_hid_paths(hid_infos)
Expand Down Expand Up @@ -277,7 +279,7 @@ def _populate_hid_paths(hid_infos: list[HIDInfo]) -> None:
for hi in hid_infos:
device_symlink = os.path.join(hi.event_path, "device")
if not os.path.islink(device_symlink):
print(f"No 'device' symlink found in {hi.event_path}")
Logger.log_warning(f"No 'device' symlink found in {hi.event_path}")
continue

hid_device_path = os.path.realpath(device_symlink)
Expand All @@ -290,7 +292,7 @@ def _select_device_path(self, hid_infos: list[HIDInfo]) -> Optional[bytes]:
filtered_devices = [h for h in hid_infos if device_name_filter in h.device_name]

if not filtered_devices:
print(f"Could not find {device_name_filter} interface")
Logger.log_warning(f"Could not find {device_name_filter} interface")
return None

selected_device = filtered_devices[0]
Expand Down Expand Up @@ -322,9 +324,13 @@ def _send_command(self, command: EpomakerCommand.EpomakerCommand) -> None:
for packet in command:
assert len(packet) == BUFF_LENGTH
if self.dry_run:
print(f"Dry run: skipping command send: {packet!r}")
Logger.log_info(f"Dry run: skipping command send: {packet!r}")
else:
self.device.send_feature_report(packet.get_all_bytes())
# We need to give some time for the screen to process out report
# Otherwise it will hang processing queue
# Not the best way to do it tho, but at least it works...
with TimeHelper(min_duration=EpomakerController.COMMAND_MIN_DELAY):
self.device.send_feature_report(packet.get_all_bytes())

@staticmethod
def _check_range(value: int, r: range | None = None) -> bool:
Expand Down Expand Up @@ -407,7 +413,7 @@ def send_cpu(self, cpu: int) -> None:
return

cpu_command = EpomakerCpuCommand.EpomakerCpuCommand(cpu)
print(f"Sending CPU {cpu}%")
Logger.log_info(f"Sending CPU {cpu}%")
self._send_command(cpu_command)

def set_rgb_all_keys(self, r: int, g: int, b: int) -> None:
Expand Down
Loading