Skip to content

Commit 05d3d4b

Browse files
authored
Stream write of all objects (#1387)
* Every object export is sent to write queue * Write thread for everything * Don't reallocate queue between objects export * Remove deprecated setter on Thread * Pass write Q as parameter of export * Reduce log level * Fix double import
1 parent 2e57169 commit 05d3d4b

File tree

10 files changed

+135
-88
lines changed

10 files changed

+135
-88
lines changed

migration/migration.py

Lines changed: 46 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -98,50 +98,44 @@ def __parse_args(desc):
9898
return args
9999

100100

101-
def __remove_chars_at_end(file: str, nb_bytes: int) -> None:
102-
"""Writes the configuration in file"""
103-
with open(file, mode="rb+") as fd:
104-
fd.seek(-nb_bytes, os.SEEK_END)
105-
fd.truncate()
106-
107-
108-
def write_projects(queue: Queue, file: str) -> None:
101+
def write_objects(queue: Queue, fd, object_type: str) -> None:
109102
"""
110103
Thread to write projects in the JSON file
111104
"""
112105
done = False
113106
prefix = ""
114-
with utilities.open_file(file, mode="a") as fd:
115-
print('" projects": {', file=fd)
116-
while not done:
117-
project_json = queue.get()
118-
done = project_json is None
119-
if not done:
120-
log.info("Writing project '%s'", project_json["key"])
121-
key = project_json.pop("key")
122-
print(f'{prefix}"{key}": {utilities.json_dump(project_json)}', end="", file=fd)
123-
prefix = ",\n"
124-
queue.task_done()
125-
print("\n}", file=fd, end="")
126-
log.info("Writing projects complete")
127-
128-
129-
def __write_export(config: dict[str, str], file: str) -> None:
130-
"""Writes the configuration in file"""
131-
with utilities.open_file(file) as fd:
132-
print(utilities.json_dump(config), file=fd)
107+
log.info("Waiting %s to write...", object_type)
108+
print(f'"{object_type}": ' + "{", file=fd)
109+
while not done:
110+
obj_json = queue.get()
111+
done = obj_json is None
112+
if not done:
113+
if object_type in ("projects", "applications", "portfolios", "users"):
114+
if object_type == "users":
115+
key = obj_json.pop("login", None)
116+
else:
117+
key = obj_json.pop("key", None)
118+
log.debug("Writing %s key '%s'", object_type[:-1], key)
119+
print(f'{prefix}"{key}": {utilities.json_dump(obj_json)}', end="", file=fd)
120+
else:
121+
log.debug("Writing %s", object_type)
122+
print(f"{prefix}{utilities.json_dump(obj_json)[2:-1]}", end="", file=fd)
123+
prefix = ",\n"
124+
queue.task_done()
125+
print("\n}", file=fd, end="")
126+
log.info("Writing %s complete", object_type)
133127

134128

135129
def __export_config(endpoint: platform.Platform, what: list[str], **kwargs) -> None:
136130
"""Exports a platform configuration in a JSON file"""
131+
file = kwargs[options.REPORT_FILE]
137132
export_settings = {
138133
"INLINE_LISTS": False,
139134
"EXPORT_DEFAULTS": True,
140135
# "FULL_EXPORT": kwargs["fullExport"],
141136
"FULL_EXPORT": False,
142137
"MODE": "MIGRATION",
143138
"THREADS": kwargs[options.NBR_THREADS],
144-
options.REPORT_FILE: kwargs[options.REPORT_FILE],
145139
"SKIP_ISSUES": kwargs["skipIssues"],
146140
}
147141
if "projects" in what and kwargs[options.KEYS]:
@@ -154,7 +148,7 @@ def __export_config(endpoint: platform.Platform, what: list[str], **kwargs) -> N
154148
options.WHAT_RULES: [__JSON_KEY_RULES, rules.export],
155149
options.WHAT_PROFILES: [__JSON_KEY_PROFILES, qualityprofiles.export],
156150
options.WHAT_GATES: [__JSON_KEY_GATES, qualitygates.export],
157-
# options.WHAT_PROJECTS: [__JSON_KEY_PROJECTS, projects.export],
151+
options.WHAT_PROJECTS: [__JSON_KEY_PROJECTS, projects.export],
158152
options.WHAT_APPS: [__JSON_KEY_APPS, applications.export],
159153
options.WHAT_PORTFOLIOS: [__JSON_KEY_PORTFOLIOS, portfolios.export],
160154
options.WHAT_USERS: [__JSON_KEY_USERS, users.export],
@@ -164,34 +158,31 @@ def __export_config(endpoint: platform.Platform, what: list[str], **kwargs) -> N
164158
log.info("Exporting configuration from %s", kwargs[options.URL])
165159
key_list = kwargs[options.KEYS]
166160
sq_settings = {__JSON_KEY_PLATFORM: endpoint.basics()}
167-
for what_item, call_data in calls.items():
168-
if what_item not in what:
169-
continue
170-
ndx, func = call_data
171-
try:
172-
sq_settings[ndx] = func(endpoint, export_settings=export_settings, key_list=key_list)
173-
__write_export(sq_settings, kwargs[options.REPORT_FILE])
174-
except exceptions.UnsupportedOperation as e:
175-
log.warning(e.message)
176-
sq_settings = utilities.remove_empties(sq_settings)
177-
# if not kwargs.get("dontInlineLists", False):
178-
# sq_settings = utilities.inline_lists(sq_settings, exceptions=("conditions",))
179-
180-
log.info("Exporting project migration data streaming projects in '%s'", kwargs[options.REPORT_FILE])
181-
__remove_chars_at_end(kwargs[options.REPORT_FILE], 3)
182-
with utilities.open_file(kwargs[options.REPORT_FILE], mode="a") as fd:
183-
print(",", file=fd)
161+
is_first = True
184162
q = Queue(maxsize=0)
185-
worker = Thread(target=write_projects, args=(q, kwargs[options.REPORT_FILE]))
186-
worker.setDaemon(True)
187-
worker.setName("WriteThread")
188-
worker.start()
189-
export_settings["WRITE_QUEUE"] = q
190-
projects.export(endpoint, export_settings=export_settings, key_list=key_list)
191-
q.join()
192-
log.info("Exporting migration data from %s completed", kwargs["url"])
193-
with utilities.open_file(kwargs[options.REPORT_FILE], mode="a") as fd:
163+
with utilities.open_file(file, mode="w") as fd:
164+
print("{", file=fd)
165+
for what_item, call_data in calls.items():
166+
if what_item not in what:
167+
continue
168+
ndx, func = call_data
169+
try:
170+
if not is_first:
171+
print(",", file=fd)
172+
is_first = False
173+
worker = Thread(target=write_objects, args=(q, fd, ndx))
174+
worker.daemon = True
175+
worker.name = f"Write{ndx[:1].upper()}{ndx[1:10]}"
176+
worker.start()
177+
sq_settings[ndx] = func(endpoint, export_settings=export_settings, key_list=key_list, write_q=q)
178+
q.join()
179+
except exceptions.UnsupportedOperation as e:
180+
log.warning(e.message)
181+
sq_settings = utilities.remove_empties(sq_settings)
182+
# if not kwargs.get("dontInlineLists", False):
183+
# sq_settings = utilities.inline_lists(sq_settings, exceptions=("conditions",))
194184
print("\n}", file=fd)
185+
log.info("Exporting migration data from %s completed", kwargs["url"])
195186

196187

197188
def main() -> None:

sonar/applications.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#
2020

2121
from __future__ import annotations
22+
from queue import Queue
2223
from typing import Union
2324

2425
import json
@@ -498,7 +499,9 @@ def exists(endpoint: pf.Platform, key: str) -> bool:
498499
return False
499500

500501

501-
def export(endpoint: pf.Platform, export_settings: types.ConfigSettings, key_list: types.KeyList = None) -> types.ObjectJsonRepr:
502+
def export(
503+
endpoint: pf.Platform, export_settings: types.ConfigSettings, key_list: types.KeyList = None, write_q: Queue = None
504+
) -> types.ObjectJsonRepr:
502505
"""Exports applications as JSON
503506
504507
:param Platform endpoint: Reference to the Sonar platform
@@ -511,11 +514,17 @@ def export(endpoint: pf.Platform, export_settings: types.ConfigSettings, key_lis
511514
# log.info("Applications do not exist in SonarCloud, export skipped")
512515
raise exceptions.UnsupportedOperation("Applications do not exist in SonarCloud, export skipped")
513516

514-
apps_settings = {k: app.export(export_settings) for k, app in get_list(endpoint, key_list).items()}
515-
for k in apps_settings:
516-
# remove key from JSON value, it's already the dict key
517-
apps_settings[k].pop("key")
518-
return dict(sorted(apps_settings.items()))
517+
apps_settings = {}
518+
for k, app in sorted(get_list(endpoint, key_list).items()):
519+
app_json = app.export(export_settings)
520+
if write_q:
521+
write_q.put(app_json)
522+
else:
523+
app_json.pop("key")
524+
apps_settings[k] = app_json
525+
if write_q:
526+
write_q.put(None)
527+
return apps_settings
519528

520529

521530
def audit(endpoint: pf.Platform, audit_settings: types.ConfigSettings, key_list: types.KeyList = None) -> list[problem.Problem]:

sonar/groups.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#
2020

2121
from __future__ import annotations
22+
from queue import Queue
2223
from typing import Optional
2324
import sonar.logging as log
2425
import sonar.platform as pf
@@ -256,7 +257,9 @@ def get_list(endpoint: pf.Platform) -> dict[str, Group]:
256257
return search(endpoint)
257258

258259

259-
def export(endpoint: pf.Platform, export_settings: types.ConfigSettings, key_list: types.KeyList = None) -> types.ObjectJsonRepr:
260+
def export(
261+
endpoint: pf.Platform, export_settings: types.ConfigSettings, key_list: Optional[types.KeyList] = None, write_q: Optional[Queue] = None
262+
) -> types.ObjectJsonRepr:
260263
"""Exports groups representation in JSON
261264
262265
:param Platform endpoint: reference to the SonarQube platform
@@ -272,6 +275,9 @@ def export(endpoint: pf.Platform, export_settings: types.ConfigSettings, key_lis
272275
if not export_settings["FULL_EXPORT"] and g_obj.is_default():
273276
continue
274277
g_list[g_name] = "" if g_obj.description is None else g_obj.description
278+
if write_q:
279+
write_q.put(g_list)
280+
write_q.put(None)
275281
return g_list
276282

277283

sonar/platform.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from http import HTTPStatus
2828
import sys
2929
import os
30+
from queue import Queue
3031
from typing import Optional
3132
import time
3233
import datetime
@@ -869,7 +870,9 @@ def convert_for_yaml(original_json: types.ObjectJsonRepr) -> types.ObjectJsonRep
869870
return original_json
870871

871872

872-
def export(endpoint: Platform, export_settings: types.ConfigSettings, key_list: types.KeyList = None) -> types.ObjectJsonRepr:
873+
def export(
874+
endpoint: Platform, export_settings: types.ConfigSettings, key_list: Optional[types.KeyList] = None, write_q: Optional[Queue] = None
875+
) -> types.ObjectJsonRepr:
873876
"""Exports all or a list of projects configuration as dict
874877
875878
:param Platform endpoint: reference to the SonarQube platform
@@ -878,4 +881,8 @@ def export(endpoint: Platform, export_settings: types.ConfigSettings, key_list:
878881
:return: Platform settings
879882
:rtype: ObjectJsonRepr
880883
"""
881-
return endpoint.export(export_settings)
884+
exp = endpoint.export(export_settings)
885+
if write_q:
886+
write_q.put(exp)
887+
write_q.put(None)
888+
return exp

sonar/portfolios.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
"""
2525

2626
from __future__ import annotations
27+
from queue import Queue
2728
from typing import Union, Optional
28-
import time
2929
import json
3030
import datetime
3131
from http import HTTPStatus
@@ -728,7 +728,9 @@ def search_by_key(endpoint: pf.Platform, key: str) -> types.ApiPayload:
728728
return util.search_by_key(endpoint, key, Portfolio.SEARCH_API, "components")
729729

730730

731-
def export(endpoint: pf.Platform, export_settings: types.ConfigSettings, key_list: types.KeyList = None) -> types.ObjectJsonRepr:
731+
def export(
732+
endpoint: pf.Platform, export_settings: types.ConfigSettings, key_list: Optional[types.KeyList] = None, write_q: Optional[Queue] = None
733+
) -> types.ObjectJsonRepr:
732734
"""Exports portfolios as JSON
733735
734736
:param Platform endpoint: Reference to the SonarQube platform
@@ -749,8 +751,12 @@ def export(endpoint: pf.Platform, export_settings: types.ConfigSettings, key_lis
749751
for k, p in sorted(get_list(endpoint=endpoint, key_list=key_list).items()):
750752
try:
751753
if not p.is_sub_portfolio:
752-
exported_portfolios[k] = p.export(export_settings)
753-
exported_portfolios[k].pop("key")
754+
exp = p.export(export_settings)
755+
if write_q:
756+
write_q.put(exp)
757+
else:
758+
exp.pop("key")
759+
exported_portfolios[k] = exp
754760
else:
755761
log.debug("Skipping export of %s, it's a standard sub-portfolio", str(p))
756762
except HTTPError as e:
@@ -760,6 +766,8 @@ def export(endpoint: pf.Platform, export_settings: types.ConfigSettings, key_lis
760766
i += 1
761767
if i % 10 == 0 or i == nb_portfolios:
762768
log.info("Exported %d/%d portfolios (%d%%)", i, nb_portfolios, (i * 100) // nb_portfolios)
769+
if write_q:
770+
write_q.put(None)
763771
return exported_portfolios
764772

765773

sonar/projects.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1059,7 +1059,7 @@ def export(self, export_settings: types.ConfigSettings, settings_list: dict[str,
10591059
except Exception as e:
10601060
log.critical("Connecting error %s while exporting %s, export of this project interrupted", str(self), str(e))
10611061
json_data["error"] = f"Exception {str(e)} while exporting project, export interrupted"
1062-
log.info("Exporting %s done", str(self))
1062+
log.debug("Exporting %s done", str(self))
10631063
return util.remove_nones(json_data)
10641064

10651065
def new_code(self) -> str:
@@ -1473,13 +1473,13 @@ def audit(endpoint: pf.Platform, audit_settings: types.ConfigSettings, key_list:
14731473
return problems
14741474

14751475

1476-
def __export_thread(queue: Queue[Project], results: dict[str, str], export_settings: types.ConfigSettings) -> None:
1476+
def __export_thread(queue: Queue[Project], results: dict[str, str], export_settings: types.ConfigSettings, write_q: Optional[Queue] = None) -> None:
14771477
"""Project export callback function for multitheaded export"""
14781478
while not queue.empty():
14791479
project = queue.get()
14801480
exp_json = project.export(export_settings=export_settings)
1481-
if export_settings.get("WRITE_QUEUE", None):
1482-
export_settings["WRITE_QUEUE"].put(exp_json)
1481+
if write_q:
1482+
write_q.put(exp_json)
14831483
else:
14841484
results[project.key] = exp_json
14851485
results[project.key].pop("key", None)
@@ -1489,10 +1489,11 @@ def __export_thread(queue: Queue[Project], results: dict[str, str], export_setti
14891489
if nb % 10 == 0 or nb == tot:
14901490
log.info("%d/%d projects exported (%d%%)", nb, tot, (nb * 100) // tot)
14911491
queue.task_done()
1492-
log.info("Putting DONE in queue %s", str(export_settings["WRITE_QUEUE"]))
14931492

14941493

1495-
def export(endpoint: pf.Platform, export_settings: types.ConfigSettings, key_list: types.KeyList = None) -> types.ObjectJsonRepr:
1494+
def export(
1495+
endpoint: pf.Platform, export_settings: types.ConfigSettings, key_list: Optional[types.KeyList] = None, write_q: Optional[Queue] = None
1496+
) -> types.ObjectJsonRepr:
14961497
"""Exports all or a list of projects configuration as dict
14971498
14981499
:param Platform endpoint: reference to the SonarQube platform
@@ -1514,12 +1515,13 @@ def export(endpoint: pf.Platform, export_settings: types.ConfigSettings, key_lis
15141515
project_settings = {}
15151516
for i in range(export_settings.get("THREADS", 8)):
15161517
log.debug("Starting project export thread %d", i)
1517-
worker = Thread(target=__export_thread, args=(q, project_settings, export_settings))
1518-
worker.setDaemon(True)
1519-
worker.setName(f"ProjectExport{i}")
1518+
worker = Thread(target=__export_thread, args=(q, project_settings, export_settings, write_q))
1519+
worker.daemon = True
1520+
worker.name = f"ProjectExport{i}"
15201521
worker.start()
15211522
q.join()
1522-
export_settings["WRITE_QUEUE"].put(None)
1523+
if write_q:
1524+
write_q.put(None)
15231525
return dict(sorted(project_settings.items()))
15241526

15251527

sonar/qualitygates.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
"""
2525

2626
from __future__ import annotations
27-
28-
from typing import Union
27+
from queue import Queue
28+
from typing import Union, Optional
2929

3030
from http import HTTPStatus
3131
import json
@@ -380,7 +380,9 @@ def get_list(endpoint: pf.Platform) -> dict[str, QualityGate]:
380380
return qg_list
381381

382382

383-
def export(endpoint: pf.Platform, export_settings: types.ConfigSettings, key_list: types.KeyList = None) -> types.ObjectJsonRepr:
383+
def export(
384+
endpoint: pf.Platform, export_settings: types.ConfigSettings, key_list: Optional[types.KeyList] = None, write_q: Optional[Queue] = None
385+
) -> types.ObjectJsonRepr:
384386
"""Exports quality gates as JSON
385387
386388
:param Platform endpoint: Reference to the Sonar platform
@@ -390,7 +392,11 @@ def export(endpoint: pf.Platform, export_settings: types.ConfigSettings, key_lis
390392
:rtype: ObjectJsonRepr
391393
"""
392394
log.info("Exporting quality gates")
393-
return {k: qg.to_json(export_settings) for k, qg in sorted(get_list(endpoint).items())}
395+
qg_list = {k: qg.to_json(export_settings) for k, qg in sorted(get_list(endpoint).items())}
396+
if write_q:
397+
write_q.put(qg_list)
398+
write_q.put(None)
399+
return qg_list
394400

395401

396402
def import_config(endpoint: pf.Platform, config_data: types.ObjectJsonRepr, key_list: types.KeyList = None) -> bool:

0 commit comments

Comments
 (0)