Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions cli/findings_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,27 @@

import os
import csv
from typing import TextIO
from typing import TextIO, Union, TYPE_CHECKING
import concurrent.futures
from argparse import Namespace
import traceback

from cli import options
from sonar.util.types import ConfigSettings
import sonar.logging as log
from sonar import platform, exceptions, errcodes, version
from sonar.platform import Platform
from sonar.projects import Project
from sonar.applications import Application
from sonar.portfolios import Portfolio

from sonar import exceptions, errcodes, version
from sonar import hotspots, findings
from sonar import applications, portfolios

from sonar.util import issue_defs as idefs, types, component_helper
import sonar.util.constants as c

import sonar.utilities as util


TOOL_NAME = "sonar-findings"
DATES_WITHOUT_TIME = False

Expand Down Expand Up @@ -149,7 +154,7 @@ def parse_args(desc: str) -> Namespace:
return options.parse_and_check(parser=parser, logger_name=TOOL_NAME)


def __write_header(file: str, endpoint: platform.Platform, **kwargs) -> None:
def __write_header(file: str, endpoint: Platform, **kwargs) -> None:
"""Writes the file header"""
with util.open_file(file=file) as fd:
if kwargs[options.FORMAT] == "sarif":
Expand Down Expand Up @@ -264,15 +269,17 @@ def needs_hotspot_search(params: types.ApiParams) -> bool:
)


def get_component_findings(component: object, search_findings: bool, params: ConfigSettings) -> dict[str, findings.Finding]:
def get_component_findings(
component: Union[Project, Application, Portfolio], search_findings: bool, params: ConfigSettings
) -> dict[str, findings.Finding]:
"""Gets the findings of a component and puts them in a writing queue"""
try:
_ = next(v for k, v in params.items() if k in _SEARCH_CRITERIA and v is not None)
search_findings = False
except StopIteration:
pass

