Skip to content

Include initial state manager for BipedalWalker #1305

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
144 changes: 127 additions & 17 deletions gymnasium/envs/box2d/bipedal_walker.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,15 @@ class BipedalWalker(gym.Env, EzPickle):
"render_fps": FPS,
}

def __init__(self, render_mode: Optional[str] = None, hardcore: bool = False):
def __init__(
self,
render_mode: Optional[str] = None,
hardcore: bool = False,
fall_down_penalty: bool = True,
):
EzPickle.__init__(self, render_mode, hardcore)
self.isopen = True
self.fall_down_penaly = fall_down_penalty

self.world = Box2D.b2World()
self.terrain: List[Box2D.b2Body] = []
Expand Down Expand Up @@ -266,6 +272,7 @@ def __init__(self, render_mode: Optional[str] = None, hardcore: bool = False):
self.render_mode = render_mode
self.screen: Optional[pygame.Surface] = None
self.clock = None
self._terrain_metadata: dict = {}

def _destroy(self):
if not self.terrain:
Expand All @@ -281,7 +288,106 @@ def _destroy(self):
self.legs = []
self.joints = []

def _process_terrain_metadata(self):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a docstring to explain what the function does

"""Process terrain metadata to determine terrain generation parameters.

This method sets up the terrain generation configuration based on metadata:
- Determines if terrain should be predefined or randomly generated
- Sets up grass variation parameters for x (distance) and y (height) coordinates
- Calculates grass length between obstacles for predefined terrain
- Initializes empty metadata if no predefined terrain exists
"""
STATES = (1, 2, 3)
STUMP, STAIRS, PIT = STATES

# Defines if the terrain should be saved or copied
self._predefined_terrain = bool(self._terrain_metadata)
# Defines if the length of the grass between obstacles should be randomly distributed
self._terrain_grass_x_variation = self._terrain_metadata.get(
"x_variation", False
)
# Defines if the grass height should randomly vary
self._terrain_grass_y_variation = self._terrain_metadata.get(
"y_variation", False
)

if self._predefined_terrain:
states = self._terrain_metadata.get("states", [])

obstacles_length = []
for state_object in states:
state, metadata = state_object.values()
if state in STATES:
if state == STUMP:
obstacle_length = metadata # Stump metadata is the stump size
elif state == STAIRS:
_, stair_width, stair_steps = metadata
obstacle_length = (
stair_width * stair_steps
) # Stairs total length
elif state == PIT:
obstacle_length = 4 # Default pit x size
obstacles_length.append(obstacle_length)

# Total grass portion of the terrain
self.terrain_grass = (TERRAIN_LENGTH - sum(obstacles_length)) // len(
obstacles_length
)
else:
self.terrain_grass = TERRAIN_GRASS
self._terrain_metadata = dict(states=[])

def _generate_terrain_state(self, state: int) -> any:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add docstring

"""Generate terrain state metadata for the given terrain type.

Args:
state (int): The current terrain state (GRASS, STUMP, STAIRS, or PIT)

Returns:
any: Metadata for the terrain state:
- For GRASS: Returns next state number
- For STUMP: Returns stump size (1-2)
- For STAIRS: Returns tuple of (stair_height, stair_width, stair_steps)
- For PIT: Returns pit depth (3-4)

Note:
If using predefined terrain, returns the next state from metadata.
Otherwise generates random parameters for the terrain feature.
"""
GRASS, STUMP, STAIRS, PIT, _STATES_ = range(5)

if self._predefined_terrain:
if state == GRASS:
# Obstacles are always generated from the grass state, thus serving as a transition state
next_state = self._terrain_metadata["states"][0]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why for GRASS do we only get the first while for the next state we pop the first?

Copy link
Author

@arthur-plautz arthur-plautz Jun 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the grass state doesn't have any significant metadata to store, it can be used as a transition state, as defined in the code you highlighted.
Lines 474 - 479 define that a new state is always randomly generated from a grass state, which makes sense, since the grass is the intermediary state between obstacles. This way, when the generator enters in a grass state, it has to either retrieve the next state stored or store it, with the goal of signalizing what obstacle should be generated next. Then, when entering that obstacle state, it removes it from the state list and processes its generation in the terrain using the provided metadata.

