Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ def pytest_addoption(parser):
# test value sets.
parameter_values = {
("target_id",): {"quick": [0], "full": range(0, 2)},
("my_team_is_yellow",): { # Probably worth running both colours even in quick mode
(
"my_team_is_yellow",
): { # Probably worth running both colours even in quick mode. I agree
"quick": [False, True],
"full": [False, True],
},
Expand Down
1 change: 0 additions & 1 deletion entities/game/ball.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,3 @@ class Ball:
p: vector.VectorObject3D
v: vector.VectorObject3D
a: vector.VectorObject3D

15 changes: 8 additions & 7 deletions entities/game/game.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class Game:
ts: float
Expand All @@ -21,21 +22,21 @@ class Game:
field: Field = dataclasses.field(init=False)

def __post_init__(self):
object.__setattr__(self, 'field', Field(self.my_team_is_right))
object.__setattr__(self, "field", Field(self.my_team_is_right))

def is_ball_in_goal(self, right_goal: bool) -> bool:
ball_pos = self.ball
return (
ball_pos.x < -self.field.half_length
ball_pos.p.x < -self.field.half_length
and (
ball_pos.y < self.field.half_goal_width
and ball_pos.y > -self.field.half_goal_width
ball_pos.p.y < self.field.half_goal_width
and ball_pos.p.y > -self.field.half_goal_width
)
and not right_goal
or ball_pos.x > self.field.half_length
or ball_pos.p.x > self.field.half_length
and (
ball_pos.y < self.field.half_goal_width
and ball_pos.y > -self.field.half_goal_width
ball_pos.p.y < self.field.half_goal_width
and ball_pos.p.y > -self.field.half_goal_width
)
and right_goal
)
66 changes: 44 additions & 22 deletions entities/game/past_game.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,45 @@
from collections import deque
from entities.game.game import Game, Robot
from entities.game.ball import Ball
from entities.game.ball import Ball
from vector import VectorObject2D, VectorObject3D
from enum import Enum, auto
from typing import Tuple, Any, Union, Optional, Dict, List
from typing import Tuple, Any, Union, Optional, Dict, List
import numpy as np
from itertools import islice
import logging

logger = logging.getLogger(__name__)


# --- Enums (keep as is) ---
class AttributeType(Enum):
POSITION = auto()
VELOCITY = auto()
# ACCELERATION = auto() # If you decide to store pre-calculated acceleration


class TeamType(Enum):
FRIENDLY = auto()
ENEMY = auto()
NEUTRAL = auto()


class ObjectClass(Enum):
ROBOT = auto()
BALL = auto()


ObjectKey = Tuple[TeamType, ObjectClass, int]


def get_structured_object_key(obj: Any, team: TeamType) -> Optional[ObjectKey]:
if isinstance(obj, Robot) and hasattr(obj, 'id') and isinstance(obj.id, int):
if isinstance(obj, Robot) and hasattr(obj, "id") and isinstance(obj.id, int):
return (team, ObjectClass.ROBOT, obj.id)
elif isinstance(obj, Ball):
elif isinstance(obj, Ball):
return (TeamType.NEUTRAL, ObjectClass.BALL, 0)
logger.warning(f"Could not determine ObjectKey for object of type {type(obj)} with team {team}")
logger.warning(
f"Could not determine ObjectKey for object of type {type(obj)} with team {team}"
)
return None


Expand All @@ -57,12 +63,16 @@ def __init__(self, max_history: int):
ObjectKey, Dict[AttributeType, deque[Tuple[float, np.ndarray]]]
] = {}

def _ensure_attribute_deque_exists(self, object_key: ObjectKey, attribute_type: AttributeType):
def _ensure_attribute_deque_exists(
self, object_key: ObjectKey, attribute_type: AttributeType
):
"""Ensures a deque exists for the given object_key and attribute_type."""
if object_key not in self.historical_data:
self.historical_data[object_key] = {}
if attribute_type not in self.historical_data[object_key]:
self.historical_data[object_key][attribute_type] = deque(maxlen=self.max_history)
self.historical_data[object_key][attribute_type] = deque(
maxlen=self.max_history
)

def _add_attribute_to_history(
self,
Expand All @@ -73,37 +83,47 @@ def _add_attribute_to_history(
):
"""Adds a single attribute value (after converting to NumPy) to the history."""
if value_vector_obj is None:
return # Don't store None values, or decide on a specific handling
return # Don't store None values, or decide on a specific handling

self._ensure_attribute_deque_exists(object_key, attribute_type)
try:
value_np = _vector_to_numpy(value_vector_obj)
self.historical_data[object_key][attribute_type].append((timestamp, value_np))
self.historical_data[object_key][attribute_type].append(
(timestamp, value_np)
)
except TypeError as e:
logger.error(f"Error converting vector for {object_key}, {attribute_type}: {e}")

logger.error(
f"Error converting vector for {object_key}, {attribute_type}: {e}"
)

def _process_entity_for_history(self, entity: Any, entity_key: ObjectKey, timestamp: float):
def _process_entity_for_history(
self, entity: Any, entity_key: ObjectKey, timestamp: float
):
"""Helper to process position and velocity for a given entity."""
if not entity_key:
return

if hasattr(entity, "p"):
self._add_attribute_to_history(entity_key, AttributeType.POSITION, timestamp, entity.p)
self._add_attribute_to_history(
entity_key, AttributeType.POSITION, timestamp, entity.p
)
if hasattr(entity, "v"):
self._add_attribute_to_history(entity_key, AttributeType.VELOCITY, timestamp, entity.v)
self._add_attribute_to_history(
entity_key, AttributeType.VELOCITY, timestamp, entity.v
)
# If you pre-calculate and store acceleration:
# if hasattr(entity, "a"):
# self._add_attribute_to_history(entity_key, AttributeType.ACCELERATION, timestamp, entity.a)


def add_game(self, game: Game):
self.raw_games_history.append(game)
current_ts = game.ts

# Process Ball
if game.ball:
ball_key = get_structured_object_key(game.ball, TeamType.NEUTRAL) # Ball is neutral
ball_key = get_structured_object_key(
game.ball, TeamType.NEUTRAL
) # Ball is neutral
if ball_key:
self._process_entity_for_history(game.ball, ball_key, current_ts)

Expand All @@ -116,14 +136,16 @@ def add_game(self, game: Game):
for robot_instance in robots_dict.values():
robot_key = get_structured_object_key(robot_instance, team_type)
if robot_key:
self._process_entity_for_history(robot_instance, robot_key, current_ts)
self._process_entity_for_history(
robot_instance, robot_key, current_ts
)

def get_historical_attribute_series(
self,
object_key: ObjectKey,
attribute_type: AttributeType,
num_points: int,
) -> Tuple[np.ndarray, np.ndarray]: # Returns (timestamps_np, values_np)
) -> Tuple[np.ndarray, np.ndarray]: # Returns (timestamps_np, values_np)
"""
Retrieves the last num_points of (timestamp, attribute_value_np) for a given object.
Returns data as NumPy arrays (timestamps, values), oldest to newest.
Expand All @@ -138,7 +160,7 @@ def get_historical_attribute_series(
return np.array([], dtype=np.float64), np.array([], dtype=np.float64)

history_deque = object_attributes.get(attribute_type)
if not history_deque: # Handles both key not found or empty deque
if not history_deque: # Handles both key not found or empty deque
# logger.debug(f"No historical {attribute_type.name} for {object_key}")
return np.array([], dtype=np.float64), np.array([], dtype=np.float64)

Expand All @@ -148,7 +170,7 @@ def get_historical_attribute_series(

timestamps_list: List[float] = []
vector_values_list: List[np.ndarray] = []

for ts, vec_np in relevant_data_iter:
timestamps_list.append(ts)
vector_values_list.append(vec_np)
Expand All @@ -160,12 +182,12 @@ def get_historical_attribute_series(
# vector_values_list contains a list of small np.ndarrays.
# np.array() will create a 2D array if all elements of the list are 1D arrays of the same size.
values_np = np.array(vector_values_list)

return timestamps_np, values_np

def n_steps_ago(self, n: int) -> Game:
if not (0 < n <= len(self.raw_games_history)):
raise IndexError(
f"Cannot get game {n} steps ago. History size: {len(self.raw_games_history)}, requested: {n}"
)
return self.raw_games_history[-n]
return self.raw_games_history[-n]
4 changes: 2 additions & 2 deletions entities/game/present_future_game.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from entities.game.past_game import PastGame


class PresentFutureGame():
class PresentFutureGame:
def __init__(self, past: PastGame, current: Game):
self.__past = past
self.current = current
Expand All @@ -11,4 +11,4 @@ def add_game(self, game: Game):
self.__past.add_game(self.current)
self.current = game

# def predict_/
# def predict_/
2 changes: 1 addition & 1 deletion entities/game/robot.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
class Robot:
id: int
is_friendly: bool
has_ball: bool # Friendly and enemy now have this, friendly is from IR sensor, enemy from position
has_ball: bool # Friendly and enemy now have this, friendly is from IR sensor, enemy from position
p: vector.VectorObject2D
v: vector.VectorObject2D
a: vector.VectorObject2D
Expand Down
14 changes: 8 additions & 6 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
from strategy.skills.go_to_ball import GoToBallStrategy
from run import StrategyRunner
from strategy.skills.go_to_ball import GoToBallStrategy
from strategy.skills.solo_attacker import SoloAttackerStrategy
from strategy.skills.solo_defender import SoloDefenderStrategy

if __name__ == "__main__":
runner = StrategyRunner(
strategy=GoToBallStrategy(target_id=0),
strategy=SoloAttackerStrategy(target_id=1),
my_team_is_yellow=True,
my_team_is_right=True,
mode="rsim",
my_team_is_right=False,
mode="grsim",
exp_friendly=3,
exp_enemy=2,
opp_strategy=GoToBallStrategy(target_id=1),
exp_enemy=3,
opp_strategy=SoloDefenderStrategy(target_id=0),
)
test = runner.run()
Loading
Loading