Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,16 @@ The app has two pages accessible from a slim sidebar:
- **Scroll inversion is experimental** — uses coalesced `PostMessage` injection to avoid LL hook deadlocks; may not work perfectly in all apps
- **Admin not required** — but some games or elevated windows may not receive injected keystrokes
- **Linux app detection is still limited** — X11 works via `xdotool`, KDE Wayland works via `kdotool`, and GNOME / other Wayland compositors still fall back to the default profile
- **Linux remapping needs device permissions** — Mouser must be able to read `/dev/input/event*` and write `/dev/uinput`
- **Linux remapping needs device permissions** — Mouser must be able to read `/dev/input/event*` and write `/dev/uinput`. HID++ features (DPI, battery, Smart Shift) additionally require access to `/dev/hidraw*`, which most distros restrict to root by default. Create a udev rule file at `/etc/udev/rules.d/69-logitech-mouser.rules` with the following content:
```
# Logitech HID++ access for Mouser (USB + Bluetooth)
ACTION=="add", SUBSYSTEM=="hidraw", ATTRS{idVendor}=="046d", TAG+="uaccess"
ACTION=="add", SUBSYSTEM=="hidraw", KERNELS=="0005:046D:*", TAG+="uaccess"
```
Then reload:
```bash
sudo udevadm control --reload && sudo udevadm trigger
```

## Future Work

Expand Down
154 changes: 123 additions & 31 deletions core/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def __init__(self):
self._app_detector = AppDetector(self._on_app_change)
self._profile_change_cb = None # UI callback
self._connection_change_cb = None # UI callback for device status
self._status_cb = None # UI callback for status messages
self._battery_read_cb = None # UI callback for battery level
self._dpi_read_cb = None # UI callback for current DPI
self._smart_shift_read_cb = None # UI callback for Smart Shift mode
Expand All @@ -46,6 +47,13 @@ def __init__(self):
self.cfg.get("settings", {}).get("debug_mode", False)
)
self._battery_poll_stop = threading.Event()
self._battery_poll_thread = None # track the poller thread
self._last_connection_state = bool(self.hook.device_connected)
self._last_hid_features_ready = bool(self.hid_features_ready)
self._hid_replay_requested_this_launch = False
self._replay_inflight = False
self._replay_pending_rerun = False
self._replay_lock = threading.Lock()
self._lock = threading.Lock()
self.hook.set_debug_callback(self._emit_debug)
self.hook.set_gesture_callback(self._emit_gesture_event)
Expand Down Expand Up @@ -198,6 +206,10 @@ def set_debug_callback(self, cb):
"""Register ``cb(message: str)`` invoked for debug events."""
self._debug_cb = cb

def set_status_callback(self, cb):
"""Register ``cb(message: str)`` invoked for status messages."""
self._status_cb = cb

def set_gesture_event_callback(self, cb):
"""Register ``cb(event: dict)`` invoked for structured gesture debug events."""
self._gesture_event_cb = cb
Expand Down Expand Up @@ -229,6 +241,13 @@ def _emit_debug(self, message):
except Exception:
pass

def _emit_status(self, message):
if self._status_cb:
try:
self._status_cb(message)
except Exception:
pass

def _emit_gesture_event(self, event):
if not self._debug_events_enabled:
return
Expand All @@ -253,29 +272,106 @@ def _emit_mapping_snapshot(self, prefix, mappings):
summary = ", ".join(f"{key}={mappings.get(key, 'none')}" for key in interesting)
self._emit_debug(f"{prefix}: {summary}")

def _replay_saved_settings_once(self):
hg = self.hook._hid_gesture
if hg is None:
return False
if hasattr(hg, "connected_device") and hg.connected_device is None:
return False

replay_ok = True
saved_dpi = self.cfg.get("settings", {}).get("dpi")
if saved_dpi is not None:
if not hasattr(hg, "set_dpi"):
replay_ok = False
elif hg.set_dpi(saved_dpi):
if self._dpi_read_cb:
try:
self._dpi_read_cb(saved_dpi)
except Exception:
pass
else:
replay_ok = False

