Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 88 additions & 19 deletions core/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
import threading
import time
from core.mouse_hook import MouseHook, MouseEvent
from core.key_simulator import ACTIONS, execute_action
from core.key_simulator import (
ACTIONS, execute_action, is_mouse_button_action,
inject_mouse_down, inject_mouse_up,
)
from core.config import (
load_config, get_active_mappings, get_profile_for_app,
BUTTON_TO_EVENTS, GESTURE_DIRECTION_BUTTONS, save_config,
Expand Down Expand Up @@ -48,6 +51,7 @@ def __init__(self):
self.cfg.get("settings", {}).get("debug_mode", False)
)
self._battery_poll_stop = threading.Event()
self._mouse_release_timers = {} # action_id → Timer for safety auto-release
self._lock = threading.Lock()
self.hook.set_debug_callback(self._emit_debug)
self.hook.set_gesture_callback(self._emit_gesture_event)
Expand Down Expand Up @@ -91,41 +95,106 @@ def _setup_hooks(self):

for btn_key, action_id in mappings.items():
events = list(BUTTON_TO_EVENTS.get(btn_key, ()))
has_up = any(e.endswith("_up") for e in events)

for evt_type in events:
if evt_type.endswith("_up"):
if action_id != "none":
self.hook.block(evt_type)
if is_mouse_button_action(action_id):
self.hook.register(evt_type, self._make_mouse_up_handler(action_id))
continue

if action_id != "none":
self.hook.block(evt_type)

if "hscroll" in evt_type:
self.hook.register(evt_type, self._make_hscroll_handler(action_id))
elif is_mouse_button_action(action_id):
if has_up:
# Button has a matching _up event → split press/release
self.hook.register(evt_type, self._make_mouse_down_handler(action_id))
else:
# Single-fire event (gesture, swipe) → full click
self.hook.register(evt_type, self._make_handler(action_id))
else:
self.hook.register(evt_type, self._make_handler(action_id))

def _make_handler(self, action_id):
def handler(event):
if self._enabled:
self._emit_debug(
f"Mapped {event.event_type} -> {action_id} "
f"({self._action_label(action_id)})"
)
if event.event_type.startswith("gesture_"):
self._emit_gesture_event({
"type": "mapped",
"event_name": event.event_type,
"action_id": action_id,
"action_label": self._action_label(action_id),
})
if action_id == "toggle_smart_shift":
self._toggle_smart_shift()
elif action_id == "switch_scroll_mode":
self._switch_scroll_mode()
else:
execute_action(action_id)
try:
if self._enabled:
print(f"[Engine] _make_handler fired: {event.event_type} -> {action_id}")
self._emit_debug(
f"Mapped {event.event_type} -> {action_id} "
f"({self._action_label(action_id)})"
)
if event.event_type.startswith("gesture_"):
self._emit_gesture_event({
"type": "mapped",
"event_name": event.event_type,
"action_id": action_id,
"action_label": self._action_label(action_id),
})
if action_id == "toggle_smart_shift":
self._toggle_smart_shift()
elif action_id == "switch_scroll_mode":
self._switch_scroll_mode()
else:
execute_action(action_id)
except Exception as exc:
print(f"[Engine] _make_handler EXCEPTION for {action_id}: {exc}")
import traceback; traceback.print_exc()
return handler

def _make_mouse_down_handler(self, action_id):
def _safety_release():
"""Auto-release if the UP event never fires."""
try:
print(f"[Engine] SAFETY RELEASE fired for {action_id} (UP never received)")
self._mouse_release_timers.pop(action_id, None)
inject_mouse_up(action_id)
except Exception as exc:
print(f"[Engine] _safety_release EXCEPTION for {action_id}: {exc}")
import traceback; traceback.print_exc()

def handler(event):
try:
if self._enabled:
print(f"[Engine] mouse_down_handler fired: {event.event_type} -> {action_id}")
self._emit_debug(
f"Mapped {event.event_type} -> {action_id} (mouse down)"
)
inject_mouse_down(action_id)
# Safety: auto-release after 2s if UP event is never received
old = self._mouse_release_timers.pop(action_id, None)
if old is not None:
old.cancel()
t = threading.Timer(2.0, _safety_release)
t.daemon = True
self._mouse_release_timers[action_id] = t
t.start()
except Exception as exc:
print(f"[Engine] mouse_down_handler EXCEPTION for {action_id}: {exc}")
import traceback; traceback.print_exc()
return handler

def _make_mouse_up_handler(self, action_id):
def handler(event):
try:
if self._enabled:
print(f"[Engine] mouse_up_handler fired: {event.event_type} -> {action_id}")
self._emit_debug(
f"Mapped {event.event_type} -> {action_id} (mouse up)"
)
# Cancel safety timer
old = self._mouse_release_timers.pop(action_id, None)
if old is not None:
old.cancel()
inject_mouse_up(action_id)
except Exception as exc:
print(f"[Engine] mouse_up_handler EXCEPTION for {action_id}: {exc}")
import traceback; traceback.print_exc()
return handler

