1+ import os
2+ from pathlib import Path
3+
4+ import torch
5+ import torch .nn as nn
6+ from torchvision .datasets import VisionDataset
7+ from torch .utils .data import Dataset , DataLoader
8+
9+ # import matplotlib.pyplot as plt
10+
11+ import numpy as np
12+ import json
13+
14+ my_dir = Path (__file__ ).resolve ().parent
15+
16+ def get_best_device ():
17+ if torch .cuda .is_available ():
18+ return torch .device ('cuda' )
19+ if torch .backends .mps .is_available ():
20+ return torch .device ('mps' )
21+ return torch .device ('cpu' )
22+
23+ def download_cat_breeds_data (data_dir ):
24+ import kagglehub
25+ import cv2
26+
27+ data_dir = Path (data_dir ).resolve ()
28+ data_dir .mkdir (parents = True , exist_ok = True )
29+
30+ output_dir = data_dir / 'catbreeds'
31+ output_dir .mkdir (parents = True , exist_ok = True )
32+
33+ img_dir = output_dir / 'images'
34+ img_dir .mkdir (parents = True , exist_ok = True )
35+
36+ label_dir = output_dir / 'labels'
37+ label_dir .mkdir (parents = True , exist_ok = True )
38+
39+ raw_dir = output_dir / 'raw'
40+ raw_dir .mkdir (parents = True , exist_ok = True )
41+
42+ label_names_path = output_dir / 'label_names.json'
43+
44+ # Download latest version
45+ dl_path = kagglehub .dataset_download ('imbikramsaha/cat-breeds' )
46+
47+ cat_breeds_dir = Path (dl_path ).resolve ()
48+ print ("Path to dataset files:" , cat_breeds_dir )
49+
50+ cat_classes_dir = cat_breeds_dir / 'cats-breads'
51+
52+ classes = sorted ([c for c in cat_classes_dir .iterdir () if c .is_dir ()])
53+
54+ label_names = {}
55+
56+ n = 0
57+ for i , c in enumerate (classes ):
58+ print (f'Processing label: { i } , class: { c .name } ' )
59+ label_names [i ] = c .name
60+
61+ raw_class_dir = raw_dir / 'resized' / c .name
62+ raw_class_dir .mkdir (parents = True , exist_ok = True )
63+
64+ files = [f for f in c .iterdir () if f .is_file ()]
65+ for f in files :
66+ image = cv2 .imread (str (f ))
67+ if image is None :
68+ # Skip corrupted or non-image fpaths
69+ continue
70+ image = cv2 .cvtColor (image , cv2 .COLOR_BGR2RGB )
71+ resized_img = cv2 .resize (
72+ src = image ,
73+ dsize = (32 , 32 ),
74+ interpolation = cv2 .INTER_CUBIC
75+ )
76+ transposed_img = resized_img .transpose (2 , 0 , 1 )
77+
78+ img_path = img_dir / f'item{ n } '
79+ label_path = label_dir / f'item{ n } '
80+ raw_img_path = raw_class_dir / str (f .name )
81+
82+ np .save (str (img_path .resolve ()), transposed_img )
83+ np .save (str (label_path .resolve ()), i )
84+ cv2 .imwrite (str (raw_img_path .resolve ()), resized_img ) # Only save resized image
85+
86+ n += 1
87+
88+ with open (label_names_path , 'w' ) as f :
89+ json .dump (label_names , f )
90+
91+ download_dir = cat_breeds_dir .parent .parent .parent .parent .resolve ()
92+ # print(f'Deleting downloaded dataset files in {download_dir}')
93+ # shutil.rmtree(download_dir)
94+
95+ return output_dir
96+
97+
98+
99+ # for https://www.kaggle.com/datasets/imbikramsaha/cat-breeds/data
100+ class CatBreedsData (VisionDataset ):
101+ def __init__ (self , data_dir , download = False , device = None ):
102+ if download :
103+ download_cat_breeds_data (data_dir )
104+ data_dir = Path (data_dir / 'catbreeds' ).resolve ()
105+
106+ img_dir = data_dir / 'images'
107+ label_dir = data_dir / 'labels'
108+ label_names_path = data_dir / 'label_names.json'
109+
110+ labels = []
111+ for label_path in label_dir .iterdir ():
112+ if 'item' in label_path .name :
113+ label_data = np .load (label_path )
114+ labels .append (label_data )
115+
116+ images = []
117+ for img_path in img_dir .iterdir ():
118+ if 'item' in img_path .name :
119+ img_data = np .load (img_path )
120+ images .append (img_data )
121+
122+ label_names = {}
123+ with open (label_names_path , 'r' ) as f :
124+ label_names = json .load (f )
125+
126+ # if device is None:
127+ # device = torch.get_default_device()
128+ # self.device = device
129+
130+ self .labels = np .array (labels )
131+ self .images = np .array (images )
132+ self .label_names = label_names
133+
134+ assert len (self .images ) == len (self .labels ) and len (set (self .labels )) == len (self .label_names )
135+
136+ def __len__ (self ):
137+ return len (self .labels )
138+
139+ def __getitem__ (self , idx ):
140+ if torch .is_tensor (idx ):
141+ idx = idx .tolist ()
142+ image = torch .tensor (self .images [idx ]).float ()
143+ label = torch .tensor (self .labels [idx ]).long ()
144+ # `tensor` is lowercase to make `lab` a 0-dim tensor
145+ return (image ,label )
146+
147+ def get_label_name (self ,idx ):
148+ if idx in self .label_names :
149+ return self .label_names [idx ].lower ()
150+ raise ValueError (f'Label index { idx } not found in label names.' )
151+
152+ def get_label_idx (self ,label ):
153+ for k ,v in self .label_names .items ():
154+ if v .lower () == label .lower ():
155+ return k
156+ raise ValueError (f'Label { label } not found in label names.' )
157+
158+
159+ class SmallCNN (nn .Module ):
160+ def __init__ (self ):
161+ super (SmallCNN , self ).__init__ ()
162+ self .layers = nn .Sequential (
163+ nn .Conv2d (3 , 64 , 3 , padding = 0 ),
164+ nn .ReLU (),
165+ nn .MaxPool2d (kernel_size = 2 ),
166+ nn .Conv2d (64 , 128 , 3 , padding = 0 ),
167+ nn .ReLU (),
168+ nn .MaxPool2d (kernel_size = 2 ),
169+ nn .Conv2d (128 , 256 , 3 , padding = 0 ),
170+ nn .ReLU (),
171+ nn .MaxPool2d (kernel_size = 2 ),
172+ nn .Flatten (),
173+ nn .Dropout (0.5 ),
174+ nn .LazyLinear (2048 ),
175+ nn .ReLU (),
176+ nn .LazyLinear (512 ),
177+ nn .ReLU (),
178+ nn .LazyLinear (12 ),
179+ nn .LogSoftmax (dim = 1 )
180+ )
181+
182+ def forward (self , x ):
183+ return self .layers (x )
184+
185+ def train (model ,
186+ device ,
187+ train_loader ,
188+ optimizer ,
189+ criterion ,
190+ epoch ,
191+ lambda_reg = 0.01 ,
192+ one_pass = False ,
193+ verbose = False ):
194+
195+ # print('Training model...')
196+ model .train ()
197+ avg_loss = 0
198+
199+ for batch_idx , (data , target ) in enumerate (train_loader ):
200+ data = data .to (device )
201+ target = target .to (device )
202+ optimizer .zero_grad ()
203+ output = model (data )
204+
205+ loss = criterion (output , target )
206+ avg_loss += loss
207+ loss .backward ()
208+
209+ optimizer .step ()
210+ if one_pass : break
211+
212+ avg_loss /= len (train_loader .dataset )
213+
214+ if verbose :
215+ print (f'Train Epoch: { epoch } \t Average loss: { avg_loss :.6f} ' )
216+ return avg_loss
217+
218+ def eval (model ,
219+ device ,
220+ test_loader ,
221+ optimizer ,
222+ criterion ,
223+ epoch ):
224+
225+ # print('Evaluating model...')
226+ model .eval ()
227+ avg_loss = 0
228+
229+ for batch_idx , (data , target ) in enumerate (test_loader ):
230+ data = data .to (device )
231+ target = target .to (device )
232+ output = model (data )
233+ loss = criterion (output , target )
234+ avg_loss += loss
235+
236+ avg_loss /= len (test_loader .dataset )
237+
238+ # print(f'Test Epoch: {epoch} \tAverage loss: {avg_loss:.6f}')
239+ return avg_loss
240+
241+
242+ # def eval(model,
243+ # device,
244+ # test_loader):
245+
246+ # print('Evaluating model...')
247+ # model.eval()
248+ # test_loss = 0
249+ # correct = 0
250+
251+ # with torch.no_grad():
252+ # for data, target in test_loader:
253+ # data = data.to(device)
254+ # target = target.to(device)
255+ # output = model(data)
256+ # test_loss += criterion(output, target).item()
257+ # pred = output.argmax(dim=1, keepdim=True)
258+ # correct += pred.eq(target.view_as(pred)).sum().item()
259+
260+ # test_loss /= len(test_loader.dataset)
261+
262+ # print(f'\nTest set: Average loss: {test_loss:.4f}, Accuracy: {correct}/{len(test_loader.dataset)} ({100. * correct / len(test_loader.dataset):.0f}%)\n')
263+
264+
265+ def get_attributes (model ):
266+ return { str (k ): str (v [0 ]) if isinstance (v ,tuple ) else str (v ).lower () for (k ,v ) in dict (model .__dict__ ).items () if k [0 ] != '_' }
267+
268+ def get_summary (model ,global_name ,parent_name = None ):
269+ model_name = model .__class__ .__name__
270+ d = {
271+ 'layerType' : model_name ,
272+ 'attributes' : get_attributes (model ),
273+ 'subModules' : { name : get_summary (m ,global_name = global_name ,parent_name = name ) for name ,m in model .named_children () },
274+ 'subModuleOrder' : [name for name ,_ in model .named_children ()]
275+ }
276+ return d
277+
278+ def has_same_architecture (model1 , model2 ):
279+ return get_summary (model1 ,'' ) == get_summary (model2 ,'' )
280+
281+ if __name__ == "__main__" :
282+ my_dir = Path (__file__ ).resolve ().parent
283+ data_dir = my_dir / 'data'
284+ model_path = my_dir / 'models' / 'pretest.pt'
285+
286+ print ('Constructing CatBreedsData Dataset.' )
287+ data_set = CatBreedsData (data_dir )
288+
289+ data_set_size = len (data_set )
290+ train_len = int (data_set_size * 0.9 )
291+ test_len = data_set_size - train_len
292+ train_set , test_set = torch .utils .data .random_split (data_set , [train_len , test_len ])
293+
294+ print ('Train set size:' , len (train_set ))
295+ print ('Test set size:' , len (test_set ))
296+
297+ print ('Creating DataLoader(s).' )
298+ train_batch_size = test_len
299+ train_loader = DataLoader (train_set , batch_size = train_batch_size , shuffle = True )
300+
301+ test_batch_size = test_len
302+ test_loader = DataLoader (test_set , batch_size = test_batch_size , shuffle = True )
303+
304+ test_features , test_labels = next (iter (test_loader ))
305+ print (test_features [0 ].shape ,test_labels [0 ])
306+ print (test_features [1 ].shape ,test_labels [1 ])
307+
308+ device = get_best_device ()
309+ print ('Using device:' , device )
310+
311+ print ('Creating model...' )
312+ model = SmallCNN ()
313+
314+ # if model_path.exists():
315+ # print('Loading model from', model_path)
316+ # model1 = torch.load(model_path)
317+ # if has_same_architecture(model, model1):
318+ # print('Model architecture match. Using existing model.')
319+ # model = model1
320+ # else:
321+ # print('Model architecture mismatch. Using new model.')
322+
323+ print ('Moving model to device...' )
324+ model .to (device )
325+
326+ epochs = 400
327+
328+ print ('Constructing optimizer and criterion...' )
329+ optimizer = torch .optim .Adam (model .parameters (), lr = 1e-3 , weight_decay = 1e-4 )
330+ criterion = torch .nn .CrossEntropyLoss ()
331+
332+ print ('Starting training...' )
333+ for epoch in range (epochs ):
334+ train_loss = train (model , device , train_loader , optimizer , criterion , epoch , one_pass = False , verbose = False )
335+ eval_loss = eval (model , device , test_loader , optimizer , criterion , epoch )
336+ print (f'Epoch { epoch } \t Train loss: { train_loss :.6f} \t Test loss: { eval_loss :.6f} ' )
337+
338+ model .to (torch .device ('cpu' ))
339+ torch .save (model , model_path )
340+ print ("Model saved to" , my_dir / 'models' / 'pretest.pt' )
341+
342+
343+
0 commit comments