saved_ss = self.cfg.get("settings", {}).get("smart_shift_mode")
if saved_ss and getattr(hg, "smart_shift_supported", False):
if not hasattr(hg, "set_smart_shift"):
replay_ok = False
elif hg.set_smart_shift(saved_ss):
if self._smart_shift_read_cb:
try:
self._smart_shift_read_cb(saved_ss)
except Exception:
pass
else:
replay_ok = False
return replay_ok

def _replay_saved_settings_worker(self):
while True:
with self._replay_lock:
self._replay_pending_rerun = False
replay_ok = self._replay_saved_settings_once()
with self._replay_lock:
if self._replay_pending_rerun:
continue
self._replay_inflight = False
if not replay_ok:
self._emit_status(
"Mouse reconnected, but saved device settings could not be restored yet."
)
return

def _request_saved_settings_replay(self, *, startup_fallback=False):
with self._replay_lock:
if startup_fallback and self._hid_replay_requested_this_launch:
return
if self._replay_inflight:
self._replay_pending_rerun = True
return
self._hid_replay_requested_this_launch = True
self._replay_inflight = True
if startup_fallback:
self._emit_status("Using startup fallback to replay saved device settings")
threading.Thread(
target=self._replay_saved_settings_worker,
daemon=True,
name="SavedSettingsReplay",
).start()

def _on_connection_change(self, connected):
self._battery_poll_stop.set()
connection_changed = connected != self._last_connection_state
hid_features_ready = self.hid_features_ready
hid_features_changed = hid_features_ready != self._last_hid_features_ready
if connection_changed:
self._last_connection_state = connected
self._battery_poll_stop.set()
if self._battery_poll_thread is not None:
self._battery_poll_thread.join(timeout=5)
self._battery_poll_thread = None
self._last_hid_features_ready = hid_features_ready
if self._connection_change_cb:
try:
self._connection_change_cb(connected)
except Exception:
pass
if connected:
if connected and connection_changed:
self._battery_poll_stop = threading.Event()
threading.Thread(
self._battery_poll_thread = threading.Thread(
target=self._battery_poll_loop,
args=(self._battery_poll_stop,),
daemon=True,
name="BatteryPoll",
).start()
)
self._battery_poll_thread.start()
if hid_features_ready and hid_features_changed:
self._request_saved_settings_replay()

def _battery_poll_loop(self, stop_event):
"""Read battery on connect and refresh it periodically until disconnected."""
if stop_event.wait(1):
return
while not stop_event.is_set():
hg = self.hook._hid_gesture
if hg:
if hg and hg.connected_device is not None:
level = hg.read_battery()
if stop_event.is_set():
return
Expand All @@ -284,7 +380,10 @@ def _battery_poll_loop(self, stop_event):
self._battery_read_cb(level)
except Exception:
pass
if stop_event.wait(300):
if stop_event.wait(300):
return
continue
if stop_event.wait(1):
return

def set_battery_callback(self, cb):
Expand All @@ -308,6 +407,11 @@ def device_connected(self):
def connected_device(self):
return getattr(self.hook, "connected_device", None)

@property
def hid_features_ready(self):
hg = self.hook._hid_gesture
return hg is not None and getattr(hg, "connected_device", None) is not None

