Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added Media/wheel.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
253 changes: 83 additions & 170 deletions Models/data_utils/load_data_auto_steer.py
Original file line number Diff line number Diff line change
@@ -1,175 +1,88 @@
#!/usr/bin/env python3

import os
import json
import numpy as np
import random
from torch.utils.data import Dataset
from PIL import Image
from typing import List

class LoadDataAutoSteer():
def __init__(
self,
dataset_root: str,
temporal_length: int = 3,
):
"""
Args:
dataset_root: Root directory containing sub-datasets (60/, 70/, 80/, 100/)
temporal_length: Number of consecutive frames (default: 3 for t-2, t-1, t)
"""
# Define sub-datasets
sub_dirs = ['60', '70', '80', '100']
self.dataset_roots = [os.path.join(dataset_root, sub) for sub in sub_dirs]
self.temporal_length = temporal_length

# Load annotations from all datasets
self._load_annotations()

# Split into train/val
self._split_data()

print(f"Dataset loaded with {self.N_trains} trains and {self.N_vals} vals.")

def _load_annotations(self):
"""Load steering angle annotations from all dataset directories."""
self.annotations = []

for dataset_root in self.dataset_roots:
json_path = os.path.join(dataset_root, 'steering_angle_image_timestamp_aligned.json')
image_dir = os.path.join(dataset_root, 'images')

with open(json_path, 'r') as f:
dataset_annotations = json.load(f)

# Add dataset info to each annotation
for ann in dataset_annotations:
ann['image_dir'] = image_dir

self.annotations.extend(dataset_annotations)

# Sort by timestamp
self.annotations = sorted(self.annotations, key=lambda x: x['timestamp'])

def _split_data(self):
"""Split data into train/val following AutoSteer pattern."""
self.train_indices = []
self.val_indices = []
self.N_trains = 0
self.N_vals = 0

# Start from temporal_length-1 to have enough history
for set_idx in range(self.temporal_length - 1, len(self.annotations)):
if (set_idx % 10 == 0):
# Slap it to Val
self.val_indices.append(set_idx)
self.N_vals += 1
else:
# Slap it to Train
self.train_indices.append(set_idx)
self.N_trains += 1

def getItemCount(self):
"""Get sizes of Train/Val sets."""
return self.N_trains, self.N_vals

def getItem(self, index: int, is_train: bool):
"""
Get item at index, returning temporal image sequence and steering angle.

Args:
index: Index in train or val set
is_train: True for training set, False for validation set

Returns:
List containing:
- frame_id: Current frame timestamp
- images: List of PIL images [t-2, t-1, t]
- steering_angle: Calibrated steering angle (float)
"""
if is_train:
ann_idx = self.train_indices[index]
else:
ann_idx = self.val_indices[index]

# Load temporal sequence [t-2, t-1, t]
images = []
for offset in range(self.temporal_length):
frame_idx = ann_idx - (self.temporal_length - 1 - offset)
timestamp = self.annotations[frame_idx]['timestamp']
image_dir = self.annotations[frame_idx]['image_dir']

img_path = os.path.join(image_dir, f"{timestamp}.jpg")
img = Image.open(img_path).convert('RGB')
images.append(img)

# Get steering angle
current_annotation = self.annotations[ann_idx]
frame_id = current_annotation['timestamp']
steering_angle = current_annotation['steering_angle']
zero_point = current_annotation['steering_zero_point']
steering_angle = steering_angle - zero_point

