Skip to content

Include initial state manager for BipedalWalker #1305

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions .gitignore

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove these changes

Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,5 @@ vizdoom.ini

# Data generated from pytest
save_videos*/
scripts/
.vscode
119 changes: 102 additions & 17 deletions gymnasium/envs/box2d/bipedal_walker.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,15 @@ class BipedalWalker(gym.Env, EzPickle):
"render_fps": FPS,
}

def __init__(self, render_mode: Optional[str] = None, hardcore: bool = False):
def __init__(
self,
render_mode: Optional[str] = None,
hardcore: bool = False,
fall_down_penalty: bool = True,
):
EzPickle.__init__(self, render_mode, hardcore)
self.isopen = True
self.fall_down_penaly = fall_down_penalty

self.world = Box2D.b2World()
self.terrain: List[Box2D.b2Body] = []
Expand Down Expand Up @@ -266,6 +272,7 @@ def __init__(self, render_mode: Optional[str] = None, hardcore: bool = False):
self.render_mode = render_mode
self.screen: Optional[pygame.Surface] = None
self.clock = None
self._terrain_metadata = {}

def _destroy(self):
if not self.terrain:
Expand All @@ -281,7 +288,80 @@ def _destroy(self):
self.legs = []
self.joints = []

def _process_terrain_metadata(self):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a docstring to explain what the function does

STATES = (1, 2, 3)
STUMP, STAIRS, PIT = STATES

# Defines if the terrain should be saved or copied
self._predefined_terrain = bool(self._terrain_metadata)
# Defines if the length of the grass between obstacles should be randomly distributed
self._terrain_grass_x_variation = self._terrain_metadata.get(
"x_variation", False
)
# Defines if the grass height should randomly vary
self._terrain_grass_y_variation = self._terrain_metadata.get(
"y_variation", False
)

if self._predefined_terrain:
states = self._terrain_metadata.get("states", [])

obstacles_length = []
for state_object in states:
state, metadata = state_object.values()
if state in STATES:
if state == STUMP:
obstacle_length = metadata # Stump metadata is the stump size
elif state == STAIRS:
_, stair_width, stair_steps = metadata
obstacle_length = (
stair_width * stair_steps
) # Stairs total length
elif state == PIT:
obstacle_length = 4 # Default pit x size
obstacles_length.append(obstacle_length)

# Total grass portion of the terrain
self.terrain_grass = (TERRAIN_LENGTH - sum(obstacles_length)) // len(
obstacles_length
)
else:
self.terrain_grass = TERRAIN_GRASS
self._terrain_metadata = dict(states=[])

def _generate_terrain_state(self, state: int) -> any:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add docstring

GRASS, STUMP, STAIRS, PIT, _STATES_ = range(5)

if self._predefined_terrain:
if state == GRASS:
next_state = self._terrain_metadata["states"][0]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why for GRASS do we only get the first while for the next state we pop the first?

return next_state["state"]
else:
next_state = self._terrain_metadata["states"].pop(0)
state_metadata = next_state["metadata"]

else:
if state == GRASS:
next_state = self.np_random.integers(1, _STATES_)
return next_state
elif state == STUMP:
state_metadata = self.np_random.integers(1, 3)
elif state == STAIRS:
stair_height = +1 if self.np_random.random() > 0.5 else -1
stair_width = self.np_random.integers(4, 5)
stair_steps = self.np_random.integers(3, 5)
state_metadata = (stair_height, stair_width, stair_steps)
elif state == PIT:
state_metadata = self.np_random.integers(3, 5)

state_object = dict(state=state, metadata=state_metadata)
self._terrain_metadata["states"].append(state_object)

return state_metadata

def _generate_terrain(self, hardcore):
self._process_terrain_metadata()

GRASS, STUMP, STAIRS, PIT, _STATES_ = range(5)
state = GRASS
velocity = 0.0
Expand All @@ -300,12 +380,12 @@ def _generate_terrain(self, hardcore):

if state == GRASS and not oneshot:
velocity = 0.8 * velocity + 0.01 * np.sign(TERRAIN_HEIGHT - y)
if i > TERRAIN_STARTPAD:
if self._terrain_grass_y_variation and i > TERRAIN_STARTPAD:
velocity += self.np_random.uniform(-1, 1) / SCALE # 1
y += velocity

elif state == PIT and oneshot:
counter = self.np_random.integers(3, 5)
counter = self._generate_terrain_state(state)
poly = [
(x, y),
(x + TERRAIN_STEP, y),
Expand All @@ -332,7 +412,7 @@ def _generate_terrain(self, hardcore):
y -= 4 * TERRAIN_STEP

elif state == STUMP and oneshot:
counter = self.np_random.integers(1, 3)
counter = self._generate_terrain_state(state)
poly = [
(x, y),
(x + counter * TERRAIN_STEP, y),
Expand All @@ -345,9 +425,10 @@ def _generate_terrain(self, hardcore):
self.terrain.append(t)

elif state == STAIRS and oneshot:
stair_height = +1 if self.np_random.random() > 0.5 else -1
stair_width = self.np_random.integers(4, 5)
stair_steps = self.np_random.integers(3, 5)
stair_height, stair_width, stair_steps = self._generate_terrain_state(
state
)

original_y = y
for s in range(stair_steps):
poly = [
Expand Down Expand Up @@ -383,9 +464,15 @@ def _generate_terrain(self, hardcore):
self.terrain_y.append(y)
counter -= 1
if counter == 0:
counter = self.np_random.integers(TERRAIN_GRASS / 2, TERRAIN_GRASS)
if self._terrain_grass_x_variation:
counter = self.np_random.integers(
self.terrain_grass / 2, self.terrain_grass
)
else:
counter = self.terrain_grass

if state == GRASS and hardcore:
state = self.np_random.integers(1, _STATES_)
state = self._generate_terrain_state(state)
oneshot = True
else:
state = GRASS
Expand Down Expand Up @@ -429,12 +516,7 @@ def _generate_clouds(self):
x2 = max(p[0] for p in poly)
self.cloud_poly.append((poly, x1, x2))

def reset(
self,
*,
seed: Optional[int] = None,
options: Optional[dict] = None,
):
def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
super().reset(seed=seed)
self._destroy()
self.world.contactListener_bug_workaround = ContactDetector(self)
Expand All @@ -444,6 +526,8 @@ def reset(
self.scroll = 0.0
self.lidar_render = 0

if options:
self._terrain_metadata = options.get("metadata", {})
self._generate_terrain(self.hardcore)
self._generate_clouds()

Expand Down Expand Up @@ -603,9 +687,10 @@ def step(self, action: np.ndarray):

terminated = False
if self.game_over or pos[0] < 0:
reward = -100
if self.fall_down_penaly:
reward = -100
terminated = True
if pos[0] > (TERRAIN_LENGTH - TERRAIN_GRASS) * TERRAIN_STEP:
if pos[0] > (TERRAIN_LENGTH - self.terrain_grass) * TERRAIN_STEP:
terminated = True

if self.render_mode == "human":
Expand Down
Loading