 import dataclasses
 import itertools
+from collections import defaultdict
 from collections.abc import Callable
 from concurrent.futures import ThreadPoolExecutor
 from functools import partial

 from shared.bundle_analysis import StoragePaths
 from shared.django_apps.compare.models import CommitComparison
 from shared.django_apps.core.models import Commit, Pull
+from shared.django_apps.profiling.models import ProfilingUpload
 from shared.django_apps.reports.models import CommitReport, ReportDetails
 from shared.django_apps.reports.models import ReportSession as Upload
+from shared.django_apps.staticanalysis.models import StaticAnalysisSingleFileSnapshot

 from services.archive import ArchiveService, MinioEndpoints
 from services.cleanup.utils import CleanupContext, CleanupResult

 DELETE_FILES_BATCHSIZE = 50


-def cleanup_files_batched(context: CleanupContext, paths: list[str]) -> int:
+def cleanup_files_batched(
+    context: CleanupContext, buckets_paths: dict[str, list[str]]
+) -> int:
     cleaned_files = 0

     # TODO: maybe reuse the executor across calls?
     with ThreadPoolExecutor() as e:
-        for batched_paths in itertools.batched(paths, DELETE_FILES_BATCHSIZE):
-            e.submit(context.storage.delete_files, context.bucket, list(batched_paths))
-
-            cleaned_files += len(batched_paths)
+        for bucket, paths in buckets_paths.items():
+            for batched_paths in itertools.batched(paths, DELETE_FILES_BATCHSIZE):
+                e.submit(context.storage.delete_files, bucket, list(batched_paths))
+            cleaned_files += len(paths)

     return cleaned_files

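For reference, a minimal sketch of how the reworked helper is meant to be called: paths are grouped per bucket before handing them over, using the `CleanupContext` bucket fields referenced elsewhere in this diff (`default_bucket`, `bundleanalysis_bucket`). The concrete storage paths below are placeholders for illustration only.

    # Hypothetical call site; `context` is a populated CleanupContext and the
    # paths are made-up placeholders, not real archive layouts.
    buckets_paths = {
        context.default_bucket: ["some/coverage/chunks/path"],
        context.bundleanalysis_bucket: ["some/bundle_report/path"],
    }
    deleted = cleanup_files_batched(context, buckets_paths)
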
@@ -54,7 +59,9 @@ def cleanup_with_storage_field(
         if len(storage_paths) == 0:
             break

-        cleaned_files += cleanup_files_batched(context, storage_paths)
+        cleaned_files += cleanup_files_batched(
+            context, {context.default_bucket: storage_paths}
+        )
         cleaned_models += query.filter(
             id__in=storage_query[:MANUAL_QUERY_CHUNKSIZE]
         )._raw_delete(query.db)
@@ -98,7 +105,7 @@ def cleanup_commitreport(context: CleanupContext, query: QuerySet) -> CleanupRes
         if len(reports) == 0:
             break

-        storage_paths: list[str] = []
+        buckets_paths: dict[str, list[str]] = defaultdict(list)
         for (
             report_type,
             report_code,
@@ -118,36 +125,63 @@ def cleanup_commitreport(context: CleanupContext, query: QuerySet) -> CleanupRes
             # depending on the `report_type`, we have:
             # - a `chunks` file for coverage
             # - a `bundle_report.sqlite` for BA
-            match report_type:
-                case "bundle_analysis":
-                    path = StoragePaths.bundle_report.path(
-                        repo_key=repo_hash, report_key=external_id
-                    )
-                    # TODO: bundle analysis lives in a different bucket I believe?
-                    storage_paths.append(path)
-                case "test_results":
-                    # TODO:
-                    pass
-                case _:  # coverage
-                    chunks_file_name = (
-                        report_code if report_code is not None else "chunks"
-                    )
-                    path = MinioEndpoints.chunks.get_path(
-                        version="v4",
-                        repo_hash=repo_hash,
-                        commitid=commit_sha,
-                        chunks_file_name=chunks_file_name,
-                    )
-                    storage_paths.append(path)
-
-        cleaned_files += cleanup_files_batched(context, storage_paths)
+            if report_type == "bundle_analysis":
+                path = StoragePaths.bundle_report.path(
+                    repo_key=repo_hash, report_key=external_id
+                )
+                buckets_paths[context.bundleanalysis_bucket].append(path)
+            elif report_type == "test_results":
+                # TA has cached rollups, but those are based on `Branch`
+                pass
+            else:
+                chunks_file_name = report_code if report_code is not None else "chunks"
+                path = MinioEndpoints.chunks.get_path(
+                    version="v4",
+                    repo_hash=repo_hash,
+                    commitid=commit_sha,
+                    chunks_file_name=chunks_file_name,
+                )
+                buckets_paths[context.default_bucket].append(path)
+
+        cleaned_files += cleanup_files_batched(context, buckets_paths)
         cleaned_models += query.filter(
             id__in=query.order_by("id")[:MANUAL_QUERY_CHUNKSIZE]
         )._raw_delete(query.db)

     return CleanupResult(cleaned_models, cleaned_files)


+def cleanup_upload(context: CleanupContext, query: QuerySet) -> CleanupResult:
+    cleaned_files = 0
+
+    # delete `None` `storage_path`s right away
+    cleaned_models = query.filter(storage_path__isnull=True)._raw_delete(query.db)
+
+    # delete all those files from storage, using chunks based on the `id` column
+    storage_query = query.filter(storage_path__isnull=False).order_by("id")
+
+    while True:
+        uploads = storage_query.values_list("report__report_type", "storage_path")[
+            :MANUAL_QUERY_CHUNKSIZE
+        ]
+        if len(uploads) == 0:
+            break
+
+        buckets_paths: dict[str, list[str]] = defaultdict(list)
+        for report_type, storage_path in uploads:
+            if report_type == "bundle_analysis":
+                buckets_paths[context.bundleanalysis_bucket].append(storage_path)
+            else:
+                buckets_paths[context.default_bucket].append(storage_path)
+
+        cleaned_files += cleanup_files_batched(context, buckets_paths)
+        cleaned_models += query.filter(
+            id__in=storage_query[:MANUAL_QUERY_CHUNKSIZE]
+        )._raw_delete(query.db)
+
+    return CleanupResult(cleaned_models, cleaned_files)
+
+
 # All the models that need custom python code for deletions so a bulk `DELETE` query does not work.
 MANUAL_CLEANUP: dict[
     type[Model], Callable[[CleanupContext, QuerySet], CleanupResult]
@@ -156,9 +190,10 @@ def cleanup_commitreport(context: CleanupContext, query: QuerySet) -> CleanupRes
     Pull: partial(cleanup_archivefield, "flare"),
     ReportDetails: partial(cleanup_archivefield, "files_array"),
     CommitReport: cleanup_commitreport,
-    Upload: partial(cleanup_with_storage_field, "storage_path"),
+    Upload: cleanup_upload,
     CommitComparison: partial(cleanup_with_storage_field, "report_storage_path"),
-    # TODO: figure out any other models which have files in storage that are not `ArchiveField`
-    # TODO: TA is also storing files in GCS
-    # TODO: profiling, label analysis and static analysis also needs porting to django
+    ProfilingUpload: partial(cleanup_with_storage_field, "raw_upload_location"),
+    StaticAnalysisSingleFileSnapshot: partial(
+        cleanup_with_storage_field, "content_location"
+    ),
 }
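
The diff does not show how `MANUAL_CLEANUP` is consumed, so the following is only a sketch of one plausible dispatch; the `run_cleanup` name and the bulk-delete fallback are assumptions rather than part of this change.

    def run_cleanup(context: CleanupContext, query: QuerySet) -> CleanupResult:
        # Assumed dispatch: prefer the model-specific cleanup when one is
        # registered, otherwise fall back to a plain bulk delete that does
        # not touch file storage.
        cleanup_fn = MANUAL_CLEANUP.get(query.model)
        if cleanup_fn is not None:
            return cleanup_fn(context, query)
        return CleanupResult(query._raw_delete(query.db), 0)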