def _toggle_smart_shift(self):
Expand Down
75 changes: 72 additions & 3 deletions core/hid_gesture.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,7 @@ def __init__(self, on_down=None, on_up=None, on_move=None,
self._battery_result = None
self._last_logged_battery = None
self._connected_device_info = None
self._consecutive_request_timeouts = 0

# ── public API ────────────────────────────────────────────────

Expand Down Expand Up @@ -737,12 +738,15 @@ def _request(self, feat, func, params, timeout_ms=2000):

expected_funcs = {func, (func + 1) & 0x0F}
if r_feat == feat and r_sw == MY_SW and r_func in expected_funcs:
self._consecutive_request_timeouts = 0
return msg
# Forward non-matching reports (e.g. diverted button events) so
# button held-state tracking stays in sync during command exchanges.
self._on_report(raw)
self._consecutive_request_timeouts += 1
print(f"[HidGesture] request timeout feat=0x{feat:02X} func=0x{func:X} "
f"devIdx=0x{self._dev_idx:02X} params=[{_hex_bytes(req_params)}]")
f"devIdx=0x{self._dev_idx:02X} params=[{_hex_bytes(req_params)}] "
f"(consecutive={self._consecutive_request_timeouts})")
return None

# ── feature helpers ───────────────────────────────────────────
Expand Down Expand Up @@ -984,6 +988,7 @@ def read_dpi(self):
return self._dpi_result
time.sleep(0.1)
print("[HidGesture] DPI read timed out")
self._pending_dpi = None
return None

def _apply_pending_read_dpi(self):
Expand Down Expand Up @@ -1095,6 +1100,7 @@ def read_smart_shift(self):
return self._smart_shift_result
time.sleep(0.1)
print("[HidGesture] Smart Shift read timed out")
self._pending_smart_shift = None # prevent stale processing
return None

def _apply_pending_read_smart_shift(self):
Expand Down Expand Up @@ -1138,6 +1144,7 @@ def read_battery(self):
return self._battery_result
time.sleep(0.1)
print("[HidGesture] Battery read timed out")
self._pending_battery = None
return None

def _apply_pending_read_battery(self):
Expand Down Expand Up @@ -1187,6 +1194,32 @@ def _decode_s16(hi, lo):
value -= 0x10000
return value

def _force_release_stale_holds(self):
"""Synthesize UP events for any buttons stuck in the held state.

Called from the main loop when consecutive _rx() calls return no data,
indicating the device may have stalled or gone to sleep while a
button was physically held.
"""
if self._held:
self._held = False
print("[HidGesture] Gesture force-released (stale hold)")
if self._on_up:
try:
self._on_up()
except Exception:
pass
for info in self._extra_diverts.values():
if info["held"]:
info["held"] = False
cb = info.get("on_up")
if cb:
print("[HidGesture] Extra button force-released (stale hold)")
try:
cb()
except Exception:
pass

def _on_report(self, raw):
"""Inspect an incoming HID++ report for diverted button / raw XY events."""
msg = _parse(raw)
Expand Down Expand Up @@ -1440,11 +1473,22 @@ def _main_loop(self):
except Exception:
pass
print("[HidGesture] Listening for gesture events…")
_no_data_count = 0 # consecutive _rx() returning None
_STALE_HOLD_LIMIT = 3 # force-release held buttons after this many empty reads (~3 s)
_CONSECUTIVE_TIMEOUT_RECONNECT = 3 # force reconnect after this many request timeouts
self._consecutive_request_timeouts = 0
try:
while self._running:
if self._reconnect_requested:
self._reconnect_requested = False
raise IOError("reconnect requested")
# If too many consecutive HID++ requests timed out, the
# device likely went to sleep or power-cycled. Force a
# full reconnect so button diverts are re-applied.
if self._consecutive_request_timeouts >= _CONSECUTIVE_TIMEOUT_RECONNECT:
print(f"[HidGesture] {self._consecutive_request_timeouts} consecutive "
f"request timeouts — forcing reconnect")
raise IOError("consecutive request timeouts — device likely asleep")
# Apply any queued DPI command
if self._pending_dpi is not None:
if self._pending_dpi == "read":
Expand All @@ -1457,7 +1501,14 @@ def _main_loop(self):
self._apply_pending_read_battery()
raw = self._rx(1000)
if raw:
_no_data_count = 0
self._on_report(raw)
else:
_no_data_count += 1
# Force-release buttons stuck in held state when the
# device stops sending reports (firmware stall / sleep).
if _no_data_count >= _STALE_HOLD_LIMIT:
self._force_release_stale_holds()
except Exception as e:
print(f"[HidGesture] read error: {e}")

Expand All @@ -1475,10 +1526,28 @@ def _main_loop(self):
self._battery_idx = None
self._battery_feature_id = None
self._pending_battery = None
self._pending_dpi = None
self._pending_smart_shift = None
self._last_logged_battery = None
self._held = False
self._consecutive_request_timeouts = 0
if self._held:
self._held = False
print("[HidGesture] Gesture force-released on disconnect")
if self._on_up:
try:
self._on_up()
except Exception:
pass
for info in self._extra_diverts.values():
info["held"] = False
if info["held"]:
info["held"] = False
cb = info.get("on_up")
if cb:
print("[HidGesture] Extra button force-released on disconnect")
try:
cb()
except Exception:
pass
self._gesture_cid = DEFAULT_GESTURE_CID
self._gesture_candidates = list(DEFAULT_GESTURE_CIDS)
self._rawxy_enabled = False
Expand Down
Loading