diff --git a/cli/config.py b/cli/config.py index 342beac0..56230a6f 100644 --- a/cli/config.py +++ b/cli/config.py @@ -22,7 +22,7 @@ Exports SonarQube platform configuration as JSON """ -from typing import TextIO +from typing import TextIO, Any from threading import Thread from queue import Queue @@ -119,6 +119,16 @@ def __parse_args(desc: str) -> object: action="store_true", help="By default, sonar-config does not export empty values, setting this flag will add empty values in the export", ) + parser.add_argument( + "--convertFrom", + required=False, + help="Source sonar-config old JSON format", + ) + parser.add_argument( + "--convertTo", + required=False, + help="Target sonar-config new JSON format", + ) return options.parse_and_check(parser=parser, logger_name=TOOL_NAME) @@ -294,11 +304,40 @@ def __import_config(endpoint: platform.Platform, what: list[str], **kwargs) -> N log.info("Importing configuration to %s completed", kwargs[options.URL]) +def convert_json(**kwargs) -> dict[str, Any]: + """Converts a sonar-config report from the old to the new JSON format""" + with open(kwargs["convertFrom"], encoding="utf-8") as fd: + old_json = json.loads(fd.read()) + mapping = { + "platform": platform.old_to_new_json, + "globalSettings": platform.global_settings_old_to_new_json, + "qualityProfiles": qualityprofiles.old_to_new_json, + "qualityGates": qualitygates.old_to_new_json, + "projects": projects.old_to_new_json, + "portfolios": portfolios.old_to_new_json, + "applications": applications.old_to_new_json, + "users": users.old_to_new_json, + "groups": groups.old_to_new_json, + "rules": rules.old_to_new_json, + } + new_json = {} + for k, func in mapping.items(): + if k in old_json: + log.info("Converting %s", k) + new_json[k] = func(old_json[k]) + with open(kwargs["convertTo"], mode="w", encoding="utf-8") as fd: + print(utilities.json_dump(new_json), file=fd) + return new_json + + def main() -> None: """Main entry point for sonar-config""" start_time = utilities.start_clock() try: kwargs = utilities.convert_args(__parse_args("Extract SonarQube Server or Cloud platform configuration")) + if kwargs["convertFrom"] is not None: + convert_json(**kwargs) + utilities.final_exit(errcodes.OK, "", start_time) endpoint = platform.Platform(**kwargs) endpoint.verify_connection() endpoint.set_user_agent(f"{TOOL_NAME} {version.PACKAGE_VERSION}") diff --git a/sonar/applications.py b/sonar/applications.py index 29ff4f6e..42745e5c 100644 --- a/sonar/applications.py +++ b/sonar/applications.py @@ -23,7 +23,7 @@ """ from __future__ import annotations -from typing import Optional +from typing import Optional, Any import re import json from datetime import datetime @@ -340,10 +340,11 @@ def export(self, export_settings: types.ConfigSettings) -> types.ObjectJsonRepr: "visibility": self.visibility(), # 'projects': self.projects(), "branches": {br.name: br.export() for br in self.branches().values()}, - "permissions": util.perms_to_list(self.permissions().export(export_settings=export_settings)), + "permissions": self.permissions().export(export_settings=export_settings), "tags": self.get_tags(), } ) + json_data = old_to_new_json_one(json_data) return util.filter_export(json_data, _IMPORTABLE_PROPERTIES, export_settings.get("FULL_EXPORT", False)) def set_permissions(self, data: types.JsonPermissions) -> application_permissions.ApplicationPermissions: @@ -507,7 +508,7 @@ def exists(endpoint: pf.Platform, key: str) -> bool: return False -def export(endpoint: pf.Platform, export_settings: types.ConfigSettings, **kwargs) -> types.ObjectJsonRepr: +def export(endpoint: pf.Platform, export_settings: types.ConfigSettings, **kwargs) -> list[dict[str, Any]]: """Exports applications as JSON :param endpoint: Reference to the Sonar platform @@ -520,14 +521,13 @@ def export(endpoint: pf.Platform, export_settings: types.ConfigSettings, **kwarg key_regexp = kwargs.get("key_list", ".*") app_list = {k: v for k, v in get_list(endpoint).items() if not key_regexp or re.match(key_regexp, k)} - apps_settings = {} + apps_settings = [] for k, app in app_list.items(): app_json = app.export(export_settings) if write_q: write_q.put(app_json) else: - app_json.pop("key") - apps_settings[k] = app_json + apps_settings.append(app_json) write_q and write_q.put(util.WRITE_END) return apps_settings @@ -598,3 +598,21 @@ def search_by_name(endpoint: pf.Platform, name: str) -> dict[str, Application]: data[app.key] = app # return {app.key: app for app in Application.CACHE.values() if app.name == name} return data + + +def old_to_new_json_one(old_app_json: dict[str, Any]) -> dict[str, Any]: + """Converts sonar-config old JSON report format to new format for a single application""" + new_json = old_app_json.copy() + if "permissions" in old_app_json: + new_json["permissions"] = util.perms_to_list(old_app_json["permissions"]) + if "branches" in old_app_json: + new_json["branches"] = util.dict_to_list(old_app_json["branches"], "name") + return new_json + + +def old_to_new_json(old_json: dict[str, Any]) -> dict[str, Any]: + """Converts sonar-config old JSON report format to new format""" + new_json = old_json.copy() + for k, v in new_json.items(): + new_json[k] = old_to_new_json_one(v) + return util.dict_to_list(new_json, "key") diff --git a/sonar/groups.py b/sonar/groups.py index f35f7e7d..096908a6 100644 --- a/sonar/groups.py +++ b/sonar/groups.py @@ -24,7 +24,7 @@ from __future__ import annotations import json -from typing import Optional +from typing import Optional, Any import sonar.logging as log import sonar.platform as pf @@ -428,7 +428,12 @@ def import_config(endpoint: pf.Platform, config_data: types.ObjectJsonRepr, key_ def exists(endpoint: pf.Platform, name: str) -> bool: """ :param endpoint: reference to the SonarQube platform - :param group_name: group name to check + :param name: group name to check :return: whether the group exists """ return Group.get_object(endpoint=endpoint, name=name) is not None + + +def old_to_new_json(old_json: dict[str, Any]) -> dict[str, Any]: + """Converts sonar-config old groups JSON report format to new format""" + return util.dict_to_list(old_json, "name", "description") diff --git a/sonar/platform.py b/sonar/platform.py index d1fa62aa..fe0086a2 100644 --- a/sonar/platform.py +++ b/sonar/platform.py @@ -465,9 +465,7 @@ def __urlstring(self, api: str, params: types.ApiParams, data: Optional[str] = N return url def webhooks(self) -> dict[str, webhooks.WebHook]: - """ - :return: the list of global webhooks - """ + """Returns the list of global webhooks""" return webhooks.get_list(self) def export(self, export_settings: types.ConfigSettings, full: bool = False) -> types.ObjectJsonRepr: @@ -476,19 +474,16 @@ def export(self, export_settings: types.ConfigSettings, full: bool = False) -> t :param full: Whether to also export properties that cannot be set, defaults to False :type full: bool, optional :return: dict of all properties with their values - :rtype: dict """ log.info("Exporting platform global settings") json_data = {} + settings_list = self.__settings(include_not_set=export_settings.get("EXPORT_DEFAULTS", False)).values() + settings_list = [s for s in settings_list if s.is_global() and not s.is_internal()] for s in self.__settings(include_not_set=export_settings.get("EXPORT_DEFAULTS", False)).values(): - if s.is_internal(): - continue (categ, subcateg) = s.category() if self.is_sonarcloud() and categ == settings.THIRD_PARTY_SETTINGS: # What is reported as 3rd part are SonarQube Cloud internal settings continue - if not s.is_global(): - continue util.update_json(json_data, categ, subcateg, s.to_json(export_settings.get("INLINE_LISTS", True))) hooks = {} @@ -650,8 +645,7 @@ def _audit_logs(self, audit_settings: types.ConfigSettings) -> list[Problem]: if rule is not None: problems.append(Problem(rule, f"{self.local_url}/admin/system", logfile, line)) logs = self.get("system/logs", params={"name": "deprecation"}).text - nb_deprecation = len(logs.splitlines()) - if nb_deprecation > 0: + if (nb_deprecation := len(logs.splitlines())) > 0: rule = get_rule(RuleId.DEPRECATION_WARNINGS) problems.append(Problem(rule, f"{self.local_url}/admin/system", nb_deprecation)) return problems @@ -810,10 +804,9 @@ def _normalize_api(api: str) -> str: return api -def _audit_setting_value(key: str, platform_settings: dict[str, any], audit_settings: types.ConfigSettings, url: str) -> list[Problem]: +def _audit_setting_value(key: str, platform_settings: dict[str, Any], audit_settings: types.ConfigSettings, url: str) -> list[Problem]: """Audits a particular platform setting is set to expected value""" - v = _get_multiple_values(4, audit_settings[key], "MEDIUM", "CONFIGURATION") - if v is None: + if (v := _get_multiple_values(4, audit_settings[key], "MEDIUM", "CONFIGURATION")) is None: log.error(WRONG_CONFIG_MSG, key, audit_settings[key]) return [] if v[0] not in platform_settings: @@ -830,11 +823,10 @@ def _audit_setting_value(key: str, platform_settings: dict[str, any], audit_sett def _audit_setting_in_range( - key: str, platform_settings: dict[str, any], audit_settings: types.ConfigSettings, sq_version: tuple[int, int, int], url: str + key: str, platform_settings: dict[str, Any], audit_settings: types.ConfigSettings, sq_version: tuple[int, int, int], url: str ) -> list[Problem]: """Audits a particular platform setting is within expected range of values""" - v = _get_multiple_values(5, audit_settings[key], "MEDIUM", "CONFIGURATION") - if v is None: + if (v := _get_multiple_values(5, audit_settings[key], "MEDIUM", "CONFIGURATION")) is None: log.error(WRONG_CONFIG_MSG, key, audit_settings[key]) return [] if v[0] not in platform_settings: @@ -852,11 +844,10 @@ def _audit_setting_in_range( def _audit_setting_set( - key: str, check_is_set: bool, platform_settings: dict[str, any], audit_settings: types.ConfigSettings, url: str + key: str, check_is_set: bool, platform_settings: dict[str, Any], audit_settings: types.ConfigSettings, url: str ) -> list[Problem]: """Audits that a setting is set or not set""" - v = _get_multiple_values(3, audit_settings[key], "MEDIUM", "CONFIGURATION") - if v is None: + if (v := _get_multiple_values(3, audit_settings[key], "MEDIUM", "CONFIGURATION")) is None: log.error(WRONG_CONFIG_MSG, key, audit_settings[key]) return [] log.info("Auditing whether setting %s is set or not", v[0]) @@ -928,7 +919,6 @@ def import_config(endpoint: Platform, config_data: types.ObjectJsonRepr, key_lis :param Platform endpoint: reference to the SonarQube platform :param ObjectJsonRepr config_data: the configuration to import :param KeyList key_list: Unused - :return: Nothing """ return endpoint.import_config(config_data) @@ -948,7 +938,6 @@ def export(endpoint: Platform, export_settings: types.ConfigSettings, **kwargs) :param Platform endpoint: reference to the SonarQube platform :param ConfigSettings export_settings: Export parameters :return: Platform settings - :rtype: ObjectJsonRepr """ exp = endpoint.export(export_settings) if write_q := kwargs.get("write_q", None): @@ -974,3 +963,29 @@ def audit(endpoint: Platform, audit_settings: types.ConfigSettings, **kwargs) -> pbs = endpoint.audit(audit_settings) "write_q" in kwargs and kwargs["write_q"].put(pbs) return pbs + + +def old_to_new_json(old_json: dict[str, Any]) -> dict[str, Any]: + """Converts sonar-config "plaform" section old JSON report format to new format""" + if "plugins" in old_json: + old_json["plugins"] = util.dict_to_list(old_json["plugins"], "key") + return old_json + + +def global_settings_old_to_new_json(old_json: dict[str, Any]) -> dict[str, Any]: + """Converts sonar-config "globalSettings" section old JSON report format to new format""" + new_json = {} + special_categories = (settings.LANGUAGES_SETTINGS, settings.DEVOPS_INTEGRATION, "permissions", "permissionTemplates") + for categ in [cat for cat in settings.CATEGORIES if cat not in special_categories]: + new_json[categ] = util.sort_list_by_key(util.dict_to_list(old_json[categ], "key"), "key") + for k, v in old_json[settings.LANGUAGES_SETTINGS].items(): + new_json[settings.LANGUAGES_SETTINGS] = new_json.get(settings.LANGUAGES_SETTINGS, None) or {} + new_json[settings.LANGUAGES_SETTINGS][k] = util.sort_list_by_key(util.dict_to_list(v, "key"), "key") + new_json[settings.LANGUAGES_SETTINGS] = util.dict_to_list(new_json[settings.LANGUAGES_SETTINGS], "language", "settings") + new_json[settings.DEVOPS_INTEGRATION] = util.dict_to_list(old_json[settings.DEVOPS_INTEGRATION], "key") + new_json["permissions"] = util.perms_to_list(old_json["permissions"]) + for v in old_json["permissionTemplates"].values(): + if "permissions" in v: + v["permissions"] = util.perms_to_list(v["permissions"]) + new_json["permissionTemplates"] = util.dict_to_list(old_json["permissionTemplates"], "key") + return new_json diff --git a/sonar/portfolios.py b/sonar/portfolios.py index 10a1aa07..1b8f4a9b 100644 --- a/sonar/portfolios.py +++ b/sonar/portfolios.py @@ -850,3 +850,24 @@ def get_api_branch(branch: str) -> str: def clear_cache(endpoint: pf.Platform) -> None: """Clears the cache of an endpoint""" Portfolio.clear_cache(endpoint) + + +def old_to_new_json_one(old_json: dict[str, Any]) -> dict[str, Any]: + """Converts the sonar-config old JSON report format for a single portfolio to the new one""" + new_json = old_json.copy() + for key in "children", "portfolios": + if key in new_json: + new_json[key] = old_to_new_json(new_json[key]) + if "permissions" in old_json: + new_json["permissions"] = util.perms_to_list(old_json["permissions"]) + if "branches" in old_json: + new_json["branches"] = util.dict_to_list(old_json["branches"], "name") + return new_json + + +def old_to_new_json(old_json: dict[str, Any]) -> dict[str, Any]: + """Converts the sonar-config portfolios old JSON report format to the new one""" + new_json = old_json.copy() + for k, v in new_json.items(): + new_json[k] = old_to_new_json_one(v) + return util.dict_to_list(new_json, "key") diff --git a/sonar/projects.py b/sonar/projects.py index 26bcce7e..907bd99f 100644 --- a/sonar/projects.py +++ b/sonar/projects.py @@ -32,7 +32,7 @@ from datetime import datetime import traceback -from typing import Optional, Union +from typing import Optional, Union, Any from http import HTTPStatus from threading import Lock from requests import HTTPError, RequestException @@ -1696,3 +1696,23 @@ def import_zips(endpoint: pf.Platform, project_list: list[str], threads: int = 2 log.info("%d/%d imports (%d%%) - Latest: %s - %s", i, nb_projects, int(i * 100 / nb_projects), proj_key, status) log.info("%s", ", ".join([f"{k}:{v}" for k, v in statuses_count.items()])) return statuses + + +def old_to_new_json_one(old_json: dict[str, Any]) -> dict[str, Any]: + """Converts the sonar-config projects old JSON report format for a single project to the new one""" + new_json = old_json.copy() + if "permissions" in old_json: + new_json["permissions"] = util.perms_to_list(old_json["permissions"]) + if "branches" in old_json: + new_json["branches"] = util.dict_to_list(old_json["branches"], "name") + if "settings" in old_json: + new_json["settings"] = util.dict_to_list(old_json["settings"], "key") + return new_json + + +def old_to_new_json(old_json: dict[str, Any]) -> dict[str, Any]: + """Converts the sonar-config projects old JSON report format to the new one""" + new_json = old_json.copy() + for k, v in new_json.items(): + new_json[k] = old_to_new_json_one(v) + return util.dict_to_list(new_json, "key") diff --git a/sonar/qualitygates.py b/sonar/qualitygates.py index 1236d3de..89f0a921 100644 --- a/sonar/qualitygates.py +++ b/sonar/qualitygates.py @@ -24,7 +24,7 @@ """ from __future__ import annotations -from typing import Union, Optional +from typing import Union, Optional, Any import json @@ -561,3 +561,8 @@ def _decode_condition(cond: str) -> tuple[str, str, str]: def search_by_name(endpoint: pf.Platform, name: str) -> dict[str, QualityGate]: """Searches quality gates matching name""" return util.search_by_name(endpoint, name, QualityGate.API[c.LIST], "qualitygates") + + +def old_to_new_json(old_json: dict[str, Any]) -> dict[str, Any]: + """Converts the sonar-config quality gates old JSON report format to the new one""" + return util.dict_to_list(old_json, "name") diff --git a/sonar/qualityprofiles.py b/sonar/qualityprofiles.py index 6ab46c64..1ef59f60 100644 --- a/sonar/qualityprofiles.py +++ b/sonar/qualityprofiles.py @@ -789,21 +789,6 @@ def flatten(qp_list: types.ObjectJsonRepr) -> types.ObjectJsonRepr: return flat_list -def __convert_children_to_list(qp_json: dict[str, Any]) -> list[dict[str, Any]]: - """Converts a profile's children profiles to list""" - for v in qp_json.values(): - if "children" in v: - v["children"] = __convert_children_to_list(v["children"]) - return util.dict_to_list(qp_json, "name") - - -def __convert_profiles_to_list(qp_json: dict[str, Any]) -> list[dict[str, Any]]: - """Converts a language top level list of profiles to list""" - for k, v in qp_json.items(): - qp_json[k] = __convert_children_to_list(v) - return util.dict_to_list(qp_json, "language", "profiles") - - def export(endpoint: pf.Platform, export_settings: types.ConfigSettings, **kwargs) -> types.ObjectJsonRepr: """Exports all or a list of quality profiles configuration as dict @@ -917,3 +902,18 @@ def convert_one_qp_yaml(qp: types.ObjectJsonRepr) -> types.ObjectJsonRepr: qp[_CHILDREN_KEY] = {k: convert_one_qp_yaml(q) for k, q in qp[_CHILDREN_KEY].items()} qp[_CHILDREN_KEY] = util.dict_to_list(qp[_CHILDREN_KEY], "name") return qp + + +def __convert_children_to_list(qp_json: dict[str, Any]) -> list[dict[str, Any]]: + """Converts a profile's children profiles to list""" + for v in qp_json.values(): + if "children" in v: + v["children"] = __convert_children_to_list(v["children"]) + return util.dict_to_list(qp_json, "name") + + +def old_to_new_json(qp_json: dict[str, Any]) -> list[dict[str, Any]]: + """Converts a language top level list of profiles to list""" + for k, v in qp_json.items(): + qp_json[k] = __convert_children_to_list(v) + return util.dict_to_list(qp_json, "language", "profiles") diff --git a/sonar/rules.py b/sonar/rules.py index dcb4bcf1..f4e6e3c8 100644 --- a/sonar/rules.py +++ b/sonar/rules.py @@ -28,7 +28,7 @@ import json import concurrent.futures from threading import Lock -from typing import Optional +from typing import Optional, Any import sonar.logging as log import sonar.sqobject as sq @@ -486,6 +486,7 @@ def export(endpoint: platform.Platform, export_settings: types.ConfigSettings, * for k in ("instantiated", "extended", "standard", "thirdParty"): if len(rule_list.get(k, {})) == 0: rule_list.pop(k, None) + rule_list = old_to_new_json(rule_list) if write_q := kwargs.get("write_q", None): write_q.put(rule_list) write_q.put(utilities.WRITE_END) @@ -578,3 +579,12 @@ def severities(endpoint: platform.Platform, json_data: dict[str, any]) -> Option return {impact["softwareQuality"]: impact["severity"] for impact in json_data.get("impacts", [])} else: return json_data.get("severity", None) + + +def old_to_new_json(old_json: dict[str, Any]) -> dict[str, Any]: + """Converts the sonar-config rules old JSON report format to the new one""" + new_json = {} + for k in ("instantiated", "extended", "standard", "thirdParty"): + if k in old_json: + new_json[k] = utilities.dict_to_list(old_json[k], "key") + return new_json diff --git a/sonar/users.py b/sonar/users.py index 3eeabd1f..cdd3bede 100644 --- a/sonar/users.py +++ b/sonar/users.py @@ -23,7 +23,7 @@ from __future__ import annotations import concurrent.futures -from typing import Optional, Union +from typing import Optional, Union, Any from datetime import datetime, timezone import json @@ -568,3 +568,8 @@ def exists(endpoint: pf.Platform, login: str) -> bool: :return: whether the group exists """ return User.get_object(endpoint=endpoint, login=login) is not None + + +def old_to_new_json(old_json: dict[str, Any]) -> dict[str, Any]: + """Converts the sonar-config users old JSON report format to the new one""" + return util.dict_to_list(old_json, "login")