return next_state["state"]
else:
# Within an obstacle state, retrieve obstacle metadata
next_state = self._terrain_metadata["states"].pop(0)
state_metadata = next_state["metadata"]

else:
if state == GRASS:
next_state = self.np_random.integers(1, _STATES_)
return next_state
elif state == STUMP:
state_metadata = self.np_random.integers(1, 3)
elif state == STAIRS:
stair_height = +1 if self.np_random.random() > 0.5 else -1
stair_width = self.np_random.integers(4, 5)
stair_steps = self.np_random.integers(3, 5)
state_metadata = (stair_height, stair_width, stair_steps)
elif state == PIT:
state_metadata = self.np_random.integers(3, 5)

state_object = dict(state=state, metadata=state_metadata)
self._terrain_metadata["states"].append(state_object)

return state_metadata

def _generate_terrain(self, hardcore):
self._process_terrain_metadata()

GRASS, STUMP, STAIRS, PIT, _STATES_ = range(5)
state = GRASS
velocity = 0.0
Expand All @@ -300,12 +406,12 @@ def _generate_terrain(self, hardcore):

if state == GRASS and not oneshot:
velocity = 0.8 * velocity + 0.01 * np.sign(TERRAIN_HEIGHT - y)
if i > TERRAIN_STARTPAD:
if self._terrain_grass_y_variation and i > TERRAIN_STARTPAD:
velocity += self.np_random.uniform(-1, 1) / SCALE # 1
y += velocity

elif state == PIT and oneshot:
counter = self.np_random.integers(3, 5)
counter = self._generate_terrain_state(state)
poly = [
(x, y),
(x + TERRAIN_STEP, y),
Expand All @@ -332,7 +438,7 @@ def _generate_terrain(self, hardcore):
y -= 4 * TERRAIN_STEP

elif state == STUMP and oneshot:
counter = self.np_random.integers(1, 3)
counter = self._generate_terrain_state(state)
poly = [
(x, y),
(x + counter * TERRAIN_STEP, y),
Expand All @@ -345,9 +451,10 @@ def _generate_terrain(self, hardcore):
self.terrain.append(t)

elif state == STAIRS and oneshot:
stair_height = +1 if self.np_random.random() > 0.5 else -1
stair_width = self.np_random.integers(4, 5)
stair_steps = self.np_random.integers(3, 5)
stair_height, stair_width, stair_steps = self._generate_terrain_state(
state
)

original_y = y
for s in range(stair_steps):
poly = [
Expand Down Expand Up @@ -383,9 +490,15 @@ def _generate_terrain(self, hardcore):
self.terrain_y.append(y)
counter -= 1
if counter == 0:
counter = self.np_random.integers(TERRAIN_GRASS / 2, TERRAIN_GRASS)
if self._terrain_grass_x_variation:
counter = self.np_random.integers(
self.terrain_grass / 2, self.terrain_grass
)
else:
counter = self.terrain_grass

if state == GRASS and hardcore:
state = self.np_random.integers(1, _STATES_)
state = self._generate_terrain_state(state)
oneshot = True
else:
state = GRASS
Expand Down Expand Up @@ -429,12 +542,7 @@ def _generate_clouds(self):
x2 = max(p[0] for p in poly)
self.cloud_poly.append((poly, x1, x2))

def reset(
self,
*,
seed: Optional[int] = None,
options: Optional[dict] = None,
):
def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
super().reset(seed=seed)
self._destroy()
self.world.contactListener_bug_workaround = ContactDetector(self)
Expand All @@ -444,6 +552,7 @@ def reset(
self.scroll = 0.0
self.lidar_render = 0

self._terrain_metadata = options.get("metadata", {}) if options else {}
self._generate_terrain(self.hardcore)
self._generate_clouds()

Expand Down Expand Up @@ -603,9 +712,10 @@ def step(self, action: np.ndarray):

terminated = False
if self.game_over or pos[0] < 0:
reward = -100
if self.fall_down_penaly:
reward = -100
terminated = True
if pos[0] > (TERRAIN_LENGTH - TERRAIN_GRASS) * TERRAIN_STEP:
if pos[0] > (TERRAIN_LENGTH - self.terrain_grass) * TERRAIN_STEP:
terminated = True

if self.render_mode == "human":
Expand Down