diff --git a/pyproject.toml b/pyproject.toml index 5f45626c011..1c853572813 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -384,9 +384,9 @@ ignore_errors = false module = "lerobot.motors.*" ignore_errors = false -# [[tool.mypy.overrides]] -# module = "lerobot.robots.*" -# ignore_errors = false +[[tool.mypy.overrides]] +module = "lerobot.robots.*" +ignore_errors = false # [[tool.mypy.overrides]] # module = "lerobot.teleoperators.*" diff --git a/src/lerobot/robots/earthrover_mini_plus/robot_earthrover_mini_plus.py b/src/lerobot/robots/earthrover_mini_plus/robot_earthrover_mini_plus.py index cdf6efde146..97c1dec8157 100644 --- a/src/lerobot/robots/earthrover_mini_plus/robot_earthrover_mini_plus.py +++ b/src/lerobot/robots/earthrover_mini_plus/robot_earthrover_mini_plus.py @@ -18,10 +18,11 @@ import base64 import logging from functools import cached_property +from typing import Any import cv2 import numpy as np -import requests +import requests # type: ignore[import-untyped] # requests has no bundled type hints; install types-requests if stubs are needed from lerobot.processor import RobotAction, RobotObservation from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected @@ -83,7 +84,7 @@ def __init__(self, config: EarthRoverMiniPlusConfig): # Empty cameras dict for compatibility with recording script # Cameras are accessed directly via SDK, not through Camera objects - self.cameras = {} + self.cameras: dict[str, Any] = {} self._is_connected = False # Cache for camera frames (fallback when requests fail) diff --git a/src/lerobot/robots/lekiwi/lekiwi_client.py b/src/lerobot/robots/lekiwi/lekiwi_client.py index 1d5ea64a66c..40408d6e4c5 100644 --- a/src/lerobot/robots/lekiwi/lekiwi_client.py +++ b/src/lerobot/robots/lekiwi/lekiwi_client.py @@ -18,6 +18,7 @@ import json import logging from functools import cached_property +from typing import Any import cv2 import numpy as np @@ -53,13 +54,13 @@ def __init__(self, config: LeKiwiClientConfig): self.polling_timeout_ms = config.polling_timeout_ms self.connect_timeout_s = config.connect_timeout_s - self.zmq_context = None - self.zmq_cmd_socket = None - self.zmq_observation_socket = None + self.zmq_context: Any = None + self.zmq_cmd_socket: Any = None + self.zmq_observation_socket: Any = None - self.last_frames = {} + self.last_frames: dict[str, Any] = {} - self.last_remote_state = {} + self.last_remote_state: dict[str, Any] = {} # Define three speed levels and a current index self.speed_levels = [ @@ -70,7 +71,7 @@ def __init__(self, config: LeKiwiClientConfig): self.speed_index = 0 # Start at slow self._is_connected = False - self.logs = {} + self.logs: dict[str, Any] = {} @cached_property def _state_ft(self) -> dict[str, type]: @@ -111,7 +112,7 @@ def is_connected(self) -> bool: @property def is_calibrated(self) -> bool: - pass + return True @check_if_already_connected def connect(self) -> None: diff --git a/src/lerobot/robots/lekiwi/lekiwi_host.py b/src/lerobot/robots/lekiwi/lekiwi_host.py index ae7ac58c450..bf22bfc4d04 100644 --- a/src/lerobot/robots/lekiwi/lekiwi_host.py +++ b/src/lerobot/robots/lekiwi/lekiwi_host.py @@ -74,7 +74,7 @@ def main(cfg: LeKiwiServerConfig): try: # Business logic start = time.perf_counter() - duration = 0 + duration = 0.0 while duration < host.connection_time_s: loop_start_time = time.time() try: diff --git a/src/lerobot/robots/so_follower/robot_kinematic_processor.py b/src/lerobot/robots/so_follower/robot_kinematic_processor.py index 2aa60e12a96..7a3b1c4525f 100644 --- a/src/lerobot/robots/so_follower/robot_kinematic_processor.py +++ b/src/lerobot/robots/so_follower/robot_kinematic_processor.py @@ -77,7 +77,7 @@ def action(self, action: RobotAction) -> RobotAction: observation = self.transition.get(TransitionKey.OBSERVATION).copy() if observation is None: - raise ValueError("Joints observation is require for computing robot kinematics") + raise ValueError("Joints observation is required for computing robot kinematics") if self.use_ik_solution and "IK_solution" in self.transition.get(TransitionKey.COMPLEMENTARY_DATA): q_raw = self.transition.get(TransitionKey.COMPLEMENTARY_DATA)["IK_solution"] @@ -94,7 +94,7 @@ def action(self, action: RobotAction) -> RobotAction: ) if q_raw is None: - raise ValueError("Joints observation is require for computing robot kinematics") + raise ValueError("Joints observation is required for computing robot kinematics") # Current pose from FK on measured joints t_curr = self.kinematics.forward_kinematics(q_raw) @@ -287,14 +287,14 @@ def action(self, action: RobotAction) -> RobotAction: observation = self.transition.get(TransitionKey.OBSERVATION).copy() if observation is None: - raise ValueError("Joints observation is require for computing robot kinematics") + raise ValueError("Joints observation is required for computing robot kinematics") q_raw = np.array( [float(v) for k, v in observation.items() if isinstance(k, str) and k.endswith(".pos")], dtype=float, ) if q_raw is None: - raise ValueError("Joints observation is require for computing robot kinematics") + raise ValueError("Joints observation is required for computing robot kinematics") if self.initial_guess_current_joints: # Use current joints as initial guess self.q_curr = q_raw @@ -367,14 +367,14 @@ def action(self, action: RobotAction) -> RobotAction: gripper_vel = action.pop("ee.gripper_vel") if observation is None: - raise ValueError("Joints observation is require for computing robot kinematics") + raise ValueError("Joints observation is required for computing robot kinematics") q_raw = np.array( [float(v) for k, v in observation.items() if isinstance(k, str) and k.endswith(".pos")], dtype=float, ) if q_raw is None: - raise ValueError("Joints observation is require for computing robot kinematics") + raise ValueError("Joints observation is required for computing robot kinematics") if self.discrete_gripper: # Discrete gripper actions are in [0, 1, 2] @@ -554,16 +554,17 @@ def __call__(self, transition: EnvTransition) -> EnvTransition: "Missing required end-effector pose components: ee.x, ee.y, ee.z, ee.wx, ee.wy, ee.wz, ee.gripper_pos must all be present in action" ) - observation = new_transition.get(TransitionKey.OBSERVATION).copy() - if observation is None: - raise ValueError("Joints observation is require for computing robot kinematics") + raw_observation = new_transition.get(TransitionKey.OBSERVATION) + if raw_observation is None: + raise ValueError("Joints observation is required for computing robot kinematics") + observation = raw_observation.copy() q_raw = np.array( [float(v) for k, v in observation.items() if isinstance(k, str) and k.endswith(".pos")], dtype=float, ) if q_raw is None: - raise ValueError("Joints observation is require for computing robot kinematics") + raise ValueError("Joints observation is required for computing robot kinematics") if self.initial_guess_current_joints: # Use current joints as initial guess self.q_curr = q_raw diff --git a/src/lerobot/robots/unitree_g1/unitree_g1.py b/src/lerobot/robots/unitree_g1/unitree_g1.py index 41146ebe698..5b40ff106e1 100644 --- a/src/lerobot/robots/unitree_g1/unitree_g1.py +++ b/src/lerobot/robots/unitree_g1/unitree_g1.py @@ -21,7 +21,7 @@ import time from dataclasses import dataclass, field from functools import cached_property -from typing import TYPE_CHECKING, Protocol, runtime_checkable +from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable import numpy as np @@ -139,12 +139,14 @@ def __init__(self, config: UnitreeG1Config): self._ChannelSubscriber = ChannelSubscriber # Initialize state variables - self.sim_env = None - self._env_wrapper = None + self.sim_env: Any = None + self._env_wrapper: Any = None self._lowstate = None self._lowstate_lock = threading.Lock() self._shutdown_event = threading.Event() - self.subscribe_thread = None + self.subscribe_thread: threading.Thread | None = None + self.kp: np.ndarray = np.empty(0, dtype=np.float32) + self.kd: np.ndarray = np.empty(0, dtype=np.float32) self.arm_ik = G1_29_ArmIK() if config.gravity_compensation else None @@ -155,7 +157,7 @@ def __init__(self, config: UnitreeG1Config): self._controller_thread = None self._controller_action_lock = threading.Lock() self.controller_input = default_remote_input() - self.controller_output = {} + self.controller_output: dict[str, Any] = {} def _subscribe_lowstate(self): # polls robot state @ 250Hz while not self._shutdown_event.is_set():