-import sys
-import py_common.utils
 import re
 import json
 import shutil
 import zipfile
 import fnmatch
-import urllib3
 import requests
-import unicodedata
 import contextlib
 import urllib
-from urllib.request import urlopen
-import imghdr
 from PIL import Image
 
 import os
 from os import listdir
-from os.path import isfile, join
 
-from bs4 import BeautifulSoup
 from unidecode import unidecode
 
 from py_common.Logger import Logger
-from py_common.Production import Production
+import py7zr
 
 ###########################
 ### GLOBAL VAR AND CONS ###

@@ -115,139 +107,124 @@ def fetch_prod_name(prod, suffix, filepath):
     return path
 
 
-def build(prod: Production, entrypath: str, desired_extentions: list):
+
+def build(prod, entrypath: str, desired_extensions: list):
     '''
-    given a prod "Production" object containing
-    all production's data, create a proper named folder, fetches all files (screenshot + rom)
-    and properly organize everything
+    Given a prod "Production" object containing
+    all production's data, create a properly named folder, fetch all files (screenshot + ROM),
+    and organize everything.
     '''
-    if not os.path.exists(entrypath + prod.slug):
-        #############
-        # PROD FILE #
-        #############
-        # make its own folder
-        os.mkdir(entrypath + prod.slug, 0o777)
-
-        # figuring out the suffix
-        suffix = str.lower(prod.url.split(".")[-1])
-        if suffix not in desired_extentions:
-            suffix = "gb"
-
-        # building the filepath
-        filepath = entrypath + prod.slug + "/"
-
-        # download the file
-        # in case of http
-        if prod.url.startswith("http"):
-            try:
-                r = requests.get(prod.url, allow_redirects=True,
-                                 timeout=None, verify=False, headers=headers)
-                if r.status_code != 200:
-                    logger.write("[ERR]:", str(r.status_code) +
-                                 ": " + prod.slug + " - " + prod.url)
-
-                    # cleaning in case of error
-                    shutil.rmtree(entrypath + prod.slug)
-                    return 1
-            except ConnectionError as e:
-                logger.write("[ERR]:", str(r.status_code) +
-                             ": " + prod.slug + " - " + prod.url)
-                logger.write("[ERR]:", "REASON: " + e)
-
-                # cleaning in case of error
-                shutil.rmtree(entrypath + prod.slug)
-                return 1
-            open(filepath + prod.slug + "." + suffix, 'wb').write(r.content)
-        else:
-            with contextlib.closing(urllib.request.urlopen(prod.url)) as r:
-                with open(filepath + prod.slug + "." + suffix, 'wb') as f:
-                    shutil.copyfileobj(r, f)
-
-        # unzip in case of zip
-        if prod.url.endswith(".zip") or prod.url.endswith(".ZIP"):
-            # download and unzip
-            try:
-                with zipfile.ZipFile(filepath + prod.slug + "." + suffix, "r") as zip_ref:
-                    zip_ref.extractall(filepath + "unzippedfolder")
+    # Create folder if not already present
+    target_folder = os.path.join(entrypath, prod.slug)
+    if not os.path.exists(target_folder):
+        os.mkdir(target_folder, 0o777)
 
-                # manage all extensions, and it doesn't matter if they have uppercase or lowercase
-                path = []  # eventually the file
+        # Extract file extension
+        suffix = prod.url.split(".")[-1].lower()
+
+        if suffix not in desired_extensions and suffix not in ["zip", "7z", "mp4"]:
+            print(f"ERROR: {prod.slug} extension is not in {desired_extensions}")
+            suffix = "gb"  # Fallback extension
 
-                extentions = fix_extentions(desired_extentions)
-                for extension in extentions:
-                    path = fetch_prod_name(prod, extension, filepath)
-                    if path != []:
-                        break
+        # Build the file path
+        filepath = os.path.join(target_folder, f"{prod.slug}.{suffix}")
 
-                # proper renaming and moving the file
-                if path != []:
-                    os.rename(path[0], filepath + prod.slug +
-                              "." + extension.lower())
+        # Download the file
+        try:
+            if prod.url.startswith("http"):
+                r = requests.get(prod.url, allow_redirects=True, timeout=None, verify=False)
+                if r.status_code != 200:
+                    raise Exception(f"HTTP Error {r.status_code}")
+                with open(filepath, 'wb') as f:
+                    f.write(r.content)
+            else:
+                with contextlib.closing(urllib.request.urlopen(prod.url)) as r:
+                    with open(filepath, 'wb') as f:
+                        shutil.copyfileobj(r, f)
+        except Exception as e:
+            logger.write("[ERR]:", f"Error downloading {prod.slug}: {e}")
+            shutil.rmtree(target_folder)
+            return 1
+
+        # Unzip and handle files
+        if suffix in ["zip", "7z"]:
+            unzipped_path = os.path.join(target_folder, "unzippedfolder")
+            os.makedirs(unzipped_path, exist_ok=True)
 
-                    # update production object file
-                    prod.files.append(prod.slug + "." + extension.lower())
-                else:
-                    logger.write(
-                        "[WARN]", prod.title + " extension is not a " + prod.platform + " file.")
-                    shutil.rmtree(entrypath + prod.slug)
-                    return 1
-
-                # cleaning up unneeded files
-                shutil.rmtree(filepath + "unzippedfolder")
-                if CLEANZIP:
-                    os.remove(filepath + prod.slug + "." + "zip")
-            except zipfile.BadZipFile as e:
-                logger.write("[ERR] ", str(e) + " bad zip file")
-                shutil.rmtree(entrypath + prod.slug)
+            try:
+                if suffix == "zip":
+                    with zipfile.ZipFile(filepath, "r") as zip_ref:
+                        zip_ref.extractall(unzipped_path)
+                elif suffix == "7z":
+                    with py7zr.SevenZipFile(filepath, mode='r') as z:
+                        z.extractall(unzipped_path)
+            except Exception as e:
+                logger.write("[ERR]:", f"Failed to extract {suffix} file: {e}")
+                shutil.rmtree(target_folder)
                 return 1
-        else:
-            # it is a proper gb file -> just write the filename in its own structure field
-            pass
-
-        # download the screenshot
-        if prod.screenshots != None and prod.screenshots != [] and prod.screenshots[0] != "None":
-            r = requests.get(
-                prod.screenshots[0], allow_redirects=True, timeout=None)
-
-            # figuring out what kind of screenshots I am dealing with
-            screen_file_path = filepath + prod.slug + "."
-
-            # screenshot fileext
-            screen_ext = prod.screenshots[0].split(".")[-1]
-            logger.write("[INFO]", " The screenshot is in " +
-                         screen_ext + " format")
 
-            if screen_ext.lower() == "png":
-                screen_file_path += "png"
-            else:
-                screen_file_path += screen_ext
-
-            open(screen_file_path, 'wb').write(r.content)
+            # Search for desired extensions in the extracted folder
+            valid_file_found = False
+
+            # Recursively search all files under the unzipped path
+            for root, _, files in os.walk(unzipped_path):
+                for file in files:
+                    ext = file.split(".")[-1].lower()
+                    if ext in desired_extensions:
+                        extracted_file = os.path.join(root, file)
+                        final_file = os.path.join(target_folder, f"{prod.slug}.{ext}")
+
+                        # Move the valid file to the target folder
+                        shutil.move(extracted_file, final_file)
+                        prod.files.append(f"{prod.slug}.{ext}")
+
+                        valid_file_found = True
+                        break
+
+                if valid_file_found:
+                    break
 
-            if screen_ext != "png":
-                im = Image.open(screen_file_path).convert("RGB")
-                im.save(filepath + prod.slug + ".png", "png")
+            if not valid_file_found:
+                logger.write("[WARN]:", f"No valid files with extensions {desired_extensions} found.")
+                shutil.rmtree(target_folder)
+                return 1
 
-                logger.write(
-                    "[INFO]", " Screenshot has been converted into a PNG file.")
-                logger.write("[INFO]", " Removing screenshot " +
-                             screen_ext + " file...")
+            # Clean up unzipped files and original archive
+            shutil.rmtree(unzipped_path)
+            if CLEANZIP:
+                os.remove(filepath)
+        else:
+            prod.files.append(f"{prod.slug}.{suffix}")
 
-                os.remove(screen_file_path)
+        # Handle screenshots
+        if prod.screenshots and prod.screenshots[0] != "None":
+            try:
+                r = requests.get(prod.screenshots[0], allow_redirects=True, timeout=None)
+                screen_ext = prod.screenshots[0].split(".")[-1].lower()
+                screen_file = os.path.join(target_folder, f"{prod.slug}.{screen_ext}")
+                with open(screen_file, 'wb') as f:
+                    f.write(r.content)
+
+                # Convert to PNG if necessary
+                if screen_ext != "png":
+                    img = Image.open(screen_file).convert("RGB")
+                    png_file = os.path.join(target_folder, f"{prod.slug}.png")
+                    img.save(png_file, "PNG")
+                    os.remove(screen_file)
+                    prod.screenshots[0] = f"{prod.slug}.png"
+                else:
+                    prod.screenshots[0] = f"{prod.slug}.png"
+            except Exception as e:
+                logger.write("[ERR]:", f"Failed to download screenshot for {prod.slug}: {e}")
+                prod.screenshots = []
 
-            open(filepath + prod.slug + "." + "png", 'wb').write(r.content)
-            prod.screenshots[0] = prod.slug + "." + "png"
-        else:
-            prod.screenshots = []
-            logger.write(
-                "[INFO]", "Screenshot not present for this production")
     else:
-        logger.write(
-            "[WARN]", "directory already present. Skipping " + prod.slug + "...")
+        logger.write("[WARN]:", f"Directory already exists for {prod.slug}. Skipping...")
         return 1
     return 0
 
 
+
 def fix_extentions(desired_extentions):
     '''
     given a theorical list of extensions, it returns a list containing additional correct extensions (like CGB, AGB)
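Side note on the extension handling introduced in the new build() above: the snippet below is a minimal, standalone sketch of that fallback rule, not code from this commit. The list values and the pick_suffix helper are hypothetical and only illustrate how an unrecognized URL suffix defaults to "gb" while archive suffixes stay marked for later extraction.

# Standalone sketch of the suffix-fallback rule in build(); the names below are
# hypothetical and not part of the repository.
DESIRED_EXTENSIONS = ["gb", "gbc"]           # assumed platform ROM extensions
ARCHIVE_EXTENSIONS = ["zip", "7z", "mp4"]    # handled separately by build()

def pick_suffix(url: str) -> str:
    # Mirror build(): lowercase the last dot-separated token, default to "gb".
    suffix = url.split(".")[-1].lower()
    if suffix not in DESIRED_EXTENSIONS and suffix not in ARCHIVE_EXTENSIONS:
        return "gb"
    return suffix

assert pick_suffix("https://example.org/demo.ZIP") == "zip"   # archive, extracted later
assert pick_suffix("https://example.org/demo.bin") == "gb"    # unknown -> fallback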