return [
frame_id,
images,
steering_angle,
import json
import torchvision.transforms as T


class DDataset(Dataset):
def __init__(self, root_dir, transform=None, routes=None):
self.root_dir = root_dir
self.transform = transform
self.pairs = []

if routes is None:
routes = sorted([s for s in os.listdir(root_dir) if os.path.isdir(os.path.join(root_dir, s))])

for route in routes:
route_path = os.path.join(root_dir, route)
sequences = sorted([r for r in os.listdir(route_path) if os.path.isdir(os.path.join(route_path, r))])

for sequence in sequences:
route_path = os.path.join(route_path, sequence)
metadata_file = os.path.join(route_path, "metadata.json")
if not os.path.exists(metadata_file):
continue

with open(metadata_file, "r") as f:
metadata = json.load(f)

frames = sorted(metadata["frames"], key=lambda x: x["timestamp"])

for i in range(1, len(frames)):
img_T_minus_1_path = os.path.join(route_path, f"{frames[i - 1]['timestamp']}.jpg")
steering_angle_T_minus_1 = frames[i - 1]["steering_angle_corrected"]
img_T_path = os.path.join(route_path, f"{frames[i]['timestamp']}.jpg")
steering_angle_T = frames[i]["steering_angle_corrected"]

if os.path.exists(img_T_minus_1_path) and os.path.exists(img_T_path):
self.pairs.append((img_T_minus_1_path, steering_angle_T_minus_1, img_T_path, steering_angle_T))

self.pairs = self._filter(self.pairs)

print(f"Dataset created: {len(self.pairs)} image pairs from {len(routes)} sequences.")

def _filter(self, pairs):
intervals = [
(1761806100565550080, 1761806286756780032),
(1761806474351460096, 1761806628653689856),
(1761806853293499904, 1761806988906460160),
(1761807511835620096, 1761807761244930048)
]

def in_any_interval(t):
return any(start <= t <= end for start, end in intervals)

pairs = [pair for pair in pairs if in_any_interval(int(os.path.splitext(os.path.basename(pair[2]))[0]))]
pairs = [pair for pair in pairs if abs(pair[3]) <= 30]

return pairs

def __len__(self):
return len(self.pairs)

def __getitem__(self, idx):
img_T_minus_1_path, steering_angle_T_minus_1, img_T_path, steering_angle_T = self.pairs[idx]
img_T_minus_1 = Image.open(img_T_minus_1_path).convert("RGB")
img_T = Image.open(img_T_path).convert("RGB")

if self.transform:
img_T_minus_1 = self.transform(img_T_minus_1)
img_T = self.transform(img_T)

return img_T_minus_1, steering_angle_T_minus_1, img_T, steering_angle_T


class LoadDataAutoSteer:
def __init__(self, root_dir, transform=None, train_split=0.9, seed=42):
self.root_dir = root_dir
self.transform = transform

# --- Split sequences ---
train_routes = sorted(["train"])
val_routes = sorted(["val"])

if __name__ == '__main__':
import sys
from augmentations import Augmentations

if len(sys.argv) < 2:
print("Usage: python load_data_auto_steer.py <dataset_root>")
print("Example: python load_data_auto_steer.py /path/to/dataset")
sys.exit(1)

dataset_root = sys.argv[1]
print(f"Loading dataset root: {dataset_root}")

# Create data loader
data_loader = LoadDataAutoSteer(
dataset_root=dataset_root,
temporal_length=3
)

# Get counts
n_train, n_val = data_loader.getItemCount()
print(f"\nTrain samples: {n_train}")
print(f"Val samples: {n_val}")

# Test train sample with augmentations
if n_train > 0:
frame_id, images, steering_angle = data_loader.getItem(0, is_train=True)
print(f"\nTrain sample:")
print(f" Frame ID: {frame_id}")
print(f" Images: {len(images)} frames")
print(f" Image size: {images[0].size}")
print(f" Steering angle: {steering_angle:.4f}")

# Apply augmentations to all 3 images
augmentor = Augmentations(is_train=True, data_type="KEYPOINTS")
augmented_images = []

for i, img in enumerate(images):
# Convert PIL to numpy
img_np = np.array(img)

# Apply AutoSteer transform (resize + noise)
augmented_img = augmentor.applyTransformAutoSteer(img_np)

augmented_images.append(augmented_img)

# Show augmented image
print(f"\nShowing augmented image {i} (t-{2-i})...")
Image.fromarray(augmented_img).show()

print(f"\nAugmented {len(augmented_images)} images")

# Test val sample
if n_val > 0:
frame_id, images, steering_angle = data_loader.getItem(0, is_train=False)
print(f"\nVal sample:")
print(f" Frame ID: {frame_id}")
print(f" Steering angle: {steering_angle:.4f}")
# --- Create dataset instances ---
self.train = DDataset(root_dir, transform=transform, routes=train_routes)
self.val = DDataset(root_dir, transform=transform, routes=val_routes)
91 changes: 91 additions & 0 deletions Models/inference/auto_steer_infer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# %%
# Comment above is for Jupyter execution in VSCode

Check warning on line 2 in Models/inference/auto_steer_infer.py

View workflow job for this annotation

GitHub Actions / spell-check-differential

Forbidden word (VSCode)
# ! /usr/bin/env python3
import sys
import math
import torch
from PIL import Image
from torchvision import transforms

import numpy as np

sys.path.append('..')
from Models.model_components.ego_lanes_network import EgoLanesNetwork
from Models.model_components.auto_steer_network import AutoSteerNetwork


class AutoSpeedNetworkInfer():
def __init__(self, egolanes_checkpoint_path='', autosteer_checkpoint_path=''):

Check warning on line 18 in Models/inference/auto_steer_infer.py

View workflow job for this annotation

GitHub Actions / spell-check-differential

Unknown word (autosteer)

Check warning on line 18 in Models/inference/auto_steer_infer.py

View workflow job for this annotation

GitHub Actions / spell-check-differential

Unknown word (egolanes)

# Image loader
self.image_loader = transforms.Compose(
[
# transforms.CenterCrop((1440, 2880)), # e.g. (224, 224),
# transforms.Resize((320, 640)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
]
)

# Checking devices (GPU vs CPU)
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using {self.device} for inference')

# Instantiate model, load to device and set to evaluation mode
if (len(egolanes_checkpoint_path) > 0 and len(autosteer_checkpoint_path) > 0):

Check warning on line 35 in Models/inference/auto_steer_infer.py

View workflow job for this annotation

GitHub Actions / spell-check-differential

Unknown word (autosteer)

Check warning on line 35 in Models/inference/auto_steer_infer.py

View workflow job for this annotation

GitHub Actions / spell-check-differential

Unknown word (egolanes)
# Loading model with full pre-trained weights
self.egoLanesNetwork = EgoLanesNetwork()
self.egoLanesNetwork.load_state_dict(torch.load \
(egolanes_checkpoint_path, weights_only=True,

Check warning on line 39 in Models/inference/auto_steer_infer.py

View workflow job for this annotation

GitHub Actions / spell-check-differential

Unknown word (egolanes)
map_location=self.device))

self.model = AutoSteerNetwork()

# If the model is also pre-trained then load the pre-trained downstream weights
self.model.load_state_dict(torch.load \
(autosteer_checkpoint_path, weights_only=True, map_location=self.device))

Check warning on line 46 in Models/inference/auto_steer_infer.py

View workflow job for this annotation

GitHub Actions / spell-check-differential

Unknown word (autosteer)
else:
raise ValueError('No path to checkpiont file provided in class initialization')

Check warning on line 48 in Models/inference/auto_steer_infer.py

View workflow job for this annotation

GitHub Actions / spell-check-differential

Unknown word (checkpiont)

self.egoLanesNetwork = self.egoLanesNetwork.to(self.device)
self.egoLanesNetwork = self.egoLanesNetwork.eval()
self.model = self.model.to(self.device)
self.model = self.model.eval()

# self.feature = torch.zeros_like(torch.randn(1, 64, 10, 20)).to(self.device)

Check warning on line 55 in Models/inference/auto_steer_infer.py

View workflow job for this annotation

GitHub Actions / spell-check-differential

Unknown word (randn)
self.feature = torch.zeros_like(torch.randn(1, 64, 10, 20)).to(self.device)

Check warning on line 56 in Models/inference/auto_steer_infer.py

View workflow job for this annotation

GitHub Actions / spell-check-differential

Unknown word (randn)

self.image_T_minus_1 = Image.new("RGB", (640, 320), color=(0, 0, 0))

def inference(self, image):

width, height = image.size
if (width != 640 or height != 320):
raise ValueError('Incorrect input size - input image must have height of 320px and width of 640px')

# self.image_T_mius_1.show()
image_tensor_T_minus_1 = self.image_loader(self.image_T_minus_1)
image_tensor_T_minus_1 = image_tensor_T_minus_1.unsqueeze(0)
image_tensor_T_minus_1 = image_tensor_T_minus_1.to(self.device)

image_tensor_T = self.image_loader(image)
image_tensor_T = image_tensor_T.unsqueeze(0)
image_tensor_T = image_tensor_T.to(self.device)

# Run model
with torch.no_grad():
l1 = self.egoLanesNetwork(image_tensor_T_minus_1)
l2 = self.egoLanesNetwork(image_tensor_T)
lane_features_concat = torch.cat((l1, l2), dim=1)
_, prediction = self.model(lane_features_concat)
prediction = prediction.squeeze(0).cpu().detach()

# prediction = self.model(image_tensor_T)

# Get output, find max class probability and convert to steering angle
# probs = torch.nn.functional.softmax(prediction, dim=0)
output = torch.argmax(prediction).item() - 30

self.image_T_minus_1 = image.copy()

return output
14 changes: 12 additions & 2 deletions Models/model_components/auto_steer_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@ def __init__(self):
self.decode_layer_2 = nn.Conv2d(32, 32, 3, 1, 1)
self.decode_layer_3 = nn.Conv2d(32, 32, 3, 1, 1)

self.dropout_aggressize = nn.Dropout(p=0.4)

# Steering Angle - Prediction Layers
self.steering_pred_layer_prev_0 = nn.Linear(1600, 1600)
self.steering_pred_layer_prev_1 = nn.Linear(1600, 61)

# Steering Angle - Prediction Layers
self.steering_pred_layer_0 = nn.Linear(1600, 1600)
self.dropout_aggressize = nn.Dropout(p=0.4)
self.steering_pred_layer_1 = nn.Linear(1600, 61)

def forward(self, lane_features_concat):
Expand Down Expand Up @@ -62,6 +67,11 @@ def forward(self, lane_features_concat):
# Create feature vector - 1600
feature_vector = torch.flatten(steering_angle_features)

steering_angle_prev = self.steering_pred_layer_prev_0(feature_vector)
steering_angle_prev = self.GeLU(steering_angle_prev)
steering_angle_prev = self.dropout_aggressize(steering_angle_prev)
steering_angle_prediction_prev = self.steering_pred_layer_prev_1(steering_angle_prev)

steering_angle = self.steering_pred_layer_0(feature_vector)
steering_angle = self.GeLU(steering_angle)
steering_angle = self.dropout_aggressize(steering_angle)
Expand All @@ -72,4 +82,4 @@ def forward(self, lane_features_concat):
# Trained as a classificaiton problem, where the argmax indicates the steering angle
# Cross Entropy Loss

return steering_angle_prediction
return steering_angle_prediction_prev, steering_angle_prediction
Loading
Loading