|
2 | 2 | import datetime |
3 | 3 | import os |
4 | 4 | import re |
5 | | -import shutil |
6 | | -from collections.abc import ValuesView |
7 | | -from os.path import dirname, isfile |
8 | 5 | from pprint import pprint |
9 | | -from typing import List, Optional, Sequence, Tuple |
| 6 | +from typing import Sequence |
10 | 7 |
|
11 | 8 | REQUIREMENT_FILES = [ |
12 | 9 | "requirements/base.txt", |
13 | 10 | "requirements/extra.txt", |
14 | 11 | "requirements/examples.txt", |
15 | 12 | ] |
16 | 13 |
|
17 | | - |
18 | | -def _retrieve_files(directory: str, *ext: str) -> List[str]: |
19 | | - all_files = [] |
20 | | - for root, _, files in os.walk(directory): |
21 | | - for fname in files: |
22 | | - if not ext or any(os.path.split(fname)[1].lower().endswith(e) for e in ext): |
23 | | - all_files.append(os.path.join(root, fname)) |
24 | | - |
25 | | - return all_files |
26 | | - |
27 | | - |
28 | | -def _replace_standalone_imports(lines: List[str], mapping: List[Tuple[str, str]], lightning_by: str = "") -> List[str]: |
29 | | - """Replace imports of standalone package to lightning. |
30 | | -
|
31 | | - Adapted from `assistant._replace_imports` |
32 | | - """ |
33 | | - out = lines[:] |
34 | | - for source_import, target_import in mapping: |
35 | | - for i, ln in enumerate(out): |
36 | | - out[i] = re.sub( |
37 | | - rf"([^_/@]|^){source_import}([^_\w/]|$)", |
38 | | - rf"\1{target_import}\2", |
39 | | - ln, |
40 | | - ) |
41 | | - if lightning_by: # in addition, replace base package |
42 | | - out[i] = out[i].replace("from lightning import ", f"from {lightning_by} import ") |
43 | | - out[i] = out[i].replace("import lightning ", f"import {lightning_by} ") |
44 | | - return out |
45 | | - |
46 | | - |
47 | | -def _replace_unified_imports(lines: List[str], mapping: List[Tuple[str, str]], lightning_by: str = "") -> List[str]: |
48 | | - """Replace imports of standalone package to unified lightning. |
49 | | -
|
50 | | - Adapted from `assistant._replace_imports` |
51 | | - """ |
52 | | - out = lines[:] |
53 | | - for source_import, target_import in mapping: |
54 | | - for i, ln in enumerate(out): |
55 | | - out[i] = re.sub( |
56 | | - rf"([^_/@]|^){source_import}([^_\w/]|$)", |
57 | | - rf"\1{target_import}\2", |
58 | | - ln, |
59 | | - ) |
60 | | - if lightning_by: # in addition, replace base package |
61 | | - out[i] = out[i].replace("from lightning import ", f"from {lightning_by} import ") |
62 | | - out[i] = out[i].replace("import lightning ", f"import {lightning_by} ") |
63 | | - return out |
64 | | - |
65 | | - |
66 | | -def copy_replace_imports( |
67 | | - source_dir: str, |
68 | | - source_imports: Sequence[str], |
69 | | - target_imports: Sequence[str], |
70 | | - target_dir: Optional[str] = None, |
71 | | - lightning_by: str = "", |
72 | | -) -> None: |
73 | | - """Replace package content with import adjustments. |
74 | | -
|
75 | | - Adapted from `assistant.copy_replace_imports` |
76 | | - """ |
77 | | - print(f"Replacing imports: {locals()}") |
78 | | - assert len(source_imports) == len(target_imports), ( |
79 | | - "source and target imports must have the same length, " |
80 | | - f"source: {len(source_imports)}, target: {len(target_imports)}" |
81 | | - ) |
82 | | - if target_dir is None: |
83 | | - target_dir = source_dir |
84 | | - |
85 | | - ls = _retrieve_files(source_dir) |
86 | | - for fp in ls: |
87 | | - fp_new = fp.replace(source_dir, target_dir) |
88 | | - _, ext = os.path.splitext(fp) |
89 | | - if ext in (".png", ".jpg", ".ico"): |
90 | | - os.makedirs(dirname(fp_new), exist_ok=True) |
91 | | - if not isfile(fp_new): |
92 | | - shutil.copy(fp, fp_new) |
93 | | - continue |
94 | | - elif ext in (".pyc",): |
95 | | - continue |
96 | | - # Try to parse everything else |
97 | | - with open(fp, encoding="utf-8") as fo: |
98 | | - try: |
99 | | - lines = fo.readlines() |
100 | | - except UnicodeDecodeError: |
101 | | - # a binary file, skip |
102 | | - print(f"Skipped replacing imports for {fp}") |
103 | | - continue |
104 | | - lines = _replace_standalone_imports(lines, list(zip(source_imports, target_imports)), lightning_by=lightning_by) |
105 | | - os.makedirs(os.path.dirname(fp_new), exist_ok=True) |
106 | | - with open(fp_new, "w", encoding="utf-8") as fo: |
107 | | - fo.writelines(lines) |
108 | | - |
109 | | - |
110 | | -def use_standalone_pl(mapping, src_dirs: ValuesView) -> None: |
111 | | - for in_place_path in src_dirs: |
112 | | - for _, _ in mapping.items(): |
113 | | - copy_replace_imports( |
114 | | - source_dir=str(in_place_path), |
115 | | - source_imports=mapping.keys(), |
116 | | - target_imports=mapping.values(), |
117 | | - target_dir=str(in_place_path), |
118 | | - ) |
119 | | - |
120 | | - |
121 | 14 | class AssistantCLI: |
122 | | - |
123 | 15 | _PATH_ROOT = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) |
124 | 16 |
|
125 | 17 | @staticmethod |
@@ -171,24 +63,6 @@ def prepare_nightly_version(proj_root: str = _PATH_ROOT) -> None: |
171 | 63 | with open(path_info, "w") as fp: |
172 | 64 | fp.write(init) |
173 | 65 |
|
174 | | - @staticmethod |
175 | | - def copy_replace_imports( |
176 | | - source_dir: str, |
177 | | - source_import: str, |
178 | | - target_import: str, |
179 | | - target_dir: Optional[str] = None, |
180 | | - lightning_by: str = "", |
181 | | - ) -> None: |
182 | | - """Copy package content in-place with import adjustments. |
183 | | -
|
184 | | - Adapated from Lightning version of `assistant`. |
185 | | - """ |
186 | | - source_imports = source_import.strip().split(",") |
187 | | - target_imports = target_import.strip().split(",") |
188 | | - copy_replace_imports( |
189 | | - source_dir, source_imports, target_imports, target_dir=target_dir, lightning_by=lightning_by |
190 | | - ) |
191 | | - |
192 | 66 |
|
193 | 67 | if __name__ == "__main__": |
194 | 68 | import jsonargparse |
|
0 commit comments