diff --git a/Media/wheel.png b/Media/wheel.png new file mode 100644 index 000000000..439d40d96 Binary files /dev/null and b/Media/wheel.png differ diff --git a/Models/data_utils/load_data_auto_steer.py b/Models/data_utils/load_data_auto_steer.py index 2dc1a7182..fbdbc99d7 100644 --- a/Models/data_utils/load_data_auto_steer.py +++ b/Models/data_utils/load_data_auto_steer.py @@ -1,175 +1,88 @@ -#!/usr/bin/env python3 - import os -import json -import numpy as np +import random +from torch.utils.data import Dataset from PIL import Image -from typing import List - -class LoadDataAutoSteer(): - def __init__( - self, - dataset_root: str, - temporal_length: int = 3, - ): - """ - Args: - dataset_root: Root directory containing sub-datasets (60/, 70/, 80/, 100/) - temporal_length: Number of consecutive frames (default: 3 for t-2, t-1, t) - """ - # Define sub-datasets - sub_dirs = ['60', '70', '80', '100'] - self.dataset_roots = [os.path.join(dataset_root, sub) for sub in sub_dirs] - self.temporal_length = temporal_length - - # Load annotations from all datasets - self._load_annotations() - - # Split into train/val - self._split_data() - - print(f"Dataset loaded with {self.N_trains} trains and {self.N_vals} vals.") - - def _load_annotations(self): - """Load steering angle annotations from all dataset directories.""" - self.annotations = [] - - for dataset_root in self.dataset_roots: - json_path = os.path.join(dataset_root, 'steering_angle_image_timestamp_aligned.json') - image_dir = os.path.join(dataset_root, 'images') - - with open(json_path, 'r') as f: - dataset_annotations = json.load(f) - - # Add dataset info to each annotation - for ann in dataset_annotations: - ann['image_dir'] = image_dir - - self.annotations.extend(dataset_annotations) - - # Sort by timestamp - self.annotations = sorted(self.annotations, key=lambda x: x['timestamp']) - - def _split_data(self): - """Split data into train/val following AutoSteer pattern.""" - self.train_indices = [] - self.val_indices = [] - self.N_trains = 0 - self.N_vals = 0 - - # Start from temporal_length-1 to have enough history - for set_idx in range(self.temporal_length - 1, len(self.annotations)): - if (set_idx % 10 == 0): - # Slap it to Val - self.val_indices.append(set_idx) - self.N_vals += 1 - else: - # Slap it to Train - self.train_indices.append(set_idx) - self.N_trains += 1 - - def getItemCount(self): - """Get sizes of Train/Val sets.""" - return self.N_trains, self.N_vals - - def getItem(self, index: int, is_train: bool): - """ - Get item at index, returning temporal image sequence and steering angle. - - Args: - index: Index in train or val set - is_train: True for training set, False for validation set - - Returns: - List containing: - - frame_id: Current frame timestamp - - images: List of PIL images [t-2, t-1, t] - - steering_angle: Calibrated steering angle (float) - """ - if is_train: - ann_idx = self.train_indices[index] - else: - ann_idx = self.val_indices[index] - - # Load temporal sequence [t-2, t-1, t] - images = [] - for offset in range(self.temporal_length): - frame_idx = ann_idx - (self.temporal_length - 1 - offset) - timestamp = self.annotations[frame_idx]['timestamp'] - image_dir = self.annotations[frame_idx]['image_dir'] - - img_path = os.path.join(image_dir, f"{timestamp}.jpg") - img = Image.open(img_path).convert('RGB') - images.append(img) - - # Get steering angle - current_annotation = self.annotations[ann_idx] - frame_id = current_annotation['timestamp'] - steering_angle = current_annotation['steering_angle'] - zero_point = current_annotation['steering_zero_point'] - steering_angle = steering_angle - zero_point - - return [ - frame_id, - images, - steering_angle, +import json +import torchvision.transforms as T + + +class DDataset(Dataset): + def __init__(self, root_dir, transform=None, routes=None): + self.root_dir = root_dir + self.transform = transform + self.pairs = [] + + if routes is None: + routes = sorted([s for s in os.listdir(root_dir) if os.path.isdir(os.path.join(root_dir, s))]) + + for route in routes: + route_path = os.path.join(root_dir, route) + sequences = sorted([r for r in os.listdir(route_path) if os.path.isdir(os.path.join(route_path, r))]) + + for sequence in sequences: + route_path = os.path.join(route_path, sequence) + metadata_file = os.path.join(route_path, "metadata.json") + if not os.path.exists(metadata_file): + continue + + with open(metadata_file, "r") as f: + metadata = json.load(f) + + frames = sorted(metadata["frames"], key=lambda x: x["timestamp"]) + + for i in range(1, len(frames)): + img_T_minus_1_path = os.path.join(route_path, f"{frames[i - 1]['timestamp']}.jpg") + steering_angle_T_minus_1 = frames[i - 1]["steering_angle_corrected"] + img_T_path = os.path.join(route_path, f"{frames[i]['timestamp']}.jpg") + steering_angle_T = frames[i]["steering_angle_corrected"] + + if os.path.exists(img_T_minus_1_path) and os.path.exists(img_T_path): + self.pairs.append((img_T_minus_1_path, steering_angle_T_minus_1, img_T_path, steering_angle_T)) + + self.pairs = self._filter(self.pairs) + + print(f"Dataset created: {len(self.pairs)} image pairs from {len(routes)} sequences.") + + def _filter(self, pairs): + intervals = [ + (1761806100565550080, 1761806286756780032), + (1761806474351460096, 1761806628653689856), + (1761806853293499904, 1761806988906460160), + (1761807511835620096, 1761807761244930048) ] + def in_any_interval(t): + return any(start <= t <= end for start, end in intervals) + + pairs = [pair for pair in pairs if in_any_interval(int(os.path.splitext(os.path.basename(pair[2]))[0]))] + pairs = [pair for pair in pairs if abs(pair[3]) <= 30] + + return pairs + + def __len__(self): + return len(self.pairs) + + def __getitem__(self, idx): + img_T_minus_1_path, steering_angle_T_minus_1, img_T_path, steering_angle_T = self.pairs[idx] + img_T_minus_1 = Image.open(img_T_minus_1_path).convert("RGB") + img_T = Image.open(img_T_path).convert("RGB") + + if self.transform: + img_T_minus_1 = self.transform(img_T_minus_1) + img_T = self.transform(img_T) + + return img_T_minus_1, steering_angle_T_minus_1, img_T, steering_angle_T + + +class LoadDataAutoSteer: + def __init__(self, root_dir, transform=None, train_split=0.9, seed=42): + self.root_dir = root_dir + self.transform = transform + + # --- Split sequences --- + train_routes = sorted(["train"]) + val_routes = sorted(["val"]) -if __name__ == '__main__': - import sys - from augmentations import Augmentations - - if len(sys.argv) < 2: - print("Usage: python load_data_auto_steer.py ") - print("Example: python load_data_auto_steer.py /path/to/dataset") - sys.exit(1) - - dataset_root = sys.argv[1] - print(f"Loading dataset root: {dataset_root}") - - # Create data loader - data_loader = LoadDataAutoSteer( - dataset_root=dataset_root, - temporal_length=3 - ) - - # Get counts - n_train, n_val = data_loader.getItemCount() - print(f"\nTrain samples: {n_train}") - print(f"Val samples: {n_val}") - - # Test train sample with augmentations - if n_train > 0: - frame_id, images, steering_angle = data_loader.getItem(0, is_train=True) - print(f"\nTrain sample:") - print(f" Frame ID: {frame_id}") - print(f" Images: {len(images)} frames") - print(f" Image size: {images[0].size}") - print(f" Steering angle: {steering_angle:.4f}") - - # Apply augmentations to all 3 images - augmentor = Augmentations(is_train=True, data_type="KEYPOINTS") - augmented_images = [] - - for i, img in enumerate(images): - # Convert PIL to numpy - img_np = np.array(img) - - # Apply AutoSteer transform (resize + noise) - augmented_img = augmentor.applyTransformAutoSteer(img_np) - - augmented_images.append(augmented_img) - - # Show augmented image - print(f"\nShowing augmented image {i} (t-{2-i})...") - Image.fromarray(augmented_img).show() - - print(f"\nAugmented {len(augmented_images)} images") - - # Test val sample - if n_val > 0: - frame_id, images, steering_angle = data_loader.getItem(0, is_train=False) - print(f"\nVal sample:") - print(f" Frame ID: {frame_id}") - print(f" Steering angle: {steering_angle:.4f}") + # --- Create dataset instances --- + self.train = DDataset(root_dir, transform=transform, routes=train_routes) + self.val = DDataset(root_dir, transform=transform, routes=val_routes) diff --git a/Models/inference/auto_steer_infer.py b/Models/inference/auto_steer_infer.py new file mode 100644 index 000000000..994460391 --- /dev/null +++ b/Models/inference/auto_steer_infer.py @@ -0,0 +1,91 @@ +# %% +# Comment above is for Jupyter execution in VSCode +# ! /usr/bin/env python3 +import sys +import math +import torch +from PIL import Image +from torchvision import transforms + +import numpy as np + +sys.path.append('..') +from Models.model_components.ego_lanes_network import EgoLanesNetwork +from Models.model_components.auto_steer_network import AutoSteerNetwork + + +class AutoSpeedNetworkInfer(): + def __init__(self, egolanes_checkpoint_path='', autosteer_checkpoint_path=''): + + # Image loader + self.image_loader = transforms.Compose( + [ + # transforms.CenterCrop((1440, 2880)), # e.g. (224, 224), + # transforms.Resize((320, 640)), + transforms.ToTensor(), + transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) + ] + ) + + # Checking devices (GPU vs CPU) + self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') + print(f'Using {self.device} for inference') + + # Instantiate model, load to device and set to evaluation mode + if (len(egolanes_checkpoint_path) > 0 and len(autosteer_checkpoint_path) > 0): + # Loading model with full pre-trained weights + self.egoLanesNetwork = EgoLanesNetwork() + self.egoLanesNetwork.load_state_dict(torch.load \ + (egolanes_checkpoint_path, weights_only=True, + map_location=self.device)) + + self.model = AutoSteerNetwork() + + # If the model is also pre-trained then load the pre-trained downstream weights + self.model.load_state_dict(torch.load \ + (autosteer_checkpoint_path, weights_only=True, map_location=self.device)) + else: + raise ValueError('No path to checkpiont file provided in class initialization') + + self.egoLanesNetwork = self.egoLanesNetwork.to(self.device) + self.egoLanesNetwork = self.egoLanesNetwork.eval() + self.model = self.model.to(self.device) + self.model = self.model.eval() + + # self.feature = torch.zeros_like(torch.randn(1, 64, 10, 20)).to(self.device) + self.feature = torch.zeros_like(torch.randn(1, 64, 10, 20)).to(self.device) + + self.image_T_minus_1 = Image.new("RGB", (640, 320), color=(0, 0, 0)) + + def inference(self, image): + + width, height = image.size + if (width != 640 or height != 320): + raise ValueError('Incorrect input size - input image must have height of 320px and width of 640px') + + # self.image_T_mius_1.show() + image_tensor_T_minus_1 = self.image_loader(self.image_T_minus_1) + image_tensor_T_minus_1 = image_tensor_T_minus_1.unsqueeze(0) + image_tensor_T_minus_1 = image_tensor_T_minus_1.to(self.device) + + image_tensor_T = self.image_loader(image) + image_tensor_T = image_tensor_T.unsqueeze(0) + image_tensor_T = image_tensor_T.to(self.device) + + # Run model + with torch.no_grad(): + l1 = self.egoLanesNetwork(image_tensor_T_minus_1) + l2 = self.egoLanesNetwork(image_tensor_T) + lane_features_concat = torch.cat((l1, l2), dim=1) + _, prediction = self.model(lane_features_concat) + prediction = prediction.squeeze(0).cpu().detach() + + # prediction = self.model(image_tensor_T) + + # Get output, find max class probability and convert to steering angle + # probs = torch.nn.functional.softmax(prediction, dim=0) + output = torch.argmax(prediction).item() - 30 + + self.image_T_minus_1 = image.copy() + + return output diff --git a/Models/model_components/auto_steer_network.py b/Models/model_components/auto_steer_network.py index b7a0eeba2..f96f7f829 100644 --- a/Models/model_components/auto_steer_network.py +++ b/Models/model_components/auto_steer_network.py @@ -16,9 +16,14 @@ def __init__(self): self.decode_layer_2 = nn.Conv2d(32, 32, 3, 1, 1) self.decode_layer_3 = nn.Conv2d(32, 32, 3, 1, 1) + self.dropout_aggressize = nn.Dropout(p=0.4) + + # Steering Angle - Prediction Layers + self.steering_pred_layer_prev_0 = nn.Linear(1600, 1600) + self.steering_pred_layer_prev_1 = nn.Linear(1600, 61) + # Steering Angle - Prediction Layers self.steering_pred_layer_0 = nn.Linear(1600, 1600) - self.dropout_aggressize = nn.Dropout(p=0.4) self.steering_pred_layer_1 = nn.Linear(1600, 61) def forward(self, lane_features_concat): @@ -62,6 +67,11 @@ def forward(self, lane_features_concat): # Create feature vector - 1600 feature_vector = torch.flatten(steering_angle_features) + steering_angle_prev = self.steering_pred_layer_prev_0(feature_vector) + steering_angle_prev = self.GeLU(steering_angle_prev) + steering_angle_prev = self.dropout_aggressize(steering_angle_prev) + steering_angle_prediction_prev = self.steering_pred_layer_prev_1(steering_angle_prev) + steering_angle = self.steering_pred_layer_0(feature_vector) steering_angle = self.GeLU(steering_angle) steering_angle = self.dropout_aggressize(steering_angle) @@ -72,4 +82,4 @@ def forward(self, lane_features_concat): # Trained as a classificaiton problem, where the argmax indicates the steering angle # Cross Entropy Loss - return steering_angle_prediction \ No newline at end of file + return steering_angle_prediction_prev, steering_angle_prediction \ No newline at end of file diff --git a/Models/training/auto_steer_trainer.py b/Models/training/auto_steer_trainer.py index 17fe1d862..3d13172e9 100644 --- a/Models/training/auto_steer_trainer.py +++ b/Models/training/auto_steer_trainer.py @@ -5,3 +5,391 @@ # Runs the network # Calculates the loss # Creates visualizations in tensorboard + +import torch +from torchvision import transforms +from torch import nn, optim +from torch.utils.tensorboard import SummaryWriter +import matplotlib.pyplot as plt +import numpy as np +import pathlib +from PIL import Image +import cv2 +import sys +import random + +sys.path.append('..') +from Models.model_components.ego_lanes_network import EgoLanesNetwork +from Models.model_components.auto_steer_network import AutoSteerNetwork +from Models.data_utils.augmentations import Augmentations + +np.Inf = np.inf + + +class AutoSteerTrainer(): + def __init__(self, checkpoint_path='', pretrained_checkpoint_path='', is_pretrained=False): + + # Image and ground truth as Numpy arrays and Pytorch tensors + self.img_T_minus_1 = 0 + self.gt_T_minus_1 = 0 + self.gt_tensor_minus_1 = 0 + self.img_T = 0 + self.gt_T = 0 + self.gt_tensor = 0 + + # Loss and prediction + self.loss = 0 + self.prediction_prev = 0 + self.prediction = 0 + self.prediction_val = 0 + + # Checking devices (GPU vs CPU) + self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') + print(f'Using {self.device} for inference') + + if (is_pretrained): + + # Instantiate Model for validation or inference - load both pre-traiend EgoLanes and SuperDepth weights + if (len(checkpoint_path) > 0): + + # Loading model with full pre-trained weights + self.egoLanesNetwork = EgoLanesNetwork() + self.egoLanesNetwork.load_state_dict(torch.load \ + (pretrained_checkpoint_path, weights_only=True, + map_location=self.device)) + self.model = AutoSteerNetwork() + + # If the model is also pre-trained then load the pre-trained downstream weights + self.model.load_state_dict(torch.load \ + (checkpoint_path, weights_only=True, map_location=self.device)) + print('Loading pre-trained model weights of AutoSteer and upstream EgoLanes weights as well') + else: + raise ValueError('Please ensure AutoSteer network weights are provided for downstream elements') + + else: + + # Instantiate Model for training - load pre-traiend EgoLanes weights only + if (len(pretrained_checkpoint_path) > 0): + + # Loading EgoLanes pre-trained for upstream weights + self.egoLanesNetwork = EgoLanesNetwork() + self.egoLanesNetwork.load_state_dict(torch.load \ + (pretrained_checkpoint_path, weights_only=True, + map_location=self.device)) + + # Loading model with pre-trained upstream weights + self.model = AutoSteerNetwork() + print( + 'Loading pre-trained model weights of upstream EgoLanes only, AutoSteer initialised with random weights') + else: + raise ValueError('Please ensure EgoLanes network weights are provided for upstream elements') + + # Model to device + self.egoLanesNetwork = self.egoLanesNetwork.to(self.device) + self.model = self.model.to(self.device) + + # TensorBoard + self.writer = SummaryWriter() + + # Learning rate and optimizer + self.learning_rate = 0.0005 + self.optimizer = optim.AdamW(self.model.parameters(), self.learning_rate) + + # Loaders + self.image_loader = transforms.Compose( + [ + # transforms.CenterCrop((1440, 2880)), # e.g. (224, 224), + # transforms.Resize((320, 640)), + transforms.ToTensor(), + transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) + ] + ) + + self.gt_loader = transforms.Compose( + [ + transforms.Lambda(lambda x: torch.tensor(x, dtype=torch.float32)) + ] + ) + + # Assign input variables + def set_data(self, img_T_minus_1, gt_T_minus_1, img_T, gt_T): + self.img_T_minus_1 = img_T_minus_1 + self.gt_T_minus_1 = gt_T_minus_1 + self.img_T = img_T + self.gt_T = gt_T + + # Set learning rate + def set_learning_rate(self, learning_rate): + self.learning_rate = learning_rate + + # Image agumentations flip images horizontally + def apply_augmentations(self): + self.img_T_minus_1 = self.img_T_minus_1[:, ::-1, :].copy() + self.gt_T_minus_1 = -self.gt_T_minus_1 + + self.img_T = self.img_T[:, ::-1, :].copy() + self.gt_T = -self.gt_T + + # Load Data + def load_data(self): + self.load_image_tensor() + self.load_gt_tensor() + + # Load Image as Tensor + def load_image_tensor(self): + # T-1 + image_tensor_T_minus_1 = self.image_loader(self.img_T_minus_1) + image_tensor_T_minus_1 = image_tensor_T_minus_1.unsqueeze(0) + self.image_tensor_T_minus_1 = image_tensor_T_minus_1.to(self.device) + + # T + image_tensor_T = self.image_loader(self.img_T) + image_tensor_T = image_tensor_T.unsqueeze(0) + self.image_tensor_T = image_tensor_T.to(self.device) + + # Load Ground Truth as Tensor + def load_gt_tensor(self): + gt_tensor = self.gt_loader(self.gt_T) + gt_tensor = gt_tensor.unsqueeze(0) + self.gt_tensor = gt_tensor.to(self.device) + + gt_tensor_minus_1 = self.gt_loader(self.gt_T_minus_1) + gt_tensor_minus_1 = gt_tensor_minus_1.unsqueeze(0) + self.gt_tensor_minus_1 = gt_tensor_minus_1.to(self.device) + + def angle_to_tensor(self, angle, num_classes=61, angle_min=-30, angle_max=30): + angle = max(angle_min, min(angle_max, angle)) + class_idx = int(round(angle - angle_min)) # 1 deg resolution + gt_tensor = torch.tensor([class_idx], dtype=torch.long) # shape [1] + gt_tensor = gt_tensor.to(self.device) + return gt_tensor + + def angle_to_class(self, angle, angle_min=-30, angle_max=30): + angle = max(angle_min, min(angle_max, angle)) + class_idx = int(round(angle - angle_min)) # maps -30→0, +30→60 + return class_idx + + # Run Model + def run_model(self): + sigma = 1.0 + num_classes = 61 + + with torch.no_grad(): + l1 = self.egoLanesNetwork(self.image_tensor_T_minus_1) + l2 = self.egoLanesNetwork(self.image_tensor_T) + lane_features_concat = torch.cat((l1, l2), dim=1) + self.prediction_prev, self.prediction = self.model(lane_features_concat) + # self.prediction = torch.argmax(self.prediction).item() - 30 + + # # Cross Entropy Loss + # CEL = nn.CrossEntropyLoss() + # loss_prev = CEL(self.prediction_prev.unsqueeze(0), self.angle_to_tensor(self.gt_T_minus_1)) + # loss = CEL(self.prediction.unsqueeze(0), self.angle_to_tensor(self.gt_T)) + # + # self.loss = loss_prev + loss + + # # L1 loss + # prediction_class = torch.argmax(self.prediction, dim=1).item() + # pred_tensor = torch.tensor([prediction_class - 30], dtype=torch.float32) + # gt_tensor = torch.tensor([self.gt_T], dtype=torch.float32) + # + # l1 = torch.nn.L1Loss() + # self.l1_loss = l1(pred_tensor, gt_tensor) + + # KL divergence + # Create Gaussian target distribution + log_softmax = nn.LogSoftmax(dim=1) + classes = torch.arange(num_classes, dtype=torch.float32, device=self.device) + + # # T - 1 + prediction_prev = log_softmax(self.prediction_prev.unsqueeze(0)) + gt_prev_class = self.angle_to_class(self.gt_T_minus_1) + prev_target = torch.exp(-(classes - gt_prev_class) ** 2 / (2 * sigma ** 2)) + prev_target /= prev_target.sum() # normalize to sum=1 + + # T + prediction = log_softmax(self.prediction.unsqueeze(0)) + gt_class = self.angle_to_class(self.gt_T) + target = torch.exp(-(classes - gt_class) ** 2 / (2 * sigma ** 2)) + target /= target.sum() # normalize to sum=1 + + kl_loss = torch.nn.KLDivLoss(reduction='batchmean') + loss_T_minus_1 = kl_loss(prediction_prev, prev_target.unsqueeze(0)) # add batch dim + loss_T = kl_loss(prediction, target.unsqueeze(0)) # add batch dim + + self.loss = loss_T_minus_1 + loss_T + + # Run Validation and calculate metrics + def validate(self, img_T_minus_1_val, gt_T_minus_1_val, img_T_val, gt_T_val): + + # Set Data + self.set_data(img_T_minus_1_val, gt_T_minus_1_val, img_T_val, gt_T_val) + + # Augmenting Image + # self.apply_augmentations(is_train=False) + + # Converting to tensor and loading + self.load_data() + + # Calculate IoU score + score = self.calc_score() + + return score + + # Run network on test image and visualize result + def test(self, test_images, test_images_save_path, log_count): + + test_images_list = sorted([f for f in pathlib.Path(test_images).glob("*")]) + + for i in range(0, len(test_images_list)): + # Read test images + frame = cv2.imread(str(test_images_list[i]), cv2.IMREAD_COLOR) + frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + image_pil = Image.fromarray(frame) + + # Resize to correct dimensions for network + image_pil = image_pil.resize((640, 320)) + + # Load test images and run inference + test_image_tensor = self.image_loader(image_pil) + test_image_tensor = test_image_tensor.unsqueeze(0) + test_image_tensor = test_image_tensor.to(self.device) + test_output = self.model(test_image_tensor) + + # Process the output and scale to match the input image size + test_output = test_output.squeeze(0).cpu().detach() + test_output = test_output.permute(1, 2, 0) + test_output = test_output.numpy() + + # Resize to match original image dimension + test_output = cv2.resize(test_output, (frame.shape[1], frame.shape[0])) + + # Create visualization + alpha = 0.5 + test_visualization = cv2.addWeighted(self.make_visualization(test_output), \ + alpha, frame, 1 - alpha, 0) + test_visualization = cv2.cvtColor(test_visualization, cv2.COLOR_BGR2RGB) + + # Save visualization + image_save_path = test_images_save_path + str(log_count) + '_' + str(i) + '.jpg' + cv2.imwrite(image_save_path, test_visualization) + + # Loss Backward Pass + def loss_backward(self): + self.loss.backward() + + # Get loss value + def get_loss(self): + return self.loss.item() + + # Run Optimizer + def run_optimizer(self): + self.optimizer.step() + self.optimizer.zero_grad() + + # Set train mode + def set_train_mode(self): + self.model = self.model.train() + + # Set evaluation mode + def set_eval_mode(self): + self.model = self.model.eval() + + # Zero Gradient + def zero_grad(self): + self.optimizer.zero_grad() + + # Save Model + def save_model(self, model_save_path): + print('Saving model') + torch.save(self.model.state_dict(), model_save_path) + + # Logging Training Loss + def log_loss(self, log_count): + self.writer.add_scalar("Loss/train", self.get_loss(), (log_count)) + + # Logging Validation mIoU Score + def log_val(self, val_cel_loss, val_accuracy, log_count): + print('Logging Validation') + self.writer.add_scalar("Val CEL loss", val_cel_loss, (log_count)) + self.writer.add_scalar("Val accuracy", val_accuracy, (log_count)) + + # Calculate IoU score for validation + def calc_score(self): + with torch.no_grad(): + prediction_class = torch.argmax(self.prediction).item() + gt_class = self.angle_to_class(self.gt_T) + accuracy = int(abs(prediction_class - gt_class) <= 1) + + return self.loss.item(), accuracy + + # Save predicted visualization + def save_visualization(self, log_count): + + # Get prediction + prediction = self.prediction.squeeze(0).cpu().detach() + + # Get ground truth + gt_vis = self.gt_T + + # Prediction visualization + prediction_vis = torch.argmax(prediction).item() - 30 + + # Visualize + fig_img = plt.figure(figsize=(8, 4)) + + plt.axis('off') + plt.imshow(self.img_T) + + txt = ( + f"Pred: {prediction_vis:.3f}\n" + f"GT: {gt_vis:.3f}" + ) + plt.text( + 20, 40, # position on the image + txt, + color="black", + fontsize=8, + bbox=dict( + facecolor="white", # light background + alpha=0.7, + edgecolor="black", + boxstyle="round,pad=0.5" + ) + ) + + # plt.show() + + # Write the figure + self.writer.add_figure('Image', fig_img, global_step=(log_count)) + + # Visualize predicted result + def make_visualization(self, result): + + # Getting size of prediction + shape = result.shape + row = shape[0] + col = shape[1] + + # Creating visualization image + vis_predict_object = np.zeros((row, col, 3), dtype="uint8") + + # Assigning background colour + vis_predict_object[:, :, 0] = 61 + vis_predict_object[:, :, 1] = 93 + vis_predict_object[:, :, 2] = 255 + + # Getting foreground object labels + foreground_lables = np.where(result > 0) + + # Assigning foreground objects colour + vis_predict_object[foreground_lables[0], foreground_lables[1], 0] = 255 + vis_predict_object[foreground_lables[0], foreground_lables[1], 1] = 234 + vis_predict_object[foreground_lables[0], foreground_lables[1], 2] = 0 + + return vis_predict_object + + def cleanup(self): + self.writer.flush() + self.writer.close() + print('Finished Training') diff --git a/Models/training/train_auto_steer.py b/Models/training/train_auto_steer.py index 32bc84136..6d3ec5195 100644 --- a/Models/training/train_auto_steer.py +++ b/Models/training/train_auto_steer.py @@ -1,6 +1,212 @@ -# Main train loop for AutoSteer +""" +# Main train loop for AutoSteer # Responsible for getting data via the AutoSteer data loader, passing the data to the AutoSteer trainer class # Running the main train loop over multiple epochs # Does simulation of batch size (we use a batch size of one) # Saving the model checkpoints -# Colating the validation metrics \ No newline at end of file +# Colating the validation metrics +""" + +# %% +# Comment above is for Jupyter execution in VSCode +# ! /usr/bin/env python3 +import torch +import random +from argparse import ArgumentParser +import sys +import torchvision.transforms as T +import numpy as np + +sys.path.append('..') +from Models.data_utils.load_data_auto_steer import LoadDataAutoSteer +from Models.training.auto_steer_trainer import AutoSteerTrainer + + +def main(): + parser = ArgumentParser() + parser.add_argument("-s", "--model_save_root_path", dest="model_save_root_path", + help="root path where pytorch checkpoint file should be saved") + parser.add_argument("-m", "--pretrained_checkpoint_path", dest="pretrained_checkpoint_path", + help="path to EgoLanes weights file for pre-trained backbone") + parser.add_argument("-c", "--checkpoint_path", dest="checkpoint_path", + help="path to Scene3D weights file for training from saved checkpoint") + parser.add_argument('-t', "--test_images_save_path", dest="test_images_save_path", + help="path to where visualizations from inference on test images are saved") + parser.add_argument("-r", "--root", dest="root", help="root path to folder where data training data is stored") + parser.add_argument('-l', "--load_from_save", action='store_true', + help="flag for whether model is being loaded from a EgoLanes checkpoint file") + args = parser.parse_args() + + # Root path + root = args.root + + # Model save path + model_save_root_path = args.model_save_root_path + + # Test data + test_images = root + 'test/' + test_images_save_path = args.test_images_save_path + + # AutoSteer - Data Loading + transform = T.Compose([ + T.Lambda(lambda img: img.crop((0, 420, img.width, img.height))), # left, top, right, bottom + T.Resize((320, 640)), + T.Lambda(lambda img: np.array(img)), + ]) + auto_steer_dataset = LoadDataAutoSteer(root, transform=transform) + total_train_samples, total_val_samples = len(auto_steer_dataset.train), len(auto_steer_dataset.val) + print(total_train_samples, ': total training samples') + print(total_val_samples, ': total validation samples') + + # Load from checkpoint + load_from_checkpoint = False + if (args.load_from_save): + load_from_checkpoint = True + + # Pre-trained model checkpoint path + pretrained_checkpoint_path = args.pretrained_checkpoint_path + checkpoint_path = args.pretrained_checkpoint_path + + # Trainer Class + trainer = 0 + if (load_from_checkpoint == False): + trainer = AutoSteerTrainer(pretrained_checkpoint_path=pretrained_checkpoint_path) + else: + trainer = AutoSteerTrainer(checkpoint_path=checkpoint_path, is_pretrained=True) + + trainer.zero_grad() + + # Total training epochs + num_epochs = 50 + batch_size = 5 + + # Epochs + for epoch in range(0, num_epochs): + + # Printing epochs + print('Epoch: ', epoch + 1) + + # Randomizing data + # randomlist_train_data = random.sample(range(1, num_train_samples), total_train_samples) + # randomlist_train_data = range(1, total_train_samples) + + # Batch size schedule + if (epoch >= 5 and epoch < 10): + batch_size = 4 + + if (epoch >= 10): + batch_size = 3 + + # Learning rate schedule + if (epoch >= 2 and epoch < 15): + trainer.set_learning_rate(0.0001) + if (epoch >= 15): + trainer.set_learning_rate(0.000025) + + # Augmentations schedule + apply_augmentations = False + # if (epoch >= 10 and epoch < 25): + # apply_augmentations = True + # if (epoch >= 25): + # apply_augmentations = False + + # Loop through data + for count in range(0, total_train_samples): + + # Log counter + train_log_count = count + total_train_samples * epoch + + # Read images, apply augmentation, run prediction, calculate + # loss for iterated image from each dataset, and increment + # dataset iterators + + # Get data + # image, gt = auto_steer_dataset.getItem(randomlist_train_data[count]) + img_T_minus_1, gt_T_minus_1, img_T, gt_T = auto_steer_dataset.train[count] + + # Assign Data + trainer.set_data(img_T_minus_1, gt_T_minus_1, img_T, gt_T) + + # # ### Train on flipped images ### # + if random.random() < 0.5: + trainer.apply_augmentations() + + # Converting to tensor and loading + trainer.load_data() + + # Run model and calculate loss + trainer.run_model() + + # Gradient accumulation + trainer.loss_backward() + + # Simulating batch size through gradient accumulation + if ((count + 1) % batch_size == 0): + trainer.run_optimizer() + + # Logging loss to Tensor Board every 250 steps + if ((count + 1) % 250 == 0): + trainer.log_loss(train_log_count) + + # Logging Image to Tensor Board every 1000 steps + if ((count + 1) % 1000 == 0): + trainer.save_visualization(train_log_count) + + # Save model and run validation on entire validation + # dataset after each epoch + + # Save Model + model_save_path = model_save_root_path + 'AutoSteer_iter_' + \ + str(count + total_train_samples * epoch) \ + + '_epoch_' + str(epoch) + '_step_' + \ + str(count) + '.pth' + + trainer.save_model(model_save_path) + + # Test and save visualization + # print('Testing') + # trainer.test(test_images, test_images_save_path, train_log_count) + + # # Validate + print('Validating') + + # Setting model to evaluation mode + trainer.set_eval_mode() + + # Overall IoU + running_cel_loss = 0 + running_l1_loss = 0 + running_val_accuracy = 0 + + # No gradient calculation + with torch.no_grad(): + + # AutoSteer + for val_count in range(0, total_val_samples): + # image_val, gt_val = auto_steer_dataset.getItemVal(val_count) + # frame_id, image_val, gt_val = auto_steer_dataset.getItem(val_count) + img_T_minus_1_val, gt_T_minus_1_val, img_T_val, gt_T_val = auto_steer_dataset.val[val_count] + + # Run Validation and calculate IoU Score + cel_loss, accuracy = trainer.validate(img_T_minus_1_val, gt_T_minus_1_val, img_T_val, gt_T_val) + + # Accumulate individual IoU scores for validation samples + running_cel_loss += cel_loss + running_val_accuracy += accuracy + + # Calculating average loss of complete validation set + val_cel_loss = running_cel_loss / total_val_samples + val_accuracy = running_val_accuracy / total_val_samples + + # Logging average validation loss to TensorBoard + trainer.log_val(val_cel_loss, val_accuracy, epoch) + + # Resetting model back to training + trainer.set_train_mode() + + trainer.cleanup() + + +if __name__ == '__main__': + main() +# %% diff --git a/Models/visualizations/AutoSteer/video_visualization.py b/Models/visualizations/AutoSteer/video_visualization.py new file mode 100644 index 000000000..1610137a1 --- /dev/null +++ b/Models/visualizations/AutoSteer/video_visualization.py @@ -0,0 +1,209 @@ +# %% +# Comment above is for Jupyter execution in VSCode +# ! /usr/bin/env python3 +import cv2 +import sys +import time +import numpy as np +from PIL import Image +from argparse import ArgumentParser +from datetime import datetime, timedelta + +sys.path.append('../..') +from Models.inference.auto_steer_infer import AutoSpeedNetworkInfer + + +def rotate_wheel(wheel_img, angle_deg): + """Rotate a PNG with alpha channel.""" + h, w = wheel_img.shape[:2] + center = (w // 2, h // 2) + + M = cv2.getRotationMatrix2D(center, angle_deg, 1.0) + rotated = cv2.warpAffine( + wheel_img, + M, + (w, h), + flags=cv2.INTER_LINEAR, + borderMode=cv2.BORDER_CONSTANT, + borderValue=(0, 0, 0, 0) + ) + return rotated + + +def overlay_on_top(base_img, overlay_img, frame_time, steering_angle): + """Put wheel image at the top center of the base image.""" + H, W = base_img.shape[:2] + oh, ow = overlay_img.shape[:2] + + # Resize wheel if too large + scale = 0.8 + overlay_img = cv2.resize(overlay_img, None, fx=scale, fy=scale) + oh, ow = overlay_img.shape[:2] + + # Compute placement: centered at the top + x = (W - ow - 60) + y = 20 # small top margin + + # Copy base + image = base_img.copy() + + # If overlay has alpha + if overlay_img.shape[2] == 4: + alpha = overlay_img[:, :, 3] / 255.0 + overlay_rgb = overlay_img[:, :, :3] + + # Blend + for c in range(3): + image[y:y + oh, x:x + ow, c] = ( + overlay_rgb[:, :, c] * alpha + + image[y:y + oh, x:x + ow, c] * (1 - alpha) + ) + + else: + # No alpha, hard paste + image[y:y + oh, x:x + ow] = overlay_img + + # -------- ADD TEXT HERE -------- + cv2.putText( + image, + frame_time, + (x - 60, y + oh + 30), # position (x,y) + cv2.FONT_HERSHEY_SIMPLEX, # font + 0.6, # scale + (255, 255, 255), # color (white) + 2, # thickness + cv2.LINE_AA # anti-aliased + ) + + cv2.putText( + image, + f"{steering_angle:.2f} deg", + (x - 60, y + oh + 60), # position (x,y) + cv2.FONT_HERSHEY_SIMPLEX, # font + 0.6, # scale + (255, 255, 255), # color (white) + 2, # thickness + cv2.LINE_AA # anti-aliased + ) + + return image + + +def make_visualization(frame, prediction): + # Convert prediction to string + text = f"Pred: {prediction:.2f}" + + # Put text on the frame + cv2.putText( + frame, # image + text, # text to display + (20, 40), # position (x, y) + cv2.FONT_HERSHEY_SIMPLEX, # font + 1, # font scale + (0, 255, 0), # color (BGR) + 2, # thickness + cv2.LINE_AA # line type + ) + + +def main(): + parser = ArgumentParser() + parser.add_argument("-e", "--egolanes_checkpoint_path", dest="egolanes_checkpoint_path", + help="path to pytorch EgoLane scheckpoint file to load model dict") + parser.add_argument("-a", "--autosteer_checkpoint_path", dest="autosteer_checkpoint_path", + help="path to pytorch AutoSteer checkpoint file to load model dict") + parser.add_argument("-i", "--video_filepath", dest="video_filepath", + help="path to input video which will be processed by AutoSteer") + parser.add_argument("-o", "--output_file", dest="output_file", + help="path to output video visualization file, must include output file name") + parser.add_argument('-v', "--vis", action='store_true', default=False, + help="flag for whether to show frame by frame visualization while processing is occuring") + args = parser.parse_args() + + # Saved model checkpoint path + egolanes_checkpoint_path = args.egolanes_checkpoint_path + autosteer_checkpoint_path = args.autosteer_checkpoint_path + model = AutoSpeedNetworkInfer(egolanes_checkpoint_path=egolanes_checkpoint_path, + autosteer_checkpoint_path=autosteer_checkpoint_path) + print('AutoSteer Model Loaded') + + # Create a VideoCapture object and read from input file + # If the input is taken from the camera, pass 0 instead of the video file name. + video_filepath = args.video_filepath + cap = cv2.VideoCapture(video_filepath) + fps = cap.get(cv2.CAP_PROP_FPS) + + start_datetime = datetime.now() # or read metadata if available + + # Wheel image + wheel_img_path = "../../../Media/wheel.png" + # Load wheel image (use PNG with transparency) + wheel_raw = cv2.imread(wheel_img_path, cv2.IMREAD_UNCHANGED) + + # Output filepath + output_filepath_obj = args.output_file + '.avi' + fps = cap.get(cv2.CAP_PROP_FPS) + # Video writer object + writer_obj = cv2.VideoWriter(output_filepath_obj, + cv2.VideoWriter_fourcc(*"MJPG"), fps, (1280, 720)) + + # Check if video catpure opened successfully + if (cap.isOpened() == False): + print("Error opening video stream or file") + else: + print('Reading video frames') + + # Transparency factor + alpha = 0.5 + + # Read until video is completed + print('Processing started') + frame_index = 0 + while (cap.isOpened()): + # Capture frame-by-frame + ret, frame = cap.read() + if ret == True: + + # Display the resulting frame + image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + image_pil = Image.fromarray(image) + image_pil = image_pil.resize((640, 320)) + + # Running inference + steering_angle = model.inference(image_pil) + + # --- Rotate wheel image --- + rotated_wheel_img = rotate_wheel(wheel_raw, steering_angle) + + # Resizing to match the size of the output video + # which is set to standard HD resolution + frame = cv2.resize(frame, (1280, 720)) + # make_visualization(frame, prediction) + frame_time = start_datetime + timedelta(seconds=frame_index / fps) + date_time = frame_time.strftime("%m/%d/%Y %I:%M:%S") + frame = overlay_on_top(frame, rotated_wheel_img, date_time, steering_angle) + + if (args.vis): + cv2.imshow('Prediction Objects', frame) + cv2.waitKey(10) + + # Writing to video frame + writer_obj.write(frame) + + else: + print('Frame not read - ending processing') + break + frame_index += 1 + + # When everything done, release the video capture and writer objects + cap.release() + writer_obj.release() + + # Closes all the frames + cv2.destroyAllWindows() + print('Completed') + + +if __name__ == '__main__': + main() +# %%