1+ #!/usr/bin/env python3
2+
3+ import argparse
4+ import json
5+ import logging
6+ import os
7+ import datetime
8+ from typing import Dict , Any , List , NamedTuple
9+ import omegaup .api
10+ import re
11+ from urllib .parse import urlparse , urljoin
12+ import shutil
13+ import http .client
14+ import ssl
15+ import zipfile
16+
17+ context = ssl ._create_unverified_context ()
18+
19+ logging .basicConfig (level = logging .INFO )
20+ LOG = logging .getLogger (__name__ )
21+
22+ # ✅ Allowed course aliases
23+ COURSE_ALIASES = [
24+ "curso-publico" ,
25+ "omi-public-course"
26+ ]
27+
28+ DOWNLOAD_BASE_FOLDER = os .path .abspath (os .path .join (os .path .dirname (__file__ ), ".." , "Courses" ))
29+ PROBLEMS_JSON_PATH = os .path .abspath (os .path .join (os .path .dirname (__file__ ), ".." , "problems.json" ))
30+
31+ class ProblemEntry (NamedTuple ):
32+ path : str
33+
34+ def sanitize_filename (name : str ) -> str :
35+ return re .sub (r'[^a-zA-Z0-9_\-\.]' , '_' , name )
36+
37+
38+ def handle_input () -> tuple [str , str , str ]:
39+ parser = argparse .ArgumentParser (description = "Add or remove problems from course assignments." )
40+ parser .add_argument ("--url" , default = "https://omegaup.com" , help = "omegaUp base URL" )
41+ parser .add_argument ("--api-token" , type = str , default = os .environ .get ("OMEGAUP_API_TOKEN" ),
42+ required = ("OMEGAUP_API_TOKEN" not in os .environ ))
43+
44+ parser .add_argument ("--input" , type = str , default = os .path .abspath (os .path .join (os .path .dirname (__file__ ), ".." , "adding_removing_problems.json" )), help = "Path to JSON file" )
45+
46+ args = parser .parse_args ()
47+ return args .api_token , args .url , args .input
48+
49+
50+ def assignment_exists (assignments : List [Dict [str , Any ]], alias : str ) -> bool :
51+ return any (a ["alias" ] == alias for a in assignments )
52+
53+
54+ def create_assignment (client : omegaup .api .Client , course_alias : str , assignment_alias : str ) -> None :
55+ now = datetime .datetime .now (datetime .timezone .utc )
56+ finish = now + datetime .timedelta (days = 30 )
57+
58+ LOG .info (f"📅 Creating assignment '{ assignment_alias } ' in course '{ course_alias } '" )
59+
60+ try :
61+ client .course .createAssignment (
62+ course_alias = course_alias ,
63+ alias = assignment_alias ,
64+ assignment_type = "homework" ,
65+ name = assignment_alias ,
66+ description = f"Auto-created assignment { assignment_alias } " ,
67+ start_time = int (now .timestamp ()),
68+ finish_time = int (finish .timestamp ()),
69+ unlimited_duration = True
70+ )
71+ LOG .info (f"✅ Created assignment '{ assignment_alias } '" )
72+ except Exception as e :
73+ LOG .error (f"❌ Failed to create assignment '{ assignment_alias } ': { e } " )
74+
75+
76+ def download_and_unzip (problem_alias : str , assignment_folder : str , base_url : str , api_token : str ) -> bool :
77+ try :
78+ download_url = urljoin (base_url , f"/api/problem/download/problem_alias/{ problem_alias } /" )
79+ parsed_url = urlparse (download_url )
80+ conn = http .client .HTTPSConnection (parsed_url .hostname , context = context )
81+
82+ headers = {'Authorization' : f'token { api_token } ' }
83+ path = parsed_url .path
84+
85+ conn .request ("GET" , path , headers = headers )
86+ response = conn .getresponse ()
87+
88+ if response .status == 404 :
89+ response_body = response .read ()
90+ LOG .warning (
91+ f"⚠️ Problem '{ problem_alias } ' not found or access denied (404). "
92+ f"Response body:\n { response_body .decode (errors = 'ignore' )} "
93+ )
94+ return False
95+ elif response .status != 200 :
96+ response_body = response .read ()
97+ LOG .error (f"❌ Failed to download '{ problem_alias } '. HTTP status: { response .status } " )
98+ LOG .error (f"❌ Response body:\n { response_body .decode (errors = 'ignore' )} " )
99+ return False
100+
101+ problem_folder = os .path .join (assignment_folder , sanitize_filename (problem_alias ))
102+ os .makedirs (problem_folder , exist_ok = True )
103+
104+ zip_path = os .path .join (problem_folder , f"{ problem_alias } .zip" )
105+ with open (zip_path , "wb" ) as f :
106+ while True :
107+ chunk = response .read (8192 )
108+ if not chunk :
109+ break
110+ f .write (chunk )
111+
112+ try :
113+ with zipfile .ZipFile (zip_path , "r" ) as zip_ref :
114+ zip_ref .extractall (problem_folder )
115+ os .remove (zip_path )
116+ LOG .info (f"✅ Extracted: { problem_alias } → { problem_folder } " )
117+ except zipfile .BadZipFile as e :
118+ LOG .error (f"❌ Failed to unzip: { zip_path } : { e } " )
119+ return False
120+
121+ settings_path = os .path .join (problem_folder , "settings.json" )
122+ if os .path .exists (settings_path ):
123+ try :
124+ with open (settings_path , "r+" , encoding = "utf-8" ) as f :
125+ settings = json .load (f )
126+ settings ["alias" ] = problem_alias
127+ settings ["title" ] = problem_alias
128+ f .seek (0 )
129+ json .dump (settings , f , indent = 2 , ensure_ascii = False )
130+ f .truncate ()
131+ LOG .info (f"🛠️ Updated settings.json with alias: { problem_alias } " )
132+ except json .JSONDecodeError as e :
133+ LOG .warning (f"⚠️ Failed to update settings.json for '{ problem_alias } ': { e } " )
134+ else :
135+ LOG .warning (f"⚠️ No settings.json found for '{ problem_alias } '" )
136+
137+ return True
138+
139+ except Exception as e :
140+ LOG .error (f"❌ Failed to download '{ problem_alias } ': { e } " )
141+ return False
142+
143+
144+ def process_add (data : Dict [str , Any ], problems_data : Dict [str , List [Dict [str , str ]]],
145+ client : omegaup .api .Client , base_url : str ):
146+ for item in data .get ("add_problem" , []):
147+ course = item ["course_alias" ]
148+ assignment = item ["assignment_alias" ]
149+ problem = item ["problem_alias" ]
150+ points = item ["points" ]
151+
152+ if course not in COURSE_ALIASES :
153+ LOG .error (f"❌ Course '{ course } ' not allowed." )
154+ continue
155+
156+ LOG .info (f"➕ Adding problem '{ problem } ' to assignment '{ assignment } ' in course '{ course } '" )
157+
158+ try :
159+ assignments = client .course .listAssignments (course_alias = course ).get ("assignments" , [])
160+ if not assignment_exists (assignments , assignment ):
161+ LOG .warning (f"📂 Assignment '{ assignment } ' not found in course '{ course } ', creating it..." )
162+ create_assignment (client , course , assignment )
163+
164+ client .course .addProblem (
165+ course_alias = course ,
166+ assignment_alias = assignment ,
167+ problem_alias = problem ,
168+ points = points
169+ )
170+ LOG .info (f"✅ Added problem '{ problem } ' to assignment '{ assignment } '" )
171+
172+ assignment_folder = os .path .join (DOWNLOAD_BASE_FOLDER , sanitize_filename (course ),
173+ sanitize_filename (assignment ))
174+ os .makedirs (assignment_folder , exist_ok = True )
175+
176+ LOG .info (f"📥 Downloading and unzipping problem '{ problem } '" )
177+ success = download_and_unzip (
178+ problem_alias = problem ,
179+ assignment_folder = assignment_folder ,
180+ base_url = base_url ,
181+ api_token = client .api_token
182+ )
183+
184+ if success :
185+ add_problem_to_json (course , assignment , problem , problems_data )
186+ LOG .info (f"📘 problems.json updated with: Courses/{ course } /{ assignment } /{ problem } " )
187+ else :
188+ LOG .warning (f"⚠️ Skipping problems.json update due to failed download for '{ problem } '" )
189+
190+ except Exception as e :
191+ LOG .error (f"❌ Failed to add problem '{ problem } ': { e } " )
192+
193+
194+ def process_remove (data : Dict [str , Any ], problems_data : Dict [str , List [Dict [str , str ]]],
195+ client : omegaup .api .Client ):
196+ for item in data .get ("remove_problem" , []):
197+ course = item ["course_alias" ]
198+ assignment = item ["assignment_alias" ]
199+ problem = item ["problem_alias" ]
200+
201+ if course not in COURSE_ALIASES :
202+ LOG .error (f"❌ Course '{ course } ' not allowed." )
203+ continue
204+
205+ LOG .info (f"➖ Removing problem '{ problem } ' from assignment '{ assignment } ' in course '{ course } '" )
206+
207+ try :
208+ assignments = client .course .listAssignments (course_alias = course ).get ("assignments" , [])
209+ if not assignment_exists (assignments , assignment ):
210+ LOG .warning (f"⚠️ Assignment '{ assignment } ' not found in course '{ course } ', skipping removal." )
211+ continue
212+
213+ client .course .removeProblem (
214+ course_alias = course ,
215+ assignment_alias = assignment ,
216+ problem_alias = problem
217+ )
218+ LOG .info (f"✅ Removed problem '{ problem } ' from assignment '{ assignment } '" )
219+
220+ problem_folder = os .path .join (
221+ DOWNLOAD_BASE_FOLDER ,
222+ sanitize_filename (course ),
223+ sanitize_filename (assignment ),
224+ sanitize_filename (problem )
225+ )
226+ if os .path .exists (problem_folder ):
227+ try :
228+ shutil .rmtree (problem_folder )
229+ LOG .info (f"🗑️ Deleted folder for problem '{ problem } ' at { problem_folder } " )
230+ except OSError as e :
231+ LOG .warning (f"⚠️ Failed to delete folder '{ problem_folder } ': { e } " )
232+ else :
233+ LOG .warning (f"⚠️ Folder '{ problem_folder } ' not found, skipping deletion." )
234+
235+ remove_problem_from_json (course , assignment , problem , problems_data )
236+ LOG .info (f"📘 problems.json entry removed: Courses/{ course } /{ assignment } /{ problem } " )
237+
238+ except Exception as e :
239+ LOG .error (f"❌ Failed to remove problem '{ problem } ': { e } " )
240+
241+
242+ def load_problems_json () -> Dict [str , List [ProblemEntry ]]:
243+ if os .path .exists (PROBLEMS_JSON_PATH ):
244+ with open (PROBLEMS_JSON_PATH , "r" , encoding = "utf-8" ) as f :
245+ data = json .load (f )
246+ return {"problems" : [ProblemEntry (** p ) for p in data .get ("problems" , [])]}
247+ return {"problems" : []}
248+
249+
250+ def save_problems_json (data : Dict [str , List [ProblemEntry ]]):
251+ with open (PROBLEMS_JSON_PATH , "w" , encoding = "utf-8" ) as f :
252+ json .dump ({"problems" : [p ._asdict () for p in data ["problems" ]]}, f , indent = 2 , ensure_ascii = False )
253+
254+
255+ def add_problem_to_json (course : str , assignment : str , problem : str ,
256+ problems_data : Dict [str , List [ProblemEntry ]]):
257+ path = f"Courses/{ course } /{ assignment } /{ problem } "
258+ if not any (p .path == path for p in problems_data ["problems" ]):
259+ problems_data ["problems" ].append (ProblemEntry (path = path ))
260+ LOG .info (f"📝 Added '{ path } ' to problems.json" )
261+
262+
263+ def remove_problem_from_json (course : str , assignment : str , problem : str ,
264+ problems_data : Dict [str , List [ProblemEntry ]]):
265+ path = f"Courses/{ course } /{ assignment } /{ problem } "
266+ before = len (problems_data ["problems" ])
267+ problems_data ["problems" ] = [p for p in problems_data ["problems" ] if p .path != path ]
268+ after = len (problems_data ["problems" ])
269+ if before != after :
270+ LOG .info (f"🗑️ Removed '{ path } ' from problems.json" )
271+
272+
273+ def main ():
274+ api_token , base_url , input_path = handle_input ()
275+ client = omegaup .api .Client (api_token = api_token , url = base_url )
276+
277+ if not os .path .exists (input_path ):
278+ LOG .error (f"❌ JSON file not found: { input_path } " )
279+ return
280+
281+ with open (input_path , "r" , encoding = "utf-8" ) as f :
282+ data = json .load (f )
283+
284+ problems_data = load_problems_json ()
285+ process_add (data , problems_data , client , base_url )
286+ process_remove (data , problems_data , client )
287+ save_problems_json (problems_data )
288+
289+ try :
290+ with open (input_path , "w" , encoding = "utf-8" ) as f :
291+ json .dump ({"add_problem" : [], "remove_problem" : []}, f , indent = 2 , ensure_ascii = False )
292+ LOG .info (f"🧹 Cleared 'add_problem' and 'remove_problem' arrays in { input_path } " )
293+ except (IOError , json .JSONDecodeError ) as e :
294+ LOG .error (f"❌ Failed to reset { input_path } : { e } " )
295+
296+
297+ if __name__ == "__main__" :
298+ main ()
0 commit comments