Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 51 additions & 3 deletions average_steer_partitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from pygbx import Gbx, GbxType
from pygbx.headers import ControlEntry, CGameCtnGhost
from numpy import int32
import math
import sys
import os
from pprint import pprint
Expand All @@ -21,6 +22,33 @@ def event_to_analog_value(event: ControlEntry):
val >>= int32(8)
return -val

def clean_duplicate_steer_events(events: list) -> list:
"""
ESWC events seem a bit janky. Try to get rid of that by removing 'duplicate'
steer events. Take the latest one in every 10 msec block.
"""
ceil10 = lambda x: math.ceil(x / 10) * 10

indices_to_keep = []
candidate_time = None
for i, ev in enumerate(events):
if ev.event_name != 'Steer':
indices_to_keep.append(i)
continue
if candidate_time is not None and ceil10(ev.time) != candidate_time:
indices_to_keep.append(candidate_i)
candidate_i = i
candidate_time = ceil10(events[candidate_i].time)
if candidate_time is not None:
indices_to_keep.append(candidate_i)

events_cleaned = [events[i] for i in sorted(indices_to_keep)]
for i, ev in enumerate(events_cleaned):
if ev.event_name == 'Steer':
events_cleaned[i] = ev.copy()
events_cleaned[i].time = ceil10(ev.time)
return events_cleaned

def partition_steer_events(events: list, sample_period: int):
p = []
current = []
Expand Down Expand Up @@ -56,6 +84,24 @@ def try_parse_old_ghost(g: Gbx):

return None

def try_parse_nations_eswc_ghost(g: Gbx):
ghost = CGameCtnGhost(0)
ghost.login = "" # unavailable in ESWC?

parser = g.find_raw_chunk_id(0x3f00dff)
if parser:
parser.seen_loopback = True
g.read_ghost_events(ghost, parser, 0x3f00dff)
return ghost

parser = g.find_raw_chunk_id(0x3f00dfa)
if parser:
parser.seen_loopback = True
g.read_ghost_events(ghost, parser, 0x3f00dfa)
return ghost

return None

def analyze_replay(path: str):
try:
g = Gbx(path)
Expand All @@ -67,16 +113,18 @@ def analyze_replay(path: str):
if not ghosts:
ghost = try_parse_old_ghost(g)
if not ghost:
print('Error: no ghosts')
return None
ghost = try_parse_nations_eswc_ghost(g)
if not ghost:
print('Error: no ghosts')
return None
ghost.control_entries = clean_duplicate_steer_events(ghost.control_entries)

if not ghost.control_entries:
print('Error: no control entries')
return None
else:
ghost = ghosts[0]

ghost = ghosts[0]
results = {'version': ghost.game_version, 'login': ghost.login, 'max_spikes': 0, 'spikes': 0}

partitions = partition_steer_events(ghost.control_entries, TIME_PERIOD)
Expand Down
52 changes: 52 additions & 0 deletions generate_input_file.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pygbx import Gbx, GbxType
from pygbx.headers import ControlEntry, CGameCtnGhost
from numpy import int32
import math
import sys
import os

Expand Down Expand Up @@ -54,6 +55,33 @@ def event_to_analog_value(event: ControlEntry):
val >>= int32(8)
return -val

def clean_duplicate_steer_events(events: list) -> list:
"""
ESWC events seem a bit janky. Try to get rid of that by removing 'duplicate'
steer events. Take the latest one in every 10 msec block.
"""
ceil10 = lambda x: math.ceil(x / 10) * 10

indices_to_keep = []
candidate_time = None
for i, ev in enumerate(events):
if ev.event_name != 'Steer':
indices_to_keep.append(i)
continue
if candidate_time is not None and ceil10(ev.time) != candidate_time:
indices_to_keep.append(candidate_i)
candidate_i = i
candidate_time = ceil10(events[candidate_i].time)
if candidate_time is not None:
indices_to_keep.append(candidate_i)

events_cleaned = [events[i] for i in sorted(indices_to_keep)]
for i, ev in enumerate(events_cleaned):
if ev.event_name == 'Steer':
events_cleaned[i] = ev.copy()
events_cleaned[i].time = ceil10(ev.time)
return events_cleaned

def try_parse_old_ghost(g: Gbx):
ghost = CGameCtnGhost(0)

Expand All @@ -69,6 +97,24 @@ def try_parse_old_ghost(g: Gbx):

return None

def try_parse_nations_eswc_ghost(g: Gbx):
ghost = CGameCtnGhost(0)
ghost.login = "" # unavailable in ESWC?

parser = g.find_raw_chunk_id(0x3f00dff)
if parser:
parser.seen_loopback = True
g.read_ghost_events(ghost, parser, 0x3f00dff)
return ghost

parser = g.find_raw_chunk_id(0x3f00dfa)
if parser:
parser.seen_loopback = True
g.read_ghost_events(ghost, parser, 0x3f00dfa)
return ghost

return None

def try_extract_2020(g: Gbx):
cbp = g.find_raw_chunk_id(0x0309201D)
if not cbp:
Expand Down Expand Up @@ -181,6 +227,12 @@ def process_path(path, write_func):
ghosts = g.get_classes_by_ids([GbxType.CTN_GHOST, GbxType.CTN_GHOST_OLD])
if not ghosts:
ghost = try_parse_old_ghost(g)
if not ghost:
ghost = try_parse_nations_eswc_ghost(g)
if not ghost:
print('Error: no ghosts')
return None
ghost.control_entries = clean_duplicate_steer_events(ghost.control_entries)
else:
ghost = ghosts[0]

Expand Down