{ "cells": [ { "cell_type": "markdown", "id": "21b10b99", "metadata": {}, "source": [ "# Task 1: Load Dataset\n", "Load images from disk and count per class to verify dataset integrity" ] }, { "cell_type": "code", "execution_count": null, "id": "d318d1f0", "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "data_dir = '../data/raw/vehicle_classification'\n", "\n", "total_count = 0\n", "\n", "for class_name in os.listdir(data_dir):\n", " class_path = os.path.join(data_dir, class_name)\n", " if os.path.isdir(class_path):\n", " count = len(os.listdir(class_path))\n", " total_count += count\n", " print(f\"{class_name}: {count} images\")\n", "\n", "print(f\"Total Count: {total_count} images\")" ] }, { "cell_type": "markdown", "id": "64122ad4", "metadata": {}, "source": [ "Check out sample image from dataset" ] }, { "cell_type": "code", "execution_count": null, "id": "5604ace3", "metadata": {}, "outputs": [], "source": [ "from PIL import Image\n", "import matplotlib.pyplot as plt\n", "\n", "# first image in first folder\n", "first_class = os.listdir(data_dir)[0]\n", "first_image_path = os.path.join(data_dir, first_class, os.listdir(os.path.join(data_dir, first_class))[0])\n", "\n", "img = Image.open(first_image_path)\n", "print(f\"Size: {img.size}\")\n", "print(f\"Mode: {img.mode}\")\n", "plt.imshow(img)\n", "plt.title(first_class)\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "c19ec00a", "metadata": {}, "source": [ "Ensure that all images are RGB, all of same resolution" ] }, { "cell_type": "code", "execution_count": null, "id": "3cedd586", "metadata": {}, "outputs": [], "source": [ "sizes = set()\n", "modes = set()\n", "\n", "for class_name in os.listdir(data_dir):\n", " class_path = os.path.join(data_dir, class_name)\n", " if not os.path.isdir(class_path):\n", " continue\n", " for img_name in os.listdir(class_path):\n", " img = Image.open(os.path.join(class_path, img_name))\n", " sizes.add(img.size)\n", " modes.add(img.mode)\n", "\n", 
"print(f\"Unique sizes: {sizes}\")\n", "print(f\"Unique modes: {modes}\")" ] }, { "cell_type": "markdown", "id": "88ac961b", "metadata": {}, "source": [ "Accelerate torch with GPU or MPS if available (credit: Claude)" ] }, { "cell_type": "code", "execution_count": null, "id": "8f556b22", "metadata": {}, "outputs": [], "source": [ "import torch\n", "\n", "if torch.cuda.is_available():\n", " DEVICE = torch.device('cuda')\n", " print(f'GPU: {torch.cuda.get_device_name(0)}')\n", "elif torch.backends.mps.is_available():\n", " DEVICE = torch.device('mps')\n", " print('Apple Silicon (MPS)')\n", "else:\n", " DEVICE = torch.device('cpu')\n", " print('CPU')\n", "\n", "print(f'Running on: {DEVICE}')" ] }, { "cell_type": "markdown", "id": "3ad97919", "metadata": {}, "source": [ "# Task 2: Split Dataset 80:20 (Train / Test)\n", "\n", "Augmentation applied to training set only — test set kept clean for fair evaluation" ] }, { "cell_type": "code", "execution_count": null, "id": "f68c1a25", "metadata": {}, "outputs": [], "source": [ "import math \n", "from torchvision import datasets, transforms\n", "from torch.utils.data import random_split, DataLoader\n", "\n", "\n", "train_transform = transforms.Compose([\n", " transforms.Resize((64, 64)), # Resize to 64x64 (even though all images are)\n", " transforms.RandomHorizontalFlip(), # randomly mirror image\n", " transforms.RandomRotation(20), # rotate up to 20 degrees\n", " transforms.ColorJitter(brightness=0.3, contrast=0.3), # vary lighting\n", " transforms.ToTensor(),\n", " transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]) # Normalize tensors as in Pytorch tutorial\n", "])\n", "\n", "test_transform = transforms.Compose([\n", " transforms.Resize((64,64)), # Resize to 64x64 (even though all images are)\n", " transforms.ToTensor(),\n", " transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]) # Normalize tensors as in Pytorch tutorial\n", "])\n", "\n", "train_full = datasets.ImageFolder(root=data_dir, 
transform=train_transform) #Load full dataset with train transform\n", "test_full = datasets.ImageFolder(root=data_dir, transform=test_transform) # Load full dataset with test transform\n", "\n", "train_size = math.floor(len(train_full) * 0.8) #80% split for training\n", "test_size = len(train_full) - train_size # Remaining 20% used for testing\n", "\n", "torch.manual_seed(42) # Fixes the RNG to the same starting point...42 is convention according to GeeksForGeeks\n", "indices = torch.randperm(len(train_full)).tolist() #randomly shuffle the indices\n", "\n", "train_indices = indices[:train_size] # First 80% of indices\n", "test_indices = indices[train_size:] # remaining 20% of indices\n", "\n", "train_dataset = torch.utils.data.Subset(train_full, train_indices) #Create final datasets\n", "test_dataset = torch.utils.data.Subset(test_full, test_indices)\n", "\n", "print(f\"Train: {len(train_dataset)}, Test: {len(test_dataset)}\")" ] }, { "cell_type": "markdown", "id": "2eede814", "metadata": {}, "source": [ "Credit: Claude: load dataset into batches (64 is standard), and dedicate n threads to the process (min 1, preferably 4)" ] }, { "cell_type": "code", "execution_count": null, "id": "e1539eaa", "metadata": {}, "outputs": [], "source": [ "NUM_WORKERS = min(4, os.cpu_count() or 1)\n", "PIN_MEMORY = (DEVICE.type == 'cuda') # Pin memory if GPU available for CUDA\n", "\n", "print(NUM_WORKERS)\n", "\n", "train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True,\n", " num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY)\n", "test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False,\n", " num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY)\n", "classes = train_full.classes \n", "print(classes)\n", "print(len(classes))" ] }, { "cell_type": "markdown", "id": "e7255041", "metadata": {}, "source": [ "# Task 3: CNN Architecture\n", "Model takes a batch of (3, 64, 64) images and outputs 8 class scores" ] }, { "cell_type": "code", "execution_count": null, "id": 
"d1b7d9ca", "metadata": {}, "outputs": [], "source": [ "import torch.nn as nn\n", "\n", "class Net(nn.Module):\n", " def __init__(self):\n", " super(Net, self).__init__()\n", "\n", " self.features = nn.Sequential(\n", " # go from 64x64 to 32x32\n", "\n", " # kernel size = 3x3 filter patch\n", " #padding = 1, so 64x64 stays 64x64 after conv\n", " nn.Conv2d(3, 32, kernel_size=3, padding=1), # 3 channel RGB, 32 filters (recommended), and adding 1 pixel of zeros to keep output at same size\n", "\n", " nn.BatchNorm2d(32), # mirrors conv2d output\n", " nn.ReLU(), #Activation fn\n", " nn.MaxPool2d(2,2), # 2x2 window, stride = 2: so halved --> 32x32\n", "\n", " #Go from 32x32 --> 16x16 in the same manner\n", "\n", " nn.Conv2d(32, 64, kernel_size=3, padding=1), # Double filters --> more complex features detected\n", " nn.BatchNorm2d(64),\n", " nn.ReLU(),\n", " nn.MaxPool2d(2,2), \n", " \n", " # Go from 16x16 --> 8x8 in the same manner\n", "\n", " nn.Conv2d(64, 128, kernel_size=3, padding=1), # Double filters again --> even more complex features detected\n", " nn.BatchNorm2d(128),\n", " nn.ReLU(),\n", " nn.MaxPool2d(2,2), \n", "\n", " )\n", "\n", " self.classifier = nn.Sequential(\n", " nn.Flatten(),\n", " nn.Linear (128 * 8 *8, 512), # flattened size of 8x8 * 128, 512 is arbitrary number of hidden neurons (recommended by GeeksForGeeks)....tuned to learn details without overfitting\n", " nn.ReLU(),\n", " nn.Dropout(0.5),#Randomly zero 50% of neurons --> prevent memorization and overfitting\n", " nn.Linear(512, len(classes)) # one score per vehicle class\n", " )\n", "\n", " def forward(self, x): \n", " x = self.features(x) # extract spatial features via conv blocks\n", " x = self.classifier(x) # flatten to 8 vehicle classes\n", " return x\n", "\n", "model = Net().to(DEVICE)\n", "device = DEVICE\n", "\n", "print(model)" ] }, { "cell_type": "markdown", "id": "22e71032", "metadata": {}, "source": [ "Loss fn and optimizer" ] }, { "cell_type": "code", "execution_count": null, 
"id": "54d11a04", "metadata": {}, "outputs": [], "source": [ "import torch.optim as optim\n", "\n", "criterion = nn.CrossEntropyLoss() # Applied softmax to convert scores --> probabilities --> penalizes model \n", "\n", "# Changed to adam optimizer (internal momentum calculation)\n", "optimizer = optim.Adam(model.parameters(), lr=0.001)" ] }, { "cell_type": "markdown", "id": "572d80e3", "metadata": {}, "source": [ "# Task 4: Train Model\n", "\n", "Track loss and accuracy per epoch — stored in lists for plotting" ] }, { "cell_type": "code", "execution_count": null, "id": "374d0590", "metadata": {}, "outputs": [], "source": [ "train_losses, train_accs = [], [] #To store accs for visualization\n", "\n", "for epoch in range(30): # 30 epochs\n", " running_loss = 0.0 # keep track of running loss\n", " correct = 0\n", " total = 0\n", "\n", " for i, data in enumerate(train_loader, 0):\n", " inputs, labels = data\n", " inputs, labels = inputs.to(device), labels.to(device)\n", " optimizer.zero_grad() # clear prior gradients\n", " outputs = model(inputs) # forward pass \n", " loss = criterion(outputs, labels) # compare to GT\n", " loss.backward() # Backprop...compute gradient of loss\n", " optimizer.step() #Use adam optimizer to update weights using gradients\n", "\n", " running_loss += loss.item() #Extract scalar loss value \n", " _, predicted = torch.max(outputs, 1) # Take index of highest score as predicted class\n", " total += labels.size(0)\n", " correct += (predicted == labels).sum().item() #update correct tally\n", "\n", " epoch_loss = running_loss / len(train_loader) #Compute avg loss\n", " epoch_acc = 100 * correct / total #Avg acc across epoch\n", "\n", "\n", " #Adding epochs to list\n", " train_losses.append(epoch_loss)\n", " train_accs.append(epoch_acc) \n", "\n", " print(f'Epoch {epoch+1}: Loss={epoch_loss:.3f}, Accuracy={epoch_acc:.2f}%')\n", "\n", "print('Finished Training')" ] }, { "cell_type": "markdown", "id": "26ab705a", "metadata": {}, "source": [ "# 
Task 6 (Bonus): Plot Loss & Accuracy Curves\n", "\n", "Visualises how loss decreased and accuracy improved across 30 epochs [credit: Claude]" ] }, { "cell_type": "code", "execution_count": null, "id": "c71ee0ff", "metadata": {}, "outputs": [], "source": [ "fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))\n", "\n", "\n", "# Plot 1: Training loss vs epoch \n", "ax1.plot(train_losses, color='steelblue', linewidth=2)\n", "ax1.set_title('Training Loss')\n", "ax1.set_xlabel('Epoch')\n", "ax1.set_ylabel('Loss')\n", "ax1.grid(True, alpha=0.3)\n", "\n", "#Plot 2: training acc. vs epoch\n", "ax2.plot(train_accs, color='darkorange', linewidth=2)\n", "ax2.set_title('Training Accuracy')\n", "ax2.set_xlabel('Epoch')\n", "ax2.set_ylabel('Accuracy (%)')\n", "ax2.grid(True, alpha=0.3)\n", "\n", "#Concat two plots, save, and show\n", "plt.suptitle('Training Curves', fontsize=14, fontweight='bold')\n", "plt.tight_layout()\n", "os.makedirs('../results', exist_ok=True)\n", "plt.savefig('../results/training_curves.png', dpi=150, bbox_inches='tight')\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "b3bfda75", "metadata": {}, "source": [ "Save Model" ] }, { "cell_type": "code", "execution_count": null, "id": "2bf2b9a2", "metadata": {}, "outputs": [], "source": [ "os.makedirs('../models', exist_ok=True)\n", "PATH = '../models/final-classifier.pth'\n", "torch.save(model.state_dict(), PATH)" ] }, { "cell_type": "markdown", "id": "057d5d72", "metadata": {}, "source": [ "# Task 5: Final Accuracy\n", "Evaluate on both train and test sets with Dropout disabled (model.eval())" ] }, { "cell_type": "code", "execution_count": null, "id": "9e54f566", "metadata": {}, "outputs": [], "source": [ "model.eval() # Switch to eval mode (disabled dropout --> higher acc) (credit: Claude)\n", "train_correct, train_total = 0, 0\n", "with torch.no_grad(): #Gradient computation not needed for inference\n", " for images, labels in train_loader:\n", " images, labels = images.to(device), 
labels.to(device)\n", " outputs = model(images) #Fwd pass only\n", " _, predicted = torch.max(outputs, 1) #highest score = predicted class\n", " train_total += labels.size(0) #Count total \n", " train_correct += (predicted == labels).sum().item() # Count correct\n", "\n", "# Test accuracy - repeat with test set\n", "test_correct, test_total = 0, 0\n", "with torch.no_grad():\n", " for images, labels in test_loader:\n", " images, labels = images.to(device), labels.to(device)\n", " outputs = model(images)\n", " _, predicted = torch.max(outputs, 1)\n", " test_total += labels.size(0)\n", " test_correct += (predicted == labels).sum().item()\n", "\n", "print(f'Final Train Accuracy : {100 * train_correct / train_total:.2f}%')\n", "print(f'Final Test Accuracy : {100 * test_correct / test_total:.2f}%')" ] }, { "cell_type": "markdown", "id": "60666242", "metadata": {}, "source": [ "Credit Claude: Testing accuracy per class" ] }, { "cell_type": "code", "execution_count": null, "id": "8cc7ed40", "metadata": {}, "outputs": [], "source": [ "correct_pred = {classname: 0 for classname in classes} # Correct predictions per class\n", "total_pred = {classname: 0 for classname in classes} # total images seen per class\n", "\n", "model.eval()\n", "with torch.no_grad():\n", " for data in test_loader:\n", " images, labels = data\n", " images, labels = images.to(device), labels.to(device)\n", " outputs = model(images)\n", " _, predictions = torch.max(outputs, 1) #predicted class index per class\n", " for label, prediction in zip(labels, predictions):\n", " if label == prediction:\n", " correct_pred[classes[label]] += 1\n", " total_pred[classes[label]] += 1\n", "\n", "for classname, correct_count in correct_pred.items():\n", " n_seen = total_pred[classname] # images of this class in the test split\n", " # Guard: a random 80:20 split may leave a class with zero test samples --> avoid ZeroDivisionError\n", " accuracy = 100 * float(correct_count) / n_seen if n_seen else 0.0\n", " print(f'Accuracy for class: {classname:10s} is {accuracy:.1f}%')" ] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { 
"codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.15" } }, "nbformat": 4, "nbformat_minor": 5 }