Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions examples/python/gymnasium_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@

if __name__ == "__main__":
env = gymnasium.make(
"VizdoomHealthGatheringSupreme-v0", render_mode="human", frame_skip=4
"VizdoomHealthGatheringSupreme-v1", render_mode="human", frame_skip=4
)

# Rendering random rollouts for ten episodes
for _ in range(10):
done = False
obs, info = env.reset(seed=42)
obs, info = env.reset()
while not done:
obs, rew, terminated, truncated, info = env.step(env.action_space.sample())
env.render()
print(obs["telemetry"])
done = terminated or truncated
27 changes: 27 additions & 0 deletions examples/python/telemetry_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env python3

#####################################################################
# Example for running a vizdoom scenario as a Gymnasium env
#####################################################################

import gymnasium

from vizdoom import gymnasium_wrapper # noqa
from vizdoom.gymnasium_wrapper.telemetry_obs_wrapper import TelemetryWrapper


if __name__ == "__main__":
env = gymnasium.make(
"VizdoomHealthGatheringSupreme-v1", render_mode="human", frame_skip=4
)
env = TelemetryWrapper(env)

# Rendering random rollouts for ten episodes
for _ in range(10):
done = False
obs, info = env.reset()
while not done:
obs, rew, terminated, truncated, info = env.step(env.action_space.sample())
env.render()
print(obs["telemetry"])
done = terminated or truncated
4 changes: 3 additions & 1 deletion gymnasium_wrapper/base_gymnasium_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import itertools
import warnings
from typing import Optional
from typing import Optional, Any

import gymnasium as gym
import numpy as np
Expand Down Expand Up @@ -123,6 +123,7 @@ def __init__(
self.depth = self.game.is_depth_buffer_enabled()
self.labels = self.game.is_labels_buffer_enabled()
self.automap = self.game.is_automap_buffer_enabled()
self.telemetry = True

# parse buttons defined by config file
self.__parse_available_buttons()
Expand Down Expand Up @@ -397,6 +398,7 @@ def __get_observation_space(self):
Returns observation space: Dict with Box entry for each activated buffer:
"screen", "depth", "labels", "automap", "gamevariables"
"""
spaces: dict[str, Any]
spaces = {
"screen": gym.spaces.Box(
0,
Expand Down
110 changes: 110 additions & 0 deletions gymnasium_wrapper/telemetry_obs_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import gymnasium as gym
from gymnasium import ObservationWrapper
from typing import Any
import copy
import numpy as np
import vizdoom as vzd


class TelemetryWrapper(ObservationWrapper):
def __init__(self, env):
self.env = env
# how do we get the underlying game object?
self.game = self.env.unwrapped.game
self.depth = env.unwrapped.depth
self.automap = env.unwrapped.automap
self.channels = env.unwrapped.channels
self.labels = env.unwrapped.labels
self.telemetry = True
self.observation_space = self.__get_observation_space()
super().__init__(env)

def observation(self, observation):
new_observation = copy.deepcopy(observation)
player_telemetry = {
"player_x": self.game.get_game_variable(vzd.GameVariable.POSITION_X),
"player_y": self.game.get_game_variable(vzd.GameVariable.POSITION_Y),
"player_z": self.game.get_game_variable(vzd.GameVariable.POSITION_Z),
"objects_in_scene": [],
"objects_coords": [],
}
state = self.game.get_state()
if state is not None:
print("State is no longer None.")
scene_labels = state.labels
labels_in_scene = []
label_coords_in_scene = []
for label in scene_labels:
labels_in_scene.append(label.object_name)
label_coords_in_scene.append([label.object_position_x,
label.object_position_y,
label.object_position_z])
player_telemetry["objects_in_scene"] = labels_in_scene
player_telemetry["objects_coords"] = label_coords_in_scene
new_observation["telemetry"] = player_telemetry
return new_observation

def __get_observation_space(self):
"""
Returns observation space: Dict with Box entry for each activated buffer:
"screen", "depth", "labels", "automap", "gamevariables", "telemetry"
"""
spaces: dict[str, Any]
spaces = {
"screen": gym.spaces.Box(
0,
255,
(
self.game.get_screen_height(),
self.game.get_screen_width(),
self.channels,
),
dtype=np.uint8,
)
}

if self.depth:
spaces["depth"] = gym.spaces.Box(
0,
255,
(self.game.get_screen_height(), self.game.get_screen_width(), 1),
dtype=np.uint8,
)

if self.labels:
spaces["labels"] = gym.spaces.Box(
0,
255,
(self.game.get_screen_height(), self.game.get_screen_width(), 1),
dtype=np.uint8,
)

if self.automap:
spaces["automap"] = gym.spaces.Box(
0,
255,
(
self.game.get_screen_height(),
self.game.get_screen_width(),
# "automap" buffer uses same number of channels
# as the main screen buffer,
self.channels,
),
dtype=np.uint8,
)

self.num_game_variables = self.game.get_available_game_variables_size()
if self.num_game_variables > 0:
spaces["gamevariables"] = gym.spaces.Box(
np.finfo(np.float32).min,
np.finfo(np.float32).max,
(self.num_game_variables,),
dtype=np.float32,
)
if self.telemetry:
spaces["telemetry"] = gym.spaces.Text(
max_length=10000,
min_length=1
)

return gym.spaces.Dict(spaces)
6 changes: 5 additions & 1 deletion scenarios/health_gathering_supreme.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ render_weapon = false
render_decals = false
render_particles = false
window_visible = true
labels_buffer_enabled = true

# Make episodes finish after 2100 actions (tics)
episode_timeout = 2100
Expand All @@ -31,6 +32,9 @@ available_buttons =
}

# Game variables that will be in the state
available_game_variables = { HEALTH }
available_game_variables =
{
HEALTH
}

mode = PLAYER