@property
def enabled(self):
return self._enabled
Expand Down Expand Up @@ -360,29 +464,14 @@ def set_enabled(self, enabled):
def start(self):
self.hook.start()
self._app_detector.start()
# Apply persisted DPI and Smart Shift to the device once HID++ is ready
def _apply_saved_settings():
import time
time.sleep(3) # give HID++ time to connect
hg = self.hook._hid_gesture
if hg:
saved_dpi = self.cfg.get("settings", {}).get("dpi")
if saved_dpi:
hg.set_dpi(saved_dpi)
if self._dpi_read_cb:
try:
self._dpi_read_cb(saved_dpi)
except Exception:
pass
saved_ss = self.cfg.get("settings", {}).get("smart_shift_mode")
if saved_ss and hg.smart_shift_supported:
hg.set_smart_shift(saved_ss)
if self._smart_shift_read_cb:
try:
self._smart_shift_read_cb(saved_ss)
except Exception:
pass
threading.Thread(target=_apply_saved_settings, daemon=True).start()
# Temporary safety-net: keep the old delayed replay path until the
# hid-ready transition path has proven out in the field.
def _startup_replay_fallback():
time.sleep(3)
if not self.hid_features_ready:
return
self._request_saved_settings_replay(startup_fallback=True)
threading.Thread(target=_startup_replay_fallback, daemon=True).start()

def set_dpi_read_callback(self, cb):
"""Register a callback ``cb(dpi_value)`` invoked when DPI is read from device."""
Expand All @@ -394,5 +483,8 @@ def set_smart_shift_read_callback(self, cb):

def stop(self):
self._battery_poll_stop.set()
if self._battery_poll_thread is not None:
self._battery_poll_thread.join(timeout=5)
self._battery_poll_thread = None
self._app_detector.stop()
self.hook.stop()
15 changes: 15 additions & 0 deletions core/hid_gesture.py
Original file line number Diff line number Diff line change
Expand Up @@ -1248,6 +1248,9 @@ def _try_connect(self):
getattr(device_spec, "gesture_cids", ()) or DEFAULT_GESTURE_CIDS
)
self._rawxy_enabled = False
opened_transport = None
opened_up = int(up or 0)
opened_usage = int(usage or 0)
open_attempts = []
if _BACKEND_PREFERENCE in ("auto", "hidapi") and info.get("path"):
open_attempts.append(("hidapi", info))
Expand Down Expand Up @@ -1286,6 +1289,9 @@ def _try_connect(self):
d = _HidDeviceCompat(open_info["path"])
d.set_nonblocking(False)
self._dev = d
opened_transport = open_info.get("transport") or transport
opened_up = int(open_info.get("usage_page", up) or 0)
opened_usage = int(open_info.get("usage", usage) or 0)
print(f"[HidGesture] Opened PID=0x{pid:04X} via {transport}")
break
except Exception as exc:
Expand All @@ -1298,10 +1304,12 @@ def _try_connect(self):
continue

# Try Bluetooth direct (0xFF) first, then Bolt receiver slots
reprog_found = False
for idx in (0xFF, 1, 2, 3, 4, 5, 6):
self._dev_idx = idx
fi = self._find_feature(FEAT_REPROG_V4)
if fi is not None:
reprog_found = True
self._feat_idx = fi
print(f"[HidGesture] Found REPROG_V4 @0x{fi:02X} "
f"PID=0x{pid:04X} devIdx=0x{idx:02X}")
Expand Down Expand Up @@ -1343,6 +1351,13 @@ def _try_connect(self):
)
return True
break # right device but divert failed
if not reprog_found:
print(
"[HidGesture] Opened candidate but REPROG_V4 was not found "
f"on tested devIdx values PID=0x{int(pid or 0):04X} "
f"UP=0x{opened_up:04X} usage=0x{opened_usage:04X} "
f"transport={opened_transport or '-'} source={source}"
)

# Couldn't use this interface — close and try next
try:
Expand Down
17 changes: 17 additions & 0 deletions core/logi_devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,20 @@ def build_connected_device_info(
image_asset="icons/mouse-simple.svg",
gesture_cids=tuple(gesture_cids or DEFAULT_GESTURE_CIDS),
)


def build_evdev_connected_device_info(
*,
product_id=None,
product_name=None,
transport="evdev",
source="evdev",
gesture_cids=None,
) -> ConnectedDeviceInfo:
return build_connected_device_info(
product_id=product_id,
product_name=product_name,
transport=transport,
source=source,
gesture_cids=gesture_cids,
)
Loading