-
Notifications
You must be signed in to change notification settings - Fork 3.9k
fix(control_utils): replace pynput with readchar for Wayland-compatible keyboard listener #3130
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,7 +18,10 @@ | |
|
|
||
|
|
||
| import logging | ||
| import traceback | ||
| import os | ||
| import platform | ||
| import sys | ||
| import threading | ||
| from contextlib import nullcontext | ||
| from copy import copy | ||
| from functools import cache | ||
|
|
@@ -39,29 +42,25 @@ | |
| @cache | ||
| def is_headless(): | ||
| """ | ||
| Detects if the Python script is running in a headless environment (e.g., without a display). | ||
| Detects if the Python script is running in a headless environment (no display available). | ||
|
|
||
| This function attempts to import `pynput`, a library that requires a graphical environment. | ||
| If the import fails, it assumes the environment is headless. The result is cached to avoid | ||
| re-running the check. | ||
| On Linux, checks for an X11 (`DISPLAY`) or Wayland (`WAYLAND_DISPLAY`) environment variable. | ||
| On macOS and Windows, a display is always assumed to be present. The result is cached to | ||
| avoid re-running the check. | ||
|
|
||
| Returns: | ||
| True if the environment is determined to be headless, False otherwise. | ||
| """ | ||
| try: | ||
| import pynput # noqa | ||
|
|
||
| return False | ||
| except Exception: | ||
| print( | ||
| "Error trying to import pynput. Switching to headless mode. " | ||
| "As a result, the video stream from the cameras won't be shown, " | ||
| "and you won't be able to change the control flow with keyboards. " | ||
| "For more info, see traceback below.\n" | ||
| ) | ||
| traceback.print_exc() | ||
| print() | ||
| return True | ||
| if platform.system() == "Linux": | ||
| has_display = bool(os.environ.get("DISPLAY") or os.environ.get("WAYLAND_DISPLAY")) | ||
| if not has_display: | ||
| logging.warning( | ||
| "No display detected (DISPLAY and WAYLAND_DISPLAY are unset). " | ||
| "Switching to headless mode. " | ||
| "As a result, the video stream from the cameras won't be shown." | ||
| ) | ||
| return not has_display | ||
| return False | ||
|
|
||
|
|
||
| def predict_action( | ||
|
|
@@ -119,51 +118,57 @@ def init_keyboard_listener(): | |
| """ | ||
| Initializes a non-blocking keyboard listener for real-time user interaction. | ||
|
|
||
| This function sets up a listener for specific keys (right arrow, left arrow, escape) to control | ||
| the program flow during execution, such as stopping recording or exiting loops. It gracefully | ||
| handles headless environments where keyboard listening is not possible. | ||
| Reads directly from stdin using `readchar`, which works on both X11 and Wayland sessions | ||
| without any display-server dependency. Keyboard input is unavailable when stdin is not a | ||
| TTY (e.g. piped input or a truly headless server). | ||
|
|
||
| Returns: | ||
| A tuple containing: | ||
| - The `pynput.keyboard.Listener` instance, or `None` if in a headless environment. | ||
| - A dictionary of event flags (e.g., `exit_early`) that are set by key presses. | ||
| - A ``threading.Thread`` with a ``stop()`` method, or ``None`` if stdin is not a TTY. | ||
| - A dictionary of event flags (``exit_early``, ``rerecord_episode``, ``stop_recording``) | ||
| that are set by the corresponding key presses. | ||
| """ | ||
| # Allow to exit early while recording an episode or resetting the environment, | ||
| # by tapping the right arrow key '->'. This might require a sudo permission | ||
| # to allow your terminal to monitor keyboard events. | ||
| events = {} | ||
| events["exit_early"] = False | ||
| events["rerecord_episode"] = False | ||
| events["stop_recording"] = False | ||
|
|
||
| if is_headless(): | ||
| import readchar | ||
|
|
||
| events = { | ||
| "exit_early": False, | ||
| "rerecord_episode": False, | ||
| "stop_recording": False, | ||
| } | ||
|
|
||
| if not sys.stdin.isatty(): | ||
| logging.warning( | ||
| "Headless environment detected. On-screen cameras display and keyboard inputs will not be available." | ||
| "Stdin is not a TTY. Keyboard inputs will not be available. " | ||
| "You won't be able to change the control flow with keyboard shortcuts." | ||
| ) | ||
| listener = None | ||
| return listener, events | ||
| return None, events | ||
|
|
||
| # Only import pynput if not in a headless environment | ||
| from pynput import keyboard | ||
| _stop = threading.Event() | ||
|
|
||
| def on_press(key): | ||
| try: | ||
| if key == keyboard.Key.right: | ||
| def listen(): | ||
| while not _stop.is_set(): | ||
| try: | ||
| key = readchar.readkey() | ||
| except Exception: | ||
| break | ||
| if key == readchar.key.RIGHT: | ||
| print("Right arrow key pressed. Exiting loop...") | ||
| events["exit_early"] = True | ||
| elif key == keyboard.Key.left: | ||
| print("Left arrow key pressed. Exiting loop and rerecord the last episode...") | ||
| elif key == readchar.key.LEFT: | ||
| print("Left arrow key pressed. Re-recording episode...") | ||
| events["rerecord_episode"] = True | ||
| events["exit_early"] = True | ||
| elif key == keyboard.Key.esc: | ||
| elif key == readchar.key.ESC: | ||
| print("Escape key pressed. Stopping data recording...") | ||
| events["stop_recording"] = True | ||
| events["exit_early"] = True | ||
| except Exception as e: | ||
| print(f"Error handling key press: {e}") | ||
| break | ||
| if events["stop_recording"]: | ||
| break | ||
|
|
||
| listener = keyboard.Listener(on_press=on_press) | ||
| listener = threading.Thread(target=listen, daemon=True) | ||
| listener.start() | ||
| listener.stop = _stop.set # compatibility shim: lets callers do listener.stop() | ||
|
Comment on lines
+146
to
+171
|
||
|
|
||
| return listener, events | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The exception handling in the listener loop swallows all errors from
readchar.readkey()and exits silently. This can make failures (e.g., stdin being closed or terminal/TTY errors) hard to diagnose. Consider logging the exception (at least at debug level) before breaking so users have some visibility into why keyboard input stopped working.