Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions concordia/components/game_master/next_acting.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,81 @@ def set_state(self, state: entity_component.ComponentState) -> None:
self._counter = state['counter']


class NextActingAllEntitiesFromSceneSpec(
entity_component.ContextComponent, entity_component.ComponentWithLogging
):
"""A component that makes all scene participants act simultaneously.

Unlike NextActingFromSceneSpec which cycles through participants one at a time,
this component returns all participants to act at once. Designed for use with
simultaneous engines.
"""

def __init__(
self,
memory_component_key: str = (
memory_component.DEFAULT_MEMORY_COMPONENT_KEY
),
scene_tracker_component_key: str = (
scene_tracker_component.DEFAULT_SCENE_TRACKER_COMPONENT_KEY
),
pre_act_label: str = DEFAULT_NEXT_ACTING_PRE_ACT_LABEL,
):
"""Initializes the component.

Args:
scene_tracker_component_key: The name of the scene tracker component.
pre_act_label: Prefix to add to the output of the component when called in
`pre_act`.
"""
super().__init__()
self._memory_component_key = memory_component_key
self._scene_tracker_component_key = scene_tracker_component_key
self._pre_act_label = pre_act_label

def _get_named_component_pre_act_value(self, component_name: str) -> str:
"""Returns the pre-act value of a named component of the parent entity."""
return (
self.get_entity().get_component(
component_name, type_=action_spec_ignored.ActionSpecIgnored
).get_pre_act_value()
)

def _get_current_scene_participants(self) -> Sequence[str]:
scene_tracker = self.get_entity().get_component(
self._scene_tracker_component_key,
type_=scene_tracker_component.SceneTracker,
)
return scene_tracker.get_participants()

def pre_act(
self,
action_spec: entity_lib.ActionSpec,
) -> str:
result = ''
if action_spec.output_type == entity_lib.OutputType.NEXT_ACTING:
scene_participants = self._get_current_scene_participants()
result = ','.join(scene_participants) # All participants at once
return result

def get_currently_active_player(self) -> str | None:
"""Not applicable for this component as all players are always active."""
raise RuntimeError(
'Error in NextActingAllEntitiesFromSceneSpec: '
'get_currently_active_player() is not applicable for this component '
'as all players are always active. You might be using a component '
'that calls this method in a simultaneous environment.'
)

def get_state(self) -> entity_component.ComponentState:
"""Returns the state of the component."""
return {}

def set_state(self, state: entity_component.ComponentState) -> None:
"""Sets the state of the component."""
pass


class NextActionSpec(
entity_component.ContextComponent, entity_component.ComponentWithLogging
):
Expand Down
69 changes: 60 additions & 9 deletions concordia/components/game_master/payoff_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from collections.abc import Callable, Mapping, Sequence
import copy
import re

from concordia.agents import entity_agent
from concordia.components.agent import memory as memory_component
Expand All @@ -33,6 +34,8 @@
CollectiveActionProductionFunction = Callable[[int], float]
PlayersT = Sequence[entity_agent.EntityAgent]

PUTATIVE_EVENT_TAG = event_resolution_component.PUTATIVE_EVENT_TAG


