Skip to content

Commit 2e57169

Browse files
authored
Writing queue for streaming (#1385)
* Create writing queue to write project extracts
1 parent 904ba5a commit 2e57169

File tree

3 files changed

+35
-25
lines changed

3 files changed

+35
-25
lines changed

migration/migration.py

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
"""
2424
import sys
2525
import os
26-
from threading import Lock
26+
from threading import Thread, Lock
27+
from queue import Queue
2728

2829
from cli import options
2930
from sonar import exceptions, errcodes, utilities, version
@@ -104,27 +105,25 @@ def __remove_chars_at_end(file: str, nb_bytes: int) -> None:
104105
fd.truncate()
105106

106107

107-
def __add_project_header(file: str) -> None:
108-
"""Writes the configuration in file"""
109-
with open(file, mode="a", encoding="utf-8") as fd:
110-
print(',\n "projects": {\n', file=fd)
111-
112-
113-
def __add_project_footer(file: str) -> None:
114-
"""Closes projects section"""
115-
__remove_chars_at_end(file, 2)
116-
with open(file, mode="a", encoding="utf-8") as fd:
117-
print("\n }\n}", file=fd)
118-
119-
120-
def write_project(project_json: dict[str, any], file: str) -> None:
108+
def write_projects(queue: Queue, file: str) -> None:
121109
"""
122-
writes a project JSON in a file
110+
Thread to write projects in the JSON file
123111
"""
124-
key = project_json.pop("key")
125-
with _WRITE_LOCK:
126-
with utilities.open_file(file, mode="a") as fd:
127-
print(f'"{key}": {utilities.json_dump(project_json)},', file=fd)
112+
done = False
113+
prefix = ""
114+
with utilities.open_file(file, mode="a") as fd:
115+
print('" projects": {', file=fd)
116+
while not done:
117+
project_json = queue.get()
118+
done = project_json is None
119+
if not done:
120+
log.info("Writing project '%s'", project_json["key"])
121+
key = project_json.pop("key")
122+
print(f'{prefix}"{key}": {utilities.json_dump(project_json)}', end="", file=fd)
123+
prefix = ",\n"
124+
queue.task_done()
125+
print("\n}", file=fd, end="")
126+
log.info("Writing projects complete")
128127

129128

130129
def __write_export(config: dict[str, str], file: str) -> None:
@@ -179,12 +178,20 @@ def __export_config(endpoint: platform.Platform, what: list[str], **kwargs) -> N
179178
# sq_settings = utilities.inline_lists(sq_settings, exceptions=("conditions",))
180179

181180
log.info("Exporting project migration data streaming projects in '%s'", kwargs[options.REPORT_FILE])
182-
export_settings["WRITE_CALLBACK"] = write_project
183181
__remove_chars_at_end(kwargs[options.REPORT_FILE], 3)
184-
__add_project_header(kwargs[options.REPORT_FILE])
182+
with utilities.open_file(kwargs[options.REPORT_FILE], mode="a") as fd:
183+
print(",", file=fd)
184+
q = Queue(maxsize=0)
185+
worker = Thread(target=write_projects, args=(q, kwargs[options.REPORT_FILE]))
186+
worker.setDaemon(True)
187+
worker.setName("WriteThread")
188+
worker.start()
189+
export_settings["WRITE_QUEUE"] = q
185190
projects.export(endpoint, export_settings=export_settings, key_list=key_list)
186-
__add_project_footer(kwargs[options.REPORT_FILE])
191+
q.join()
187192
log.info("Exporting migration data from %s completed", kwargs["url"])
193+
with utilities.open_file(kwargs[options.REPORT_FILE], mode="a") as fd:
194+
print("\n}", file=fd)
188195

189196

190197
def main() -> None:

sonar/projects.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1478,8 +1478,8 @@ def __export_thread(queue: Queue[Project], results: dict[str, str], export_setti
14781478
while not queue.empty():
14791479
project = queue.get()
14801480
exp_json = project.export(export_settings=export_settings)
1481-
if export_settings.get("WRITE_CALLBACK", None):
1482-
export_settings["WRITE_CALLBACK"](exp_json, export_settings["file"])
1481+
if export_settings.get("WRITE_QUEUE", None):
1482+
export_settings["WRITE_QUEUE"].put(exp_json)
14831483
else:
14841484
results[project.key] = exp_json
14851485
results[project.key].pop("key", None)
@@ -1489,6 +1489,7 @@ def __export_thread(queue: Queue[Project], results: dict[str, str], export_setti
14891489
if nb % 10 == 0 or nb == tot:
14901490
log.info("%d/%d projects exported (%d%%)", nb, tot, (nb * 100) // tot)
14911491
queue.task_done()
1492+
log.info("Putting DONE in queue %s", str(export_settings["WRITE_QUEUE"]))
14921493

14931494

14941495
def export(endpoint: pf.Platform, export_settings: types.ConfigSettings, key_list: types.KeyList = None) -> types.ObjectJsonRepr:
@@ -1518,6 +1519,7 @@ def export(endpoint: pf.Platform, export_settings: types.ConfigSettings, key_lis
15181519
worker.setName(f"ProjectExport{i}")
15191520
worker.start()
15201521
q.join()
1522+
export_settings["WRITE_QUEUE"].put(None)
15211523
return dict(sorted(project_settings.items()))
15221524

15231525

test/test_sonarcloud.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
MY_ORG_1 = "okorach"
3939
MY_ORG_2 = "okorach-github"
4040

41+
4142
def test_sc_config_export() -> None:
4243
"""test_sc_config_export"""
4344
util.clean(util.JSON_FILE)

0 commit comments

Comments
 (0)