if search_findings and not isinstance(component, (applications.Application, portfolios.Portfolio)):
if search_findings and not isinstance(component, (Application, Portfolio)):
return findings.export_findings(
component.endpoint, component.key, branch=params.get("branch", None), pull_request=params.get("pullRequest", None)
)
Expand All @@ -288,7 +295,7 @@ def get_component_findings(component: object, search_findings: bool, params: Con
return findings_list


def store_findings(components_list: list[object], endpoint: platform.Platform, params: ConfigSettings) -> int:
def store_findings(components_list: list[Union[Project, Application, Portfolio]], endpoint: Platform, params: ConfigSettings) -> int:
"""Export all findings of a given project list

:param list[Components] components_list: Components to export findings (components can be projects, branches, PRs, applications, or portfolios)
Expand Down Expand Up @@ -324,7 +331,7 @@ def store_findings(components_list: list[object], endpoint: platform.Platform, p
return total_findings


def __turn_off_use_findings_if_needed(endpoint: object, params: dict[str, str]) -> dict[str, str]:
def __turn_off_use_findings_if_needed(endpoint: Platform, params: dict[str, str]) -> dict[str, str]:
"""Turn off use-findings option if some incompatible options (issue filters) are used"""
if not params[options.USE_FINDINGS]:
return params
Expand All @@ -347,7 +354,7 @@ def main() -> None:
start_time = util.start_clock()
try:
kwargs = util.convert_args(parse_args("Sonar findings export"))
sqenv = platform.Platform(**kwargs)
sqenv = Platform(**kwargs)
sqenv.verify_connection()
sqenv.set_user_agent(f"{TOOL_NAME} {version.PACKAGE_VERSION}")

Expand Down
9 changes: 5 additions & 4 deletions cli/housekeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
from requests import RequestException
from cli import options
import sonar.logging as log
from sonar import platform, tokens, users, projects, branches, version, errcodes
from sonar.platform import Platform
from sonar import tokens, users, projects, branches, version, errcodes
import sonar.util.constants as c
import sonar.utilities as util
import sonar.exceptions as ex
Expand All @@ -42,7 +43,7 @@
PROJ_MAX_AGE = "audit.projects.maxLastAnalysisAge"


def get_project_problems(settings: dict[str, str], endpoint: object) -> list[problem.Problem]:
def get_project_problems(settings: dict[str, str], endpoint: Platform) -> list[problem.Problem]:
"""Returns the list of problems that would require housekeeping for a given project"""
problems = []
if settings[PROJ_MAX_AGE] < 90:
Expand Down Expand Up @@ -71,7 +72,7 @@ def get_project_problems(settings: dict[str, str], endpoint: object) -> list[pro
return problems


def get_user_problems(settings: dict[str, str], endpoint: platform.Platform) -> list[problem.Problem]:
def get_user_problems(settings: dict[str, str], endpoint: Platform) -> list[problem.Problem]:
"""Collects problems related to user accounts"""
user_problems = users.audit(endpoint=endpoint, audit_settings=settings)
loglevel = log.WARNING if len(user_problems) > 0 else log.INFO
Expand Down Expand Up @@ -191,7 +192,7 @@ def main() -> None:
start_time = util.start_clock()
try:
kwargs = util.convert_args(_parse_arguments())
sq = platform.Platform(**kwargs)
sq = Platform(**kwargs)
sq.verify_connection()
sq.set_user_agent(f"{TOOL_NAME} {version.PACKAGE_VERSION}")

Expand Down
6 changes: 3 additions & 3 deletions cli/loc.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def __dump_csv(object_list: list[object], file: str, **kwargs) -> None:
if len(object_list) <= 0:
log.warning("No objects with LoCs to dump, dump skipped")
return
obj_type = type(object_list[0]).__name__.lower()
obj_type = util.class_name(object_list[0]).lower()
nb_loc, nb_objects = 0, 0
with util.open_file(file) as fd:
writer = csv.writer(fd, delimiter=kwargs[options.CSV_SEPARATOR])
Expand All @@ -95,7 +95,7 @@ def __dump_csv(object_list: list[object], file: str, **kwargs) -> None:
def __get_object_json_data(o: object, **kwargs) -> dict[str, str]:
"""Returns the object data as JSON"""
parent_type = kwargs[options.COMPONENT_TYPE][:-1]
is_branch = type(o).__name__.lower() in ("branch", "applicationbranch")
is_branch = util.class_name(o).lower() in ("branch", "applicationbranch")
parent_o = o.concerned_object if is_branch else o
d = {parent_type: parent_o.key, "ncloc": ""}
try:
Expand Down Expand Up @@ -124,7 +124,7 @@ def __dump_json(object_list: list[object], file: str, **kwargs) -> None:
if len(object_list) <= 0:
log.warning("No objects with LoCs to dump, dump skipped")
return
obj_type = type(object_list[0]).__name__.lower()
obj_type = util.class_name(object_list[0]).lower()
# Collect all objects data
for o in object_list:
data.append(__get_object_json_data(o, **kwargs))
Expand Down
17 changes: 11 additions & 6 deletions cli/measures_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,24 @@
import sys
import csv
from requests import RequestException
from typing import TYPE_CHECKING

from sonar.util import types
from cli import options
import sonar.logging as log
from sonar import metrics, platform, exceptions, errcodes, version, measures
from sonar.platform import Platform
from sonar import metrics, exceptions, errcodes, version, measures
import sonar.utilities as util
import sonar.util.constants as c
from sonar.util import component_helper

TOOL_NAME = "sonar-measures"

if TYPE_CHECKING:
from sonar.components import Component

def __get_measures_history(obj: object, wanted_metrics: types.KeyList, convert_options: dict[str, str]) -> dict[str, str]:

def __get_measures_history(obj: Component, wanted_metrics: types.KeyList, convert_options: dict[str, str]) -> dict[str, str]:
"""Returns the measure history of an object (project, branch, application, portfolio)"""
try:
data = obj.get_measures_history(wanted_metrics)
Expand All @@ -55,7 +60,7 @@ def __get_measures_history(obj: object, wanted_metrics: types.KeyList, convert_o
return obj.component_data() | {"history": data}


def __get_measures(obj: object, wanted_metrics: types.KeyList, convert_options: dict[str, str]) -> dict[str, str]:
def __get_measures(obj: Component, wanted_metrics: types.KeyList, convert_options: dict[str, str]) -> dict[str, str]:
"""Returns the list of requested measures of an object"""
log.info("Getting measures for %s", str(obj))
measures_d = {}
Expand All @@ -71,7 +76,7 @@ def __get_measures(obj: object, wanted_metrics: types.KeyList, convert_options:
return measures_d

sep = "|" if convert_options[options.CSV_SEPARATOR] == "," else ","
if obj.__class__.__name__ == "Branch":
if util.class_name(obj) == "Branch":
measures_d["tags"] = sep.join(obj.concerned_object.get_tags())
else:
measures_d["tags"] = sep.join(obj.get_tags())
Expand All @@ -81,7 +86,7 @@ def __get_measures(obj: object, wanted_metrics: types.KeyList, convert_options:
return measures_d


def __get_wanted_metrics(endpoint: platform.Platform, wanted_metrics: types.KeySet) -> types.KeyList:
def __get_wanted_metrics(endpoint: Platform, wanted_metrics: types.KeySet) -> types.KeyList:
"""Returns an ordered list of metrics based on CLI inputs"""
main_metrics = list(metrics.MAIN_METRICS)
if endpoint.version() >= c.ACCEPT_INTRO_VERSION:
Expand Down Expand Up @@ -255,7 +260,7 @@ def main() -> None:
kwargs["dates"] = "dateonly" if kwargs["datesWithoutTime"] else "datetime"
if kwargs[options.COMPONENT_TYPE] == "portfolios" and kwargs[options.WITH_TAGS]:
util.final_exit(errcodes.ARGS_ERROR, f"Portfolios have no tags, can't use option --{options.WITH_TAGS} with --{options.PORTFOLIOS}")
endpoint = platform.Platform(**kwargs)
endpoint = Platform(**kwargs)
endpoint.verify_connection()
endpoint.set_user_agent(f"{TOOL_NAME} {version.PACKAGE_VERSION}")

Expand Down
7 changes: 5 additions & 2 deletions sonar/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

"""

from typing import Optional
from typing import Optional, TYPE_CHECKING
import json

import sonar.logging as log
Expand All @@ -35,6 +35,9 @@
from sonar.audit.rules import get_rule
from sonar.audit.problem import Problem

if TYPE_CHECKING:
from sonar.permissions import Permissions


class Aggregation(comp.Component):
"""Parent class of applications and portfolios"""
Expand Down Expand Up @@ -84,7 +87,7 @@ def _audit_empty_aggregation(self, broken_rule: object) -> list[Problem]:
def _audit_singleton_aggregation(self, broken_rule: object) -> list[Problem]:
return self._audit_aggregation_cardinality((1,), broken_rule)

def permissions(self) -> Optional[object]:
def permissions(self) -> Optional[Permissions]:
"""Should be implement in child classes"""
return self._permissions

Expand Down
46 changes: 24 additions & 22 deletions sonar/app_branches.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"""Abstraction of Sonar Application Branch"""

from __future__ import annotations
from typing import Optional
from typing import Optional, TYPE_CHECKING

import json
from http import HTTPStatus
Expand All @@ -37,6 +37,8 @@
from sonar import exceptions, projects, utilities
import sonar.util.constants as c

if TYPE_CHECKING:
from sonar.applications import Application

_NOT_SUPPORTED = "Applications not supported in community edition"

Expand All @@ -46,30 +48,30 @@ class ApplicationBranch(Component):
Abstraction of the SonarQube "application branch" concept
"""

CACHE = cache.Cache()
API = {
c.CREATE: "applications/create_branch",
c.GET: "applications/show",
c.UPDATE: "applications/update_branch",
c.DELETE: "applications/delete_branch",
CACHE = cache.Cache() # :type: cache.Cache
API = { # :type: dict[str, str]
c.CREATE: "applications/create_branch", # :type: str
c.GET: "applications/show", # :type: str
c.UPDATE: "applications/update_branch", # :type: str
c.DELETE: "applications/delete_branch", # :type: str
}

def __init__(
self, app: object, name: str, project_branches: list[Branch], is_main: bool = False, branch_data: Optional[types.ApiPayload] = None
self, app: Application, name: str, project_branches: list[Branch], is_main: bool = False, branch_data: Optional[types.ApiPayload] = None
) -> None:
"""Don't use this directly, go through the class methods to create Objects"""
super().__init__(endpoint=app.endpoint, key=f"{app.key} BRANCH {name}")
self.concerned_object = app
self.name = name
self.sq_json = branch_data
self._is_main = is_main
self._project_branches = project_branches
self._last_analysis = None
log.debug("Created object %s with uuid %d id %x", str(self), hash(self), id(self))
self.concerned_object = app # :type: Application
self.name = name # :type: str
self.sq_json = branch_data # :type: types.ApiPayload
self._is_main = is_main # :type: bool
self._project_branches = project_branches # :type: list[Branch]
self._last_analysis = None # :type: Optional[datetime]
log.debug("Created %s with uuid %d id %x", str(self), hash(self), id(self))
ApplicationBranch.CACHE.put(self)

@classmethod
def get_object(cls, app: object, branch_name: str) -> ApplicationBranch:
def get_object(cls, app: Application, branch_name: str) -> ApplicationBranch:
"""Gets an Application object from SonarQube

:param Application app: Reference to the Application holding that branch
Expand All @@ -92,7 +94,7 @@ def get_object(cls, app: object, branch_name: str) -> ApplicationBranch:
raise exceptions.ObjectNotFound(app.key, f"Application key '{app.key}' branch '{branch_name}' not found")

@classmethod
def create(cls, app: object, name: str, project_branches: list[Branch]) -> ApplicationBranch:
def create(cls, app: Application, name: str, project_branches: list[Branch]) -> ApplicationBranch:
"""Creates an ApplicationBranch object in SonarQube

:param Application app: Reference to the Application holding that branch
Expand Down Expand Up @@ -120,7 +122,7 @@ def create(cls, app: object, name: str, project_branches: list[Branch]) -> Appli
return ApplicationBranch(app=app, name=name, project_branches=project_branches)

@classmethod
def load(cls, app: object, branch_data: types.ApiPayload) -> ApplicationBranch:
def load(cls, app: Application, branch_data: types.ApiPayload) -> ApplicationBranch:
project_branches = []
for proj_data in branch_data["projects"]:
proj = projects.Project.get_object(app.endpoint, proj_data["key"])
Expand Down Expand Up @@ -238,7 +240,7 @@ def component_data(self) -> types.Obj:
return {
"key": self.concerned_object.key,
"name": self.concerned_object.name,
"type": type(self.concerned_object).__name__.upper(),
"type": utilities.class_name(self.concerned_object).upper(),
"branch": self.name,
"url": self.url(),
}
Expand All @@ -248,7 +250,7 @@ def url(self) -> str:
return f"{self.base_url(local=False)}/dashboard?id={self.concerned_object.key}&branch={quote(self.name)}"


def exists(app: object, branch: str) -> bool:
def exists(app: Application, branch: str) -> bool:
"""Returns whether an application branch exists"""
try:
ApplicationBranch.get_object(app, branch)
Expand All @@ -257,8 +259,8 @@ def exists(app: object, branch: str) -> bool:
return False


def list_from(app: object, data: types.ApiPayload) -> dict[str, ApplicationBranch]:
"""Returns a dict of application branches form the pure App JSON"""
def list_from(app: Application, data: types.ApiPayload) -> dict[str, ApplicationBranch]:
"""Returns a dict of application branches from the pure App JSON"""
if not data or "branches" not in data:
return {}
branch_list = {}
Expand Down
Loading
Loading