class PayoffMatrix(
entity_component.ContextComponent, entity_component.ComponentWithLogging
Expand All @@ -45,6 +48,7 @@ def __init__(
acting_player_names: Sequence[str],
action_to_scores: Callable[[Mapping[str, str]], Mapping[str, float]],
scores_to_observation: Callable[[Mapping[str, float]], Mapping[str, str]],
acting_order: str = 'sequential',
event_resolution_component_key: str = (
switch_act.DEFAULT_RESOLUTION_COMPONENT_KEY
),
Expand All @@ -70,6 +74,9 @@ def __init__(
a dictionary of scores for each player
scores_to_observation: function that maps a dictionary of scores for each
player to a dictionary of observations for each player.
acting_order: Order in which players act. Options are 'sequential'
(default, actions extracted from EventResolution component) or
'simultaneous' (actions collected from memory).
event_resolution_component_key: The key of the event resolution component.
observation_component_key: The key of the observation component to send
observations to players. If None, no observations will be sent.
Expand All @@ -80,8 +87,15 @@ def __init__(
pre_act_label: Prefix to add to the output of the component when called in
`pre_act`.
verbose: whether to print the full update chain of thought or not

Raises:
ValueError: If acting_order is not 'sequential' or 'simultaneous'.
"""
if acting_order not in ('sequential', 'simultaneous'):
raise ValueError(f'Unsupported acting order: {acting_order}')

self._pre_act_label = pre_act_label
self._acting_order = acting_order

self._model = model
self._observation_component_key = observation_component_key
Expand Down Expand Up @@ -116,6 +130,39 @@ def _get_current_scene_participants(self) -> Sequence[str]:
return scene_tracker_component.get_participants()
return self._acting_player_names

def _extract_actions_from_memory(self) -> None:
"""Extract the last action for each scene participant from memory.

Used in simultaneous mode to collect all players' actions at once.
"""
if not self._memory_component_key:
return

memory = self.get_entity().get_component(
self._memory_component_key,
type_=memory_component.Memory,
)

# Scan for the most recent putative event
suggestions = memory.scan(selector_fn=lambda x: PUTATIVE_EVENT_TAG in x)
if not suggestions:
return

# Extract the action string from the most recent suggestion
putative_action = suggestions[-1][
suggestions[-1].find(PUTATIVE_EVENT_TAG) + len(PUTATIVE_EVENT_TAG) :
]

# Extract all "PlayerName: Action" pairs using regex
# Pattern matches "word: text" up to next word: or end of string
# pattern = r'(\w+):\s+(.+?)(?=\s+\w+:|$)'
all_players = self._get_current_scene_participants()
pattern = rf'({"|".join(map(re.escape, all_players))}):\s*(.*?)\s*(?=(?:{"|".join(map(re.escape, all_players))}):|$)'
found_actions = dict(re.findall(pattern, putative_action))

for player_name in found_actions:
self._partial_joint_action[player_name] = found_actions[player_name]

def _joint_action_is_complete(self, joint_action: Mapping[str, str]) -> bool:
for acting_player_name in self._get_current_scene_participants():
if joint_action[acting_player_name] is None:
Expand All @@ -139,15 +186,19 @@ def post_act(
is_action_complete = False
if self._latest_action_spec_output_type == entity_lib.OutputType.RESOLVE:

event_resolution = self.get_entity().get_component(
self._event_resolution_component_key,
type_=event_resolution_component.EventResolution,
)

player_name = event_resolution.get_active_entity_name()
choice = event_resolution.get_putative_action()
if player_name in self._acting_player_names and choice:
self._partial_joint_action[player_name] = choice
if self._acting_order == 'simultaneous':
# Simultaneous mode: extract all players' actions from memory
self._extract_actions_from_memory()
elif self._acting_order == 'sequential':
# Sequential mode: extract single player action from EventResolution
event_resolution = self.get_entity().get_component(
self._event_resolution_component_key,
type_=event_resolution_component.EventResolution,
)
player_name = event_resolution.get_active_entity_name()
choice = event_resolution.get_putative_action()
if player_name in self._acting_player_names and choice:
self._partial_joint_action[player_name] = choice

# Check if all players have acted so far in the current stage game.
joint_action = self._partial_joint_action.copy()
Expand Down
42 changes: 30 additions & 12 deletions concordia/prefabs/game_master/game_theoretic_and_dramaturgic.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ class GameMaster(prefab_lib.Prefab):
'scenes': (),
'action_to_scores': _default_action_to_scores,
'scores_to_observation': _default_scores_to_observation,
'acting_order': 'sequential',
}
)
entities: (
Expand Down Expand Up @@ -211,10 +212,19 @@ def build(
gm_components.next_game_master.DEFAULT_NEXT_GAME_MASTER_COMPONENT_KEY
)

next_actor = gm_components.next_acting.NextActingFromSceneSpec(
memory_component_key=actor_components.memory.DEFAULT_MEMORY_COMPONENT_KEY,
scene_tracker_component_key=scene_tracker_key,
)
acting_order = self.params.get('acting_order', 'sequential')
if acting_order == 'simultaneous':
next_actor = gm_components.next_acting.NextActingAllEntitiesFromSceneSpec(
memory_component_key=actor_components.memory.DEFAULT_MEMORY_COMPONENT_KEY,
scene_tracker_component_key=scene_tracker_key,
)
elif acting_order == 'sequential':
next_actor = gm_components.next_acting.NextActingFromSceneSpec(
memory_component_key=actor_components.memory.DEFAULT_MEMORY_COMPONENT_KEY,
scene_tracker_component_key=scene_tracker_key,
)
else:
raise ValueError(f'Unsupported acting order: {acting_order}')

next_action_spec = gm_components.next_acting.NextActionSpecFromSceneSpec(
memory_component_key=actor_components.memory.DEFAULT_MEMORY_COMPONENT_KEY,
Expand All @@ -227,6 +237,7 @@ def build(
acting_player_names=player_names,
action_to_scores=action_to_scores,
scores_to_observation=scores_to_observation,
acting_order=acting_order,
scene_tracker_component_key=scene_tracker_key,
verbose=True,
)
Expand All @@ -242,11 +253,15 @@ def build(
actor_components.observation.DEFAULT_OBSERVATION_COMPONENT_KEY,
]

event_resolution = gm_components.event_resolution.EventResolution(
model=model,
event_resolution_steps=event_resolution_steps,
components=event_resolution_components,
)
# Only create event resolution for sequential acting
# (simultaneous acting has no single active player to attribute events to)
event_resolution = None
if acting_order == 'sequential':
event_resolution = gm_components.event_resolution.EventResolution(
model=model,
event_resolution_steps=event_resolution_steps,
components=event_resolution_components,
)
scene_tracker = gm_components.scene_tracker.SceneTracker(
model=model,
scenes=scenes,
Expand Down Expand Up @@ -276,11 +291,14 @@ def build(
next_action_spec
),
payoff_matrix_key: payoff_matrix,
gm_components.switch_act.DEFAULT_RESOLUTION_COMPONENT_KEY: (
event_resolution
),
}

# Only add event resolution for sequential acting
if event_resolution is not None:
components_of_game_master[gm_components.switch_act.DEFAULT_RESOLUTION_COMPONENT_KEY] = (
event_resolution
)

component_order = list(components_of_game_master.keys())

act_component = gm_components.switch_act.SwitchAct(
Expand Down
Loading
Loading