diff --git a/cli/config.py b/cli/config.py index ba6499c4..2524fb3d 100644 --- a/cli/config.py +++ b/cli/config.py @@ -22,7 +22,7 @@ Exports SonarQube platform configuration as JSON """ -from typing import TextIO +from typing import TextIO, Any from threading import Thread from queue import Queue @@ -122,16 +122,20 @@ def __parse_args(desc: str) -> object: return options.parse_and_check(parser=parser, logger_name=TOOL_NAME) -def __normalize_json(json_data: dict[str, any], remove_empty: bool = True, remove_none: bool = True) -> dict[str, any]: +def __normalize_json(json_data: dict[str, Any], remove_empty: bool = True, remove_none: bool = True) -> dict[str, any]: """Sorts a JSON file and optionally remove empty and none values""" log.info("Normalizing JSON - remove empty = %s, remove nones = %s", str(remove_empty), str(remove_none)) - if remove_empty: - json_data = utilities.remove_empties(json_data) - if remove_none: - json_data = utilities.remove_nones(json_data) + json_data = utilities.clean_data(json_data, remove_empty, remove_none) json_data = utilities.order_keys(json_data, *_SECTIONS_ORDER) for key in [k for k in _SECTIONS_TO_SORT if k in json_data]: - json_data[key] = {k: json_data[key][k] for k in sorted(json_data[key])} + if isinstance(json_data[key], (list, tuple, set)): + if len(json_data[key]) > 0: + sort_field = next((k for k in ("key", "name", "login") if k in json_data[key][0]), None) + if sort_field: + tmp_d = {v[sort_field]: v for v in json_data[key]} + json_data[key] = list(dict(sorted(tmp_d.items())).values()) + else: + json_data[key] = {k: json_data[key][k] for k in sorted(json_data[key])} return json_data @@ -171,8 +175,16 @@ def write_objects(queue: Queue[types.ObjectJsonRepr], fd: TextIO, object_type: s """ done = False prefix = "" + objects_exported_as_lists = ("projects", "applications", "users", "portfolios") + objects_exported_as_whole = ("qualityGates", "groups") log.info("Waiting %s to write...", object_type) - print(f'"{object_type}": ' + "{", file=fd) + if object_type in objects_exported_as_lists: + start, stop = ("[", "]") + elif object_type in objects_exported_as_whole: + start, stop = ("", "") + else: + start, stop = ("{", "}") + print(f'"{object_type}": ' + start, file=fd) while not done: obj_json = queue.get() if not (done := obj_json is utilities.WRITE_END): @@ -180,19 +192,20 @@ def write_objects(queue: Queue[types.ObjectJsonRepr], fd: TextIO, object_type: s obj_json = __prep_json_for_write(obj_json, {**export_settings, EXPORT_EMPTY: True}) else: obj_json = __prep_json_for_write(obj_json, export_settings) - if object_type in ("projects", "applications", "portfolios", "users"): - if object_type == "users": - key = obj_json.pop("login", None) - else: - key = obj_json.pop("key", None) - log.debug("Writing %s key '%s'", object_type[:-1], key) + key = "" if isinstance(obj_json, list) else obj_json.get("key", obj_json.get("login", obj_json.get("name", "unknown"))) + log.debug("Writing %s key '%s'", object_type[:-1], key) + if object_type in objects_exported_as_lists: + print(f"{prefix}{utilities.json_dump(obj_json)}", end="", file=fd) + elif object_type in objects_exported_as_whole: + print(f"{prefix}{utilities.json_dump(obj_json)}", end="", file=fd) + elif object_type in ("applications", "portfolios", "users"): print(f'{prefix}"{key}": {utilities.json_dump(obj_json)}', end="", file=fd) else: log.debug("Writing %s", object_type) print(f"{prefix}{utilities.json_dump(obj_json)[2:-1]}", end="", file=fd) prefix = ",\n" queue.task_done() - print("\n}", file=fd, end="") + print("\n" + stop, file=fd, end="") log.info("Writing %s complete", object_type) @@ -257,10 +270,7 @@ def __prep_json_for_write(json_data: types.ObjectJsonRepr, export_settings: type if export_settings.get("MODE", "CONFIG") == "MIGRATION": return json_data if not export_settings.get("FULL_EXPORT", False): - json_data = utilities.remove_nones(json_data) - if not export_settings.get(EXPORT_EMPTY, False): - log.debug("Removing empties") - json_data = utilities.remove_empties(json_data) + json_data = utilities.clean_data(json_data, remove_empty=not export_settings.get(EXPORT_EMPTY, False), remove_none=True) if export_settings.get("INLINE_LISTS", True): json_data = utilities.inline_lists(json_data, exceptions=("conditions",)) return json_data diff --git a/conf/build.sh b/conf/build.sh index 6f106553..817e5fa9 100755 --- a/conf/build.sh +++ b/conf/build.sh @@ -21,6 +21,7 @@ CONF_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" build_docs=0 build_docker=0 +offline=0 . "${CONF_DIR}/env.sh" @@ -32,6 +33,9 @@ while [[ $# -ne 0 ]]; do docker) build_docker=1 ;; + offline) + offline=1 + ;; *) ;; esac @@ -41,9 +45,18 @@ done echo "======= FORMATTING CODE =========" ruff format echo "======= BUILDING PACKAGE =========" -rm -rf "${ROOT_DIR}/build/lib/sonar" "${ROOT_DIR}/build/lib/cli" "${ROOT_DIR}"/build/scripts*/sonar-tools "${ROOT_DIR}"/dist/sonar_tools* -# python -m build -poetry build +if [[ "${offline}" = "1" ]]; then + cp "${ROOT_DIR}/conf/offline/setup.py" "${ROOT_DIR}/" + cp "${ROOT_DIR}/conf/offline/sonar-tools" "${ROOT_DIR}/" + mv "${ROOT_DIR}/pyproject.toml" "${ROOT_DIR}/pyproject.toml.bak" + python setup.py bdist_wheel + mv "${ROOT_DIR}/pyproject.toml.bak" "${ROOT_DIR}/pyproject.toml" + rm "${ROOT_DIR}/setup.py" "${ROOT_DIR}/sonar-tools" + # python -m build +else + rm -rf "${ROOT_DIR}/build/lib/sonar" "${ROOT_DIR}/build/lib/cli" "${ROOT_DIR}"/build/scripts*/sonar-tools "${ROOT_DIR}"/dist/sonar_tools* + poetry build +fi if [[ "${build_docs}" = "1" ]]; then echo "======= BUILDING DOCS =========" diff --git a/conf/offline/setup.py b/conf/offline/setup.py new file mode 100644 index 00000000..5999febd --- /dev/null +++ b/conf/offline/setup.py @@ -0,0 +1,84 @@ +# +# sonar-tools +# Copyright (C) 2019-2025 Olivier Korach +# mailto:olivier.korach AT gmail DOT com +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 3 of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# + +""" + +Package setup + +""" + +import setuptools +from sonar import version + + +with open("README.md", "r", encoding="utf-8") as fh: + long_description = fh.read() +setuptools.setup( + name="sonar-tools", + version=version.PACKAGE_VERSION, + scripts=["sonar-tools"], + author="Olivier Korach", + author_email="olivier.korach@gmail.com", + description="A collection of utility scripts for SonarQube Server or Cloud", + long_description=long_description, + long_description_content_type="text/markdown", + url="https://github.com/okorach/sonar-tools", + project_urls={ + "Bug Tracker": "https://github.com/okorach/sonar-tools/issues", + "Documentation": "https://github.com/okorach/sonar-tools/README.md", + "Source Code": "https://github.com/okorach/sonar-tools", + }, + packages=setuptools.find_packages(), + package_data={"sonar": ["LICENSE", "audit/rules.json", "config.json", "audit/sonar-audit.properties"]}, + install_requires=[ + "argparse", + "datetime", + "python-dateutil", + "requests", + "jprops", + "levenshtein", + "PyYAML ", + ], + classifiers=[ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: GNU Lesser General Public License v3 or later (LGPLv3+)", + "Operating System :: OS Independent", + ], + entry_points={ + "console_scripts": [ + "sonar-audit = cli.audit:main", + "sonar-projects-export = cli.projects_export:main", + "sonar-projects-import = cli.projects_import:main", + "sonar-projects = cli.projects_cli:main", + "sonar-measures-export = cli.measures_export:main", + "sonar-housekeeper = cli.housekeeper:main", + "sonar-issues-sync = cli.findings_sync:main", + "sonar-findings-sync = cli.findings_sync:main", + "sonar-custom-measures = cli.cust_measures:main", + "sonar-issues-export = cli.findings_export:main", + "sonar-findings-export = cli.findings_export:main", + "sonar-loc = cli.loc:main", + "sonar-config = cli.config:main", + "support-audit = cli.support:main", + "sonar-rules = cli.rules_cli:main", + ] + }, + python_requires=">=3.8", +) diff --git a/conf/offline/sonar-tools b/conf/offline/sonar-tools new file mode 100755 index 00000000..57a5fa70 --- /dev/null +++ b/conf/offline/sonar-tools @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 +# +# sonar-tools +# Copyright (C) 2019-2025 Olivier Korach +# mailto:olivier.korach AT gmail DOT com +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 3 of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# + +"""Main entry point for sonar-tools""" + +from sonar import version + +print(f''' +sonar-tools version {version.PACKAGE_VERSION} +(c) Olivier Korach 2019-2025 +Collections of utilities for SonarQube Server and Cloud: +- sonar-audit: Audits a SonarQube Server or Cloud platform for bad practices, performance, configuration problems +- sonar-housekeeper: Deletes projects that have not been analyzed since a given number of days +- sonar-loc: Produces a list of projects with their LoC count as computed by SonarQube Server or Cloud + commercial licenses (ie taking the largest branch or PR) +- sonar-measures-export: Exports measures/metrics of one, several or all projects of the platform in CSV or JSON + (Can also export measures history) +- sonar-findings-export: Exports findings (potentially filtered) from the platform in CSV or JSON + (also available as sonar-issues-export for backward compatibility, but deprecated) +- sonar-findings-sync: Synchronizes issues between 2 branches of a same project, a whole project + branches of 2 different projects (potentially on different platforms). + (also available as sonar-issues-sync for backward compatibility, but deprecated) +- sonar-projects: Exports / Imports projects to/from zip file (Import works for EE and higher) +- sonar-config: Exports and imports an entire (or subsets of a) SonarQube Server or Cloud platform configuration as code (JSON) +- sonar-rules: Exports Sonar rules +See tools built-in -h help and https://github.com/okorach/sonar-tools for more documentation +''') diff --git a/sonar/app_branches.py b/sonar/app_branches.py index fd40b149..264eb58f 100644 --- a/sonar/app_branches.py +++ b/sonar/app_branches.py @@ -171,10 +171,13 @@ def export(self) -> types.ObjectJsonRepr: :param full: Whether to do a full export including settings that can't be set, defaults to False :type full: bool, optional """ - log.info("Exporting %s from %s", self, self.sq_json) - jsondata = {"projects": {b["key"]: b["branch"] if b["selected"] else utilities.DEFAULT for b in self.sq_json["projects"]}} + log.info("Exporting %s from %s", self, utilities.json_dump(self.sq_json)) + jsondata = {"name": self.name} if self.is_main(): jsondata["isMain"] = True + br_projects = [b for b in self.sq_json["projects"] if b.get("selected", True)] + br_projects = [{"key": b["key"], "branch": None if b["isMain"] else b["branch"]} for b in br_projects] + jsondata["projects"] = utilities.remove_nones(br_projects) return jsondata def update(self, name: str, project_branches: list[Branch]) -> bool: diff --git a/sonar/applications.py b/sonar/applications.py index 4aaef0c4..735c4d18 100644 --- a/sonar/applications.py +++ b/sonar/applications.py @@ -339,12 +339,13 @@ def export(self, export_settings: types.ConfigSettings) -> types.ObjectJsonRepr: "description": None if self._description == "" else self._description, "visibility": self.visibility(), # 'projects': self.projects(), - "branches": {br.name: br.export() for br in self.branches().values()}, + "branches": [br.export() for br in self.branches().values()], "permissions": self.permissions().export(export_settings=export_settings), "tags": self.get_tags(), } ) - return util.filter_export(json_data, _IMPORTABLE_PROPERTIES, export_settings.get("FULL_EXPORT", False)) + json_data = util.filter_export(json_data, _IMPORTABLE_PROPERTIES, export_settings.get("FULL_EXPORT", False)) + return util.clean_data(json_data) def set_permissions(self, data: types.JsonPermissions) -> application_permissions.ApplicationPermissions: """Sets an application permissions @@ -525,11 +526,9 @@ def export(endpoint: pf.Platform, export_settings: types.ConfigSettings, **kwarg app_json = app.export(export_settings) if write_q: write_q.put(app_json) - else: - app_json.pop("key") - apps_settings[k] = app_json + apps_settings[k] = app_json write_q and write_q.put(util.WRITE_END) - return apps_settings + return dict(sorted(app_json.items())).values() def audit(endpoint: pf.Platform, audit_settings: types.ConfigSettings, **kwargs) -> list[problem.Problem]: @@ -602,12 +601,4 @@ def search_by_name(endpoint: pf.Platform, name: str) -> dict[str, Application]: def convert_for_yaml(original_json: types.ObjectJsonRepr) -> types.ObjectJsonRepr: """Convert the original JSON defined for JSON export into a JSON format more adapted for YAML export""" - new_json = util.dict_to_list(util.remove_nones(original_json), "key") - for app_json in new_json: - app_json["branches"] = util.dict_to_list(app_json["branches"], "name") - for b in app_json["branches"]: - if "projects" in b: - b["projects"] = [{"key": k, "branch": br} for k, br in b["projects"].items()] - if "permissions" in app_json: - app_json["permissions"] = permissions.convert_for_yaml(app_json["permissions"]) - return new_json + return original_json diff --git a/sonar/devops.py b/sonar/devops.py index 1db7e129..04434bc4 100644 --- a/sonar/devops.py +++ b/sonar/devops.py @@ -239,12 +239,8 @@ def export(endpoint: platform.Platform, export_settings: types.ConfigSettings) - :meta private: """ log.info("Exporting DevOps integration settings") - json_data = {} - for s in get_list(endpoint).values(): - export_data = s.to_json(export_settings) - key = export_data.pop("key") - json_data[key] = export_data - return json_data + devops_list = {s.key: s.to_json(export_settings) for s in get_list(endpoint).values()} + return list(dict(sorted(devops_list.items())).values()) def import_config(endpoint: platform.Platform, config_data: types.ObjectJsonRepr, key_list: types.KeyList = None) -> int: diff --git a/sonar/groups.py b/sonar/groups.py index c9586847..20eca69d 100644 --- a/sonar/groups.py +++ b/sonar/groups.py @@ -345,11 +345,12 @@ def export(endpoint: pf.Platform, export_settings: types.ConfigSettings, **kwarg """ log.info("Exporting groups") - g_list = {} + g_list = [] for g_name, g_obj in get_list(endpoint=endpoint).items(): if not export_settings.get("FULL_EXPORT", False) and g_obj.is_default(): continue - g_list[g_name] = "" if g_obj.description is None else g_obj.description + g_list.append({"name": g_name, "description": g_obj.description}) + util.clean_data(g_list, remove_empty=False) log.info("%s groups to export", len(g_list)) if write_q := kwargs.get("write_q", None): write_q.put(g_list) diff --git a/sonar/permissions/permission_templates.py b/sonar/permissions/permission_templates.py index 687ae63c..738d8ff1 100644 --- a/sonar/permissions/permission_templates.py +++ b/sonar/permissions/permission_templates.py @@ -42,7 +42,7 @@ _CREATE_API = "permissions/create_template" _UPDATE_API = "permissions/update_template" -_IMPORTABLE_PROPERTIES = ("name", "description", "pattern", "permissions", "defaultFor") +_IMPORTABLE_PROPERTIES = ("name", "pattern", "defaultFor", "description", "permissions") class PermissionTemplate(sqobject.SqObject): @@ -159,16 +159,13 @@ def set_pattern(self, pattern: str) -> PermissionTemplate: def to_json(self, export_settings: types.ConfigSettings = None) -> types.ObjectJsonRepr: """Returns JSON representation of a permission template""" - json_data = self.sq_json.copy() - json_data.update( - { - "key": self.key, - "name": self.name, - "description": self.description if self.description != "" else None, - "pattern": self.project_key_pattern, - "permissions": self.permissions().export(export_settings=export_settings), - } - ) + json_data = self.sq_json.copy() | { + "key": self.key, + "name": self.name, + "description": self.description if self.description != "" else None, + "pattern": self.project_key_pattern, + "permissions": self.permissions().export(export_settings=export_settings), + } defaults = [] if self.is_projects_default(): @@ -265,14 +262,9 @@ def _load_default_templates(endpoint: pf.Platform, data: types.ApiPayload = None def export(endpoint: pf.Platform, export_settings: types.ConfigSettings) -> types.ObjectJsonRepr: """Exports permission templates as JSON""" log.info("Exporting permission templates") - pt_list = get_list(endpoint) - json_data = {} - for pt in pt_list.values(): - json_data[pt.name] = pt.to_json(export_settings) - if not export_settings.get("FULL_EXPORT"): - for k in ("name", "id", "key"): - json_data[pt.name].pop(k, None) - return json_data + json_data = {pt.name: pt.to_json(export_settings) for pt in get_list(endpoint).values()} + log.info("PT RES = %s", utilities.json_dump(json_data)) + return list(dict(sorted(json_data.items())).values()) def import_config(endpoint: pf.Platform, config_data: types.ObjectJsonRepr) -> int: diff --git a/sonar/permissions/permissions.py b/sonar/permissions/permissions.py index 7aa1a58d..7ae609f8 100644 --- a/sonar/permissions/permissions.py +++ b/sonar/permissions/permissions.py @@ -61,8 +61,8 @@ _PORTFOLIOS = 6 OBJECTS_WITH_PERMISSIONS = (_GLOBAL, _PROJECTS, _TEMPLATES, _QG, _QP, _APPS, _PORTFOLIOS) -PERMISSION_TYPES = ("users", "groups") -NO_PERMISSIONS = {"users": None, "groups": None} +PERMISSION_TYPES = ("groups", "users") +NO_PERMISSIONS = {"groups": None, "users": None} MAX_PERMS = 100 @@ -81,15 +81,18 @@ def __init__(self, concerned_object: object) -> None: def __str__(self) -> str: return f"permissions of {str(self.concerned_object)}" - def to_json(self, perm_type: str | None = None, csv: bool = False) -> types.JsonPermissions: + def to_json(self, perm_type: Optional[str] = None, csv: bool = False) -> types.JsonPermissions: """Converts a permission object to JSON""" if not csv: return self.permissions.get(perm_type, {}) if is_valid(perm_type) else self.permissions - perms = {} + perms = [] for p in normalize(perm_type): if p not in self.permissions or len(self.permissions[p]) == 0: continue - perms[p] = {k: encode(v) for k, v in self.permissions.get(p, {}).items()} + for k, v in self.permissions.get(p, {}).items(): + if not v or len(v) == 0: + continue + perms += [{p[:-1]: k, "permissions": encode(v)}] return perms if len(perms) > 0 else None def export(self, export_settings: types.ConfigSettings) -> types.ObjectJsonRepr: @@ -434,8 +437,4 @@ def black_list(perms: types.JsonPermissions, disallowed_perms: list[str]) -> typ def convert_for_yaml(json_perms: types.ObjectJsonRepr) -> types.ObjectJsonRepr: """Converts permissions in a format that is more friendly for YAML""" - converted_perms = [] - for ptype in "groups", "users": - if ptype in json_perms: - converted_perms += utilities.dict_to_list(json_perms[ptype], ptype[:-1], "permissions") - return converted_perms + return json_perms diff --git a/sonar/platform.py b/sonar/platform.py index c1c8004b..2c925a07 100644 --- a/sonar/platform.py +++ b/sonar/platform.py @@ -384,12 +384,12 @@ def database(self) -> str: return self.sys_info()["Statistics"]["database"]["name"] return self.sys_info()["Database"]["Database"] - def plugins(self) -> dict[str, str]: + def plugins(self) -> list[dict[str, str]]: """ :return: the SonarQube platform plugins """ if self.is_sonarcloud(): - return {} + return [] sysinfo = self.sys_info() if "Application Nodes" in sysinfo: sysinfo = sysinfo["Application Nodes"][0] @@ -926,23 +926,6 @@ def _check_for_retry(response: requests.models.Response) -> tuple[bool, str]: def convert_for_yaml(original_json: types.ObjectJsonRepr) -> types.ObjectJsonRepr: """Convert the original JSON defined for JSON export into a JSON format more adapted for YAML export""" - original_json = util.remove_nones(original_json) - if "plugins" in original_json: - original_json["plugins"] = util.dict_to_list(original_json["plugins"], "key", "version") - if "languages" in original_json: - original_json["languages"] = util.dict_to_list(original_json["languages"], "language") - if "permissions" in original_json: - original_json["permissions"] = permissions.convert_for_yaml(original_json["permissions"]) - if "permissionTemplates" in original_json: - for tpl in original_json["permissionTemplates"].values(): - if "permissions" in tpl: - tpl["permissions"] = permissions.convert_for_yaml(tpl["permissions"]) - original_json["permissionTemplates"] = util.dict_to_list(original_json["permissionTemplates"], "name") - if "devopsIntegration" in original_json: - original_json["devopsIntegration"] = util.dict_to_list(original_json["devopsIntegration"], "name") - for key in ("analysisScope", "authentication", "generalSettings", "linters"): - if key in original_json: - original_json[key] = util.dict_to_list(original_json[key], "key", "value") return original_json diff --git a/sonar/portfolios.py b/sonar/portfolios.py index 467bdfbd..361f4042 100644 --- a/sonar/portfolios.py +++ b/sonar/portfolios.py @@ -72,13 +72,16 @@ "key", "name", "description", + "mode", + "projects", + "regexp", + "tags", + "applications", + "portfolios", "visibility", "permissions", - "projects", "projectsList", - "portfolios", "subPortfolios", - "applications", ) @@ -199,20 +202,22 @@ def load_selection_mode(self) -> None: mode = self.sq_json.get(_API_SELECTION_MODE_FIELD, None) if mode is None: return - branch = self.sq_json.get("branch", c.DEFAULT_BRANCH) + branch = self.sq_json.get("branch", None) if mode == _SELECTION_MODE_MANUAL: - self._selection_mode = {mode: {}} + self._selection_mode = {"mode": _SELECTION_MODE_MANUAL, "projects": []} for projdata in self.sq_json.get("selectedProjects", {}): - branch_list = projdata.get("selectedBranches", [c.DEFAULT_BRANCH]) - self._selection_mode[mode].update({projdata["projectKey"]: set(branch_list)}) + if branch_list := projdata.get("selectedBranches", None): + self._selection_mode["projects"].append({"key": projdata["projectKey"], "branches": list(set(branch_list))}) + else: + self._selection_mode["projects"].append({"key": projdata["projectKey"]}) elif mode == _SELECTION_MODE_REGEXP: - self._selection_mode = {mode: self.sq_json["regexp"], "branch": branch} + self._selection_mode = util.clean_data({"mode": mode, "regexp": self.sq_json["regexp"], "branch": branch}) elif mode == _SELECTION_MODE_TAGS: - self._selection_mode = {mode: self.sq_json["tags"], "branch": branch} + self._selection_mode = util.clean_data({"mode": mode, "tags": self.sq_json["tags"], "branch": branch}) elif mode == _SELECTION_MODE_REST: - self._selection_mode = {mode: True, "branch": branch} + self._selection_mode = util.clean_data({"mode": mode, "branch": branch}) else: - self._selection_mode = {mode: True} + self._selection_mode = {"mode": mode} def refresh(self) -> None: """Refreshes a portfolio data from the Sonar instance""" @@ -371,24 +376,18 @@ def to_json(self, export_settings: types.ConfigSettings) -> types.ObjectJsonRepr json_data["permissions"] = self.permissions().export(export_settings=export_settings) json_data["tags"] = self._tags if subportfolios: - json_data["portfolios"] = {} - for s in subportfolios.values(): - subp_json = s.to_json(export_settings) - subp_key = subp_json.pop("key") - json_data["portfolios"][subp_key] = subp_json - mode = self.selection_mode().copy() - if mode: - if "none" not in mode or export_settings.get("MODE", "") == "MIGRATION": - json_data["projects"] = mode + json_data["portfolios"] = [s.to_json(export_settings) for s in subportfolios.values()] + if mode := self.selection_mode(): + json_data.update(mode) + json_data["mode"] = json_data["mode"].lower() if export_settings.get("MODE", "") == "MIGRATION": json_data["projects"]["keys"] = self.get_project_list() - json_data["applications"] = self._applications + json_data["applications"] = [{"key": k, "branches": v} for k, v in self._applications.items()] return json_data def export(self, export_settings: types.ConfigSettings) -> types.ObjectJsonRepr: """Exports a portfolio (for sonar-config)""" - log.info("Exporting %s", str(self)) - return util.remove_nones(util.filter_export(self.to_json(export_settings), _IMPORTABLE_PROPERTIES, export_settings.get("FULL_EXPORT", False))) + return util.clean_data(util.filter_export(self.to_json(export_settings), _IMPORTABLE_PROPERTIES, export_settings.get("FULL_EXPORT", False))) def permissions(self) -> pperms.PortfolioPermissions: """Returns a portfolio permissions (if toplevel) or None if sub-portfolio""" @@ -782,25 +781,24 @@ def export(endpoint: pf.Platform, export_settings: types.ConfigSettings, **kwarg portfolio_list = {k: v for k, v in get_list(endpoint=endpoint).items() if not key_regexp or re.match(key_regexp, k)} nb_portfolios = len(portfolio_list) i = 0 - exported_portfolios = {} - for k, p in portfolio_list.items(): + exported_portfolios = [] + for p in dict(sorted(portfolio_list.items())).values(): try: if not p.is_sub_portfolio(): exp = p.export(export_settings) if write_q: write_q.put(exp) else: - exp.pop("key") - exported_portfolios[k] = exp + exported_portfolios.append(exp) else: log.debug("Skipping export of %s, it's a standard sub-portfolio", str(p)) except exceptions.SonarException: - exported_portfolios[k] = {} + pass i += 1 if i % 10 == 0 or i == nb_portfolios: log.info("Exported %d/%d portfolios (%d%%)", i, nb_portfolios, (i * 100) // nb_portfolios) write_q and write_q.put(util.WRITE_END) - return dict(sorted(exported_portfolios.items())) + return exported_portfolios def recompute(endpoint: pf.Platform) -> None: @@ -849,19 +847,7 @@ def get_api_branch(branch: str) -> str: def convert_for_yaml(original_json: types.ObjectJsonRepr) -> types.ObjectJsonRepr: """Convert the original JSON defined for JSON export into a JSON format more adapted for YAML export""" - new_json = util.dict_to_list(util.remove_nones(original_json), "key") - for p_json in new_json: - try: - p_json["projects"] = [{"key": k, "branch": br} for k, br in p_json["projects"]["manual"].items()] - except KeyError: - pass - if "portfolios" in p_json: - p_json["portfolios"] = convert_for_yaml(p_json["portfolios"]) - if "applications" in p_json: - p_json["applications"] = [{"key": k, "branches": br} for k, br in p_json["applications"].items()] - if "permissions" in p_json: - p_json["permissions"] = perms.convert_for_yaml(p_json["permissions"]) - return new_json + return original_json def clear_cache(endpoint: pf.Platform) -> None: diff --git a/sonar/projects.py b/sonar/projects.py index e5c41255..33d1a178 100644 --- a/sonar/projects.py +++ b/sonar/projects.py @@ -1507,7 +1507,7 @@ def export(endpoint: pf.Platform, export_settings: types.ConfigSettings, **kwarg log.log(lvl, "%d/%d projects exported (%d%%)", current, total, (current * 100) // total) log.debug("Projects export complete") write_q and write_q.put(util.WRITE_END) - return dict(sorted(results.items())) + return list(dict(sorted(results.items())).values()) def exists(endpoint: pf.Platform, key: str) -> bool: @@ -1711,8 +1711,4 @@ def convert_proj_for_yaml(proj_json: types.ObjectJsonRepr) -> types.ObjectJsonRe def convert_for_yaml(original_json: types.ObjectJsonRepr) -> types.ObjectJsonRepr: """Convert the original JSON defined for JSON export into a JSON format more adapted for YAML export""" - clean_json = util.remove_nones(original_json) - new_json = [] - for proj in util.dict_to_list(clean_json, "key"): - new_json.append(convert_proj_for_yaml(proj)) - return new_json + return original_json diff --git a/sonar/qualitygates.py b/sonar/qualitygates.py index defa383b..4af00d3a 100644 --- a/sonar/qualitygates.py +++ b/sonar/qualitygates.py @@ -92,7 +92,7 @@ "prioritized_rule_issues": (0, 0, __MAX_ISSUES_SHOULD_BE_ZERO), } -_IMPORTABLE_PROPERTIES = ("isDefault", "isBuiltIn", "conditions", "permissions") +_IMPORTABLE_PROPERTIES = ("name", "isDefault", "isBuiltIn", "conditions", "permissions") class QualityGate(sq.SqObject): @@ -376,7 +376,7 @@ def audit(self, audit_settings: types.ConfigSettings = None) -> list[Problem]: def to_json(self, export_settings: types.ConfigSettings) -> types.ObjectJsonRepr: """Returns JSON representation of object""" - json_data = self.sq_json + json_data = {"name": self.name} | self.sq_json full = export_settings.get("FULL_EXPORT", False) if not self.is_default and not full: json_data.pop("isDefault", None) @@ -458,8 +458,8 @@ def export(endpoint: pf.Platform, export_settings: types.ConfigSettings, **kwarg """ log.info("Exporting quality gates") qg_list = {k: qg.to_json(export_settings) for k, qg in get_list(endpoint).items()} - write_q = kwargs.get("write_q", None) - if write_q: + qg_list = list(dict(sorted(qg_list.items())).values()) + if write_q := kwargs.get("write_q", None): write_q.put(qg_list) write_q.put(util.WRITE_END) return qg_list @@ -565,4 +565,4 @@ def search_by_name(endpoint: pf.Platform, name: str) -> dict[str, QualityGate]: def convert_for_yaml(original_json: types.ObjectJsonRepr) -> types.ObjectJsonRepr: """Convert the original JSON defined for JSON export into a JSON format more adapted for YAML export""" - return util.dict_to_list(util.remove_nones(original_json), "name") + return original_json diff --git a/sonar/qualityprofiles.py b/sonar/qualityprofiles.py index 76968acc..bf92cd17 100644 --- a/sonar/qualityprofiles.py +++ b/sonar/qualityprofiles.py @@ -398,7 +398,7 @@ def to_json(self, export_settings: types.ConfigSettings) -> types.ObjectJsonRepr if self.rule_is_prioritized(rule.key): data["prioritized"] = True if self.rule_has_custom_severities(rule.key): - data["severities"] = self.rule_impacts(rule.key, substitute_with_default=True) + data["impacts"] = self.rule_impacts(rule.key, substitute_with_default=True) json_data["rules"].append({"key": rule.key, **data}) json_data["permissions"] = self.permissions().export(export_settings) return util.remove_nones(util.filter_export(json_data, _IMPORTABLE_PROPERTIES, full)) @@ -437,7 +437,7 @@ def __process_rules_diff(self, rule_set: dict[str:str]) -> dict[str:str]: r_key = rule["key"] diff_rules[r_key] = {} if self.rule_has_custom_severities(r_key): - diff_rules[r_key]["severities"] = self.rule_impacts(r_key, substitute_with_default=True) + diff_rules[r_key]["impacts"] = self.rule_impacts(r_key, substitute_with_default=True) if self.rule_is_prioritized(r_key): diff_rules[r_key]["prioritized"] = True if (params := self.rule_custom_params(r_key)) is not None: diff --git a/sonar/rules.py b/sonar/rules.py index 11b6e9f4..9cfad3e2 100644 --- a/sonar/rules.py +++ b/sonar/rules.py @@ -139,6 +139,8 @@ LEGACY_CSV_EXPORT_FIELDS = ["key", "language", "repo", "type", "severity", "name", "ruleType", "tags"] +_IMPORTABLE_PROPERTIES = ["key", "severity", "impacts", "description", "params", "isTemplate", "templateKey", "tags", "mdNote", "language"] + _CLASS_LOCK = Lock() @@ -297,18 +299,16 @@ def to_csv(self) -> list[str]: def export(self, full: bool = False) -> types.ObjectJsonRepr: """Returns the JSON corresponding to a rule export""" - rule = self.to_json() - d = {"severity": rule.get("severity", ""), "impacts": self.impacts(), "description": self.custom_desc} - if len(rule.get("params", {})) > 0: - d["params"] = rule["params"] if full else {p["key"]: p.get("defaultValue", "") for p in rule["params"]} - mapping = {"isTemplate": "isTemplate", "tags": "tags", "lang": "language", "templateKey": "templateKey"} - d |= {newkey: rule[oldkey] for oldkey, newkey in mapping.items() if oldkey in rule} + d = self.to_json() + d["key"] = self.key + d["impacts"] = self.impacts() + if len(d.get("params", {})) > 0: + d["params"] = d["params"] if full else [{"key": p["key"], "value": p.get("defaultValue", "")} for p in d["params"]] + mapping = {"lang": "language", "mdNote": "description"} + d |= {newkey: d[oldkey] for oldkey, newkey in mapping.items() if oldkey in d} if not d["isTemplate"]: d.pop("isTemplate", None) - if full: - d.update({f"_{k}": v for k, v in rule.items() if k not in ("severity", "params", "isTemplate", "tags", "mdNote", "lang")}) - d.pop("_key", None) - return utilities.remove_nones(d) + return utilities.filter_export(d, _IMPORTABLE_PROPERTIES, full) def set_tags(self, tags: list[str]) -> bool: """Sets rule custom tags""" @@ -348,18 +348,19 @@ def impacts(self, quality_profile_id: Optional[str] = None, substitute_with_defa self.refresh() found_qp = next((qp for qp in self.sq_json.get("actives", []) if quality_profile_id and qp["qProfile"] == quality_profile_id), None) if not found_qp: - return self._impacts if len(self._impacts) > 0 else {TYPE_TO_QUALITY[self.type]: self.severity} - if self.endpoint.is_mqr_mode(): - qp_impacts = {imp["softwareQuality"]: imp["severity"] for imp in found_qp["impacts"]} - default_impacts = self._impacts + qp_impacts = self._impacts if len(self._impacts) > 0 else {TYPE_TO_QUALITY[self.type]: self.severity} else: - qp_impacts = {TYPE_TO_QUALITY[self.type]: self.severity} - default_impacts = {TYPE_TO_QUALITY[self.type]: self.severity} + if self.endpoint.is_mqr_mode(): + qp_impacts = {imp["softwareQuality"]: imp["severity"] for imp in found_qp["impacts"]} + default_impacts = self._impacts + else: + qp_impacts = {TYPE_TO_QUALITY[self.type]: self.severity} + default_impacts = {TYPE_TO_QUALITY[self.type]: self.severity} - if substitute_with_default: - return {k: c.DEFAULT if qp_impacts[k] == default_impacts.get(k, qp_impacts[k]) else v for k, v in qp_impacts.items()} - else: - return qp_impacts + if substitute_with_default: + qp_impacts = {k: c.DEFAULT if qp_impacts[k] == default_impacts.get(k, qp_impacts[k]) else v for k, v in qp_impacts.items()} + + return {k.lower(): v for k, v in qp_impacts.items()} def __get_quality_profile_data(self, quality_profile_id: str) -> Optional[dict[str, str]]: if not quality_profile_id: @@ -454,22 +455,20 @@ def export(endpoint: platform.Platform, export_settings: types.ConfigSettings, * threads = 16 if endpoint.is_sonarcloud() else 8 get_all_rules_details(endpoint=endpoint, threads=export_settings.get("threads", threads)) - all_rules = get_list(endpoint=endpoint, use_cache=False, include_external=False).items() + all_rules = get_list(endpoint=endpoint, use_cache=False, include_external=False).values() rule_list = {} - rule_list["instantiated"] = {k: rule.export(full) for k, rule in all_rules if rule.is_instantiated()} - rule_list["extended"] = {k: rule.export(full) for k, rule in all_rules if rule.is_extended()} + rule_list["instantiated"] = [rule.export(full) for rule in all_rules if rule.is_instantiated()] + rule_list["extended"] = [rule.export(full) for rule in all_rules if rule.is_extended()] if not full: - rule_list["extended"] = utilities.remove_nones( - { - k: {"tags": v.get("tags", None), "description": v.get("description", None)} - for k, v in rule_list["extended"].items() - if "tags" in v or "description" in v - } - ) + rule_list["extended"] = [ + utilities.clean_data({"key": v["key"], "tags": v.get("tags", None), "description": v.get("description", None)}) + for v in rule_list["extended"] + if "tags" in v or "description" in v + ] if full: - rule_list["standard"] = {k: rule.export(full) for k, rule in all_rules if not rule.is_instantiated() and not rule.is_extended()} + rule_list["standard"] = [rule.export(full) for rule in all_rules if not rule.is_instantiated() and not rule.is_extended()] if export_settings.get("MODE", "") == "MIGRATION": - rule_list["thirdParty"] = {r.key: r.export() for r in third_party(endpoint=endpoint)} + rule_list["thirdParty"] = [r.export() for r in third_party(endpoint=endpoint)] for k in ("instantiated", "extended", "standard", "thirdParty"): if len(rule_list.get(k, {})) == 0: diff --git a/sonar/sif.py b/sonar/sif.py index aba4f693..9942f1e1 100644 --- a/sonar/sif.py +++ b/sonar/sif.py @@ -107,14 +107,14 @@ def database(self) -> str: else: return self.json["Database"]["Database"] - def plugins(self) -> dict[str, dict[str, str]]: + def plugins(self) -> list[dict[str, str]]: """Returns plugins installed on the SQ instance represented by the SIF""" d = self.json["Plugins"] if self.version() >= (9, 7, 0) else self.json[_STATS]["plugins"] plugins_dict = {} for k, v in d.items(): version, name = v.split(" ", maxsplit=1) - plugins_dict[k] = {"version": version, "name": name[1:-1]} - return plugins_dict + plugins_dict[k] = {"key": k, "name": name[1:-1], "version": version} + return list(dict(sorted(plugins_dict.items())).values()) def license_type(self) -> Optional[str]: """Returns the SIF SQ license type (prod or test)""" diff --git a/sonar/users.py b/sonar/users.py index 67b2ad55..3eb4e9de 100644 --- a/sonar/users.py +++ b/sonar/users.py @@ -483,13 +483,12 @@ def export(endpoint: pf.Platform, export_settings: types.ConfigSettings, **kwarg """ log.info("Exporting users") write_q = kwargs.get("write_q", None) - u_list = {} + u_list = [] for u_login, u_obj in sorted(search(endpoint=endpoint).items()): - u_list[u_login] = u_obj.to_json(export_settings) + d = u_obj.to_json(export_settings) + u_list.append(d) if write_q: - write_q.put(u_list[u_login]) - else: - u_list[u_login].pop("login", None) + write_q.put(d) write_q and write_q.put(util.WRITE_END) return u_list @@ -564,7 +563,7 @@ def import_config(endpoint: pf.Platform, config_data: types.ObjectJsonRepr, key_ def convert_for_yaml(original_json: types.ObjectJsonRepr) -> types.ObjectJsonRepr: """Convert the original JSON defined for JSON export into a JSON format more adapted for YAML export""" - return util.dict_to_list(original_json, "login") + return original_json def exists(endpoint: pf.Platform, login: str) -> bool: diff --git a/sonar/utilities.py b/sonar/utilities.py index 0fc9ec4d..3d75cbd9 100644 --- a/sonar/utilities.py +++ b/sonar/utilities.py @@ -172,18 +172,6 @@ def convert_to_type(value: Any) -> Any: return value -def remove_nones(d: dict[str, any]) -> dict[str, any]: - """Removes elements of the dict that are None values""" - new_d = d.copy() - for k, v in d.items(): - if v is None: - new_d.pop(k) - continue - if isinstance(v, dict): - new_d[k] = remove_nones(v) - return new_d - - def none_to_zero(d: dict[str, any], key_match: str = "^.+$") -> dict[str, any]: """Replaces None values in a dict with 0""" new_d = d.copy() @@ -197,21 +185,36 @@ def none_to_zero(d: dict[str, any], key_match: str = "^.+$") -> dict[str, any]: return new_d -def remove_empties(d: dict[str, any]) -> dict[str, any]: +def remove_nones(d: Any) -> Any: + """Removes elements of the data that are None values""" + return clean_data(d, remove_empty=False, remove_none=True) + + +def clean_data(d: Any, remove_empty: bool = True, remove_none: bool = True) -> Any: """Recursively removes empty lists and dicts and none from a dict""" # log.debug("Cleaning up %s", json_dump(d)) - new_d = d.copy() - for k, v in d.items(): - if isinstance(v, str) and v == "": - new_d.pop(k) - continue - if not isinstance(v, (list, dict)): - continue - if len(v) == 0: - new_d.pop(k) - elif isinstance(v, dict): - new_d[k] = remove_empties(v) - return new_d + if not isinstance(d, (list, dict)): + return d + + if isinstance(d, list): + # Remove empty strings and nones + if remove_empty: + d = [elem for elem in d if not (isinstance(elem, str) and elem == "")] + if remove_none: + d = [elem for elem in d if elem is not None] + return [clean_data(elem, remove_empty, remove_none) for elem in d] + + # Remove empty dict string values + if remove_empty: + new_d = {k: v for k, v in d.items() if not isinstance(v, str) or v != ""} + if remove_none: + new_d = {k: v for k, v in d.items() if v is not None} + + # Remove empty dict list or dict values + new_d = {k: v for k, v in new_d.items() if not isinstance(v, (list, dict)) or len(v) > 0} + + # Recurse + return {k: clean_data(v, remove_empty, remove_none) for k, v in new_d.items()} def sort_lists(data: Any, redact_tokens: bool = True) -> Any: @@ -578,18 +581,20 @@ def __prefix(value: Any) -> Any: return value -def filter_export(json_data: dict[str, any], key_properties: list[str], full: bool) -> dict[str, any]: +def filter_export(json_data: dict[str, Any], key_properties: list[str], full: bool) -> dict[str, Any]: """Filters dict for export removing or prefixing non-key properties""" - new_json_data = json_data.copy() - for k in json_data: - if k not in key_properties: - if full and k != "actions": - new_json_data[f"_{k}"] = __prefix(new_json_data.pop(k)) - else: - new_json_data.pop(k) + new_json_data = {k: json_data[k] for k in key_properties if k in json_data} + if full: + new_json_data |= {f"_{k}": __prefix(v) for k, v in json_data.items() if k not in key_properties} return new_json_data +def order_dict(d: dict[str, Any], key_order: list[str]) -> dict[str, Any]: + """Orders keys of a dictionary in a given order""" + new_d = {k: d[k] for k in key_order if k in d} + return new_d | {k: v for k, v in d.items() if k not in new_d} + + def replace_keys(key_list: list[str], new_key: str, data: dict[str, any]) -> dict[str, any]: """Replace a list of old keys by a new key in a dict""" for k in key_list: