Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 41 additions & 4 deletions cli/audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import json
import csv
import re
from typing import TextIO, Optional
from threading import Thread
from queue import Queue
Expand All @@ -50,6 +51,8 @@
options.WHAT_PORTFOLIOS: portfolios.audit,
}

PROBLEM_KEYS = "problems"


def _audit_sif(sysinfo: str, audit_settings: types.ConfigSettings) -> tuple[str, list[problem.Problem]]:
"""Audits a SIF and return found problems"""
Expand All @@ -70,20 +73,35 @@ def _audit_sif(sysinfo: str, audit_settings: types.ConfigSettings) -> tuple[str,
return sif_obj.server_id(), sif_obj.audit(audit_settings)


def __filter_problems(problems: list[problem.Problem], settings: types.ConfigSettings) -> list[problem.Problem]:
"""Filters audit problems by severity and/or type and/or problem key"""
if settings.get(options.SEVERITIES, None):
log.debug("Filtering audit problems with severities: %s", settings[options.SEVERITIES])
problems = [p for p in problems if str(p.severity) in settings[options.SEVERITIES]]
if settings.get(options.TYPES, None):
log.debug("Filtering audit problems with types: %s", settings[options.TYPES])
problems = [p for p in problems if str(p.type) in settings[options.TYPES]]
if settings.get(PROBLEM_KEYS, None):
log.debug("Filtering audit problems with keys: %s", settings[PROBLEM_KEYS])
problems = [p for p in problems if re.match(rf"^{settings[PROBLEM_KEYS]}$", str(p.rule_id))]
return problems


def write_csv(queue: Queue[list[problem.Problem]], fd: TextIO, settings: types.ConfigSettings) -> None:
"""Thread callback to write audit problems in a CSV file"""
server_id = settings.get("SERVER_ID", None)
with_url = settings.get("WITH_URL", False)
csvwriter = csv.writer(fd, delimiter=settings.get("CSV_DELIMITER", ","))
header = ["Server Id"] if server_id else []
header += ["Audit Check", "Category", "Severity", "Message"]
header += ["Problem", "Type", "Severity", "Message"]
header += ["URL"] if with_url else []
csvwriter.writerow(header)
while (problems := queue.get()) is not util.WRITE_END:
problems = __filter_problems(problems, settings)
for p in problems:
json_data = p.to_json(with_url)
data = [] if not server_id else [server_id]
data += list(json_data.values())
data += [json_data[k] for k in ("problem", "type", "severity", "message") if k in json_data]
csvwriter.writerow(data)
queue.task_done()
queue.task_done()
Expand All @@ -96,6 +114,7 @@ def write_json(queue: Queue[list[problem.Problem]], fd: TextIO, settings: types.
comma = ""
print("[", file=fd)
while (problems := queue.get()) is not util.WRITE_END:
problems = __filter_problems(problems, settings)
for p in problems:
json_data = p.to_json(with_url)
if server_id:
Expand Down Expand Up @@ -166,6 +185,24 @@ def __parser_args(desc: str) -> object:
nargs="*",
help="Pass audit configuration settings on command line (-D<setting>=<value>)",
)
parser.add_argument(
f"--{options.SEVERITIES}",
required=False,
default=None,
help="Report only audit problems with the given severities (comma separate values LOW, MEDIUM, HIGH, CRITICAL)",
)
parser.add_argument(
f"--{options.TYPES}",
required=False,
default=None,
help="Report only audit problems of the given comma separated problem types",
)
parser.add_argument(
f"--{PROBLEM_KEYS}",
required=False,
default=None,
help="Report only audit problems whose type key matches the given regexp",
)
args = options.parse_and_check(parser=parser, logger_name=TOOL_NAME, verify_token=False)
if args.sif is None and args.config is None:
util.check_token(args.token)
Expand All @@ -189,14 +226,14 @@ def main() -> None:
key, value = val[0].split("=", maxsplit=1)
cli_settings[key] = value
settings = audit_conf.load(TOOL_NAME, cli_settings)
settings |= kwargs
file = ofile = kwargs.pop(options.REPORT_FILE)
fmt = util.deduct_format(kwargs[options.FORMAT], ofile)
settings.update(
{
"FILE": file,
"CSV_DELIMITER": kwargs[options.CSV_SEPARATOR],
"WITH_URL": kwargs[options.WITH_URL],
"threads": kwargs[options.NBR_THREADS],
"format": fmt,
}
)
Expand All @@ -208,8 +245,8 @@ def main() -> None:
file = kwargs["sif"]
errcode = errcodes.SIF_AUDIT_ERROR
(settings["SERVER_ID"], problems) = _audit_sif(file, settings)
problems = __filter_problems(problems, settings)
problem.dump_report(problems, file=ofile, server_id=settings["SERVER_ID"], format=fmt)

else:
sq = platform.Platform(**kwargs)
sq.verify_connection()
Expand Down
2 changes: 1 addition & 1 deletion cli/findings_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def parse_args(desc: str) -> Namespace:
parser.add_argument(
f"--{options.SEVERITIES}",
required=False,
help="Comma separated severities among" + util.list_to_csv(idefs.STD_SEVERITIES + hotspots.SEVERITIES),
help="Comma separated severities among " + util.list_to_csv(idefs.STD_SEVERITIES + hotspots.SEVERITIES),
)
parser.add_argument(
f"--{options.TYPES}",
Expand Down
3 changes: 3 additions & 0 deletions doc/sonar-audit.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ When `--what` is not specified, everything is audited
- `--what apps`: Audits applications
- `-f <file>`: Sends audit output to `<file>`, `stdout` is the default. The output format is deducted from
the file extension (JSON or CSV), except if `--format` is specified
- `--severities`: The audit output will only reports problems of the given severities to pass as comma separated (LOW, MEDIUM, HIGH, CRITICAL)
- `--types`: The audit output will only reports problems of the given types to pass as comma separated (BAD_PRACTICE, GOVERNANCE, HOUSEKEEPING, OPERATIONS, PERFORMANCE, SECURITY as of today, more may be added in the future)
- `--problems`: The audit output will only report problems whose key match the given regexp. The key is the 2nd column of the CSV or the "problem" field of the JSON
- `--sif <SystemInfoFile>`: Will audit the input SIF file, instead of connecting to a SonarQube Server or Cloud platform.
In that case:
- URL and token are not needed
Expand Down
3 changes: 2 additions & 1 deletion sonar/audit/problem.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ def to_json(self, with_url=False):
d = vars(self).copy()
d.pop("concerned_object")

for k in ("severity", "type", "rule_id"):
d["problem"] = str(d.pop("rule_id"))
for k in ("severity", "type"):
d[k] = str(d[k])
if with_url:
try:
Expand Down
2 changes: 1 addition & 1 deletion sonar/portfolios.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def __str__(self) -> str:
"""Returns string representation of object"""
return (
f"subportfolio '{self.key}'"
if self.sq_json.get("qualifier", _PORTFOLIO_QUALIFIER) == _SUBPORTFOLIO_QUALIFIER
if self.sq_json and self.sq_json.get("qualifier", _PORTFOLIO_QUALIFIER) == _SUBPORTFOLIO_QUALIFIER
else f"portfolio '{self.key}'"
)

Expand Down
16 changes: 10 additions & 6 deletions test/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,40 +239,44 @@ def csv_file() -> Generator[str]:
"""setup of tests"""
file = get_temp_filename("csv")
yield file
rm(file)
if os.path.exists(file):
rm(file)


@pytest.fixture
def txt_file() -> Generator[str]:
"""setup of tests"""
file = get_temp_filename("txt")
yield file
rm(file)
if os.path.exists(file):
rm(file)


@pytest.fixture
def json_file() -> Generator[str]:
"""setup of tests"""
file = get_temp_filename("json")
yield file
rm(file)
if os.path.exists(file):
rm(file)


@pytest.fixture
def yaml_file() -> Generator[str]:
"""setup of tests"""
file = get_temp_filename("yaml")
rm(file)
yield file
rm(file)
if os.path.exists(file):
rm(file)


@pytest.fixture
def sarif_file() -> Generator[str]:
"""setup of tests"""
file = get_temp_filename("sarif")
yield file
rm(file)
if os.path.exists(file):
rm(file)


@pytest.fixture
Expand Down
65 changes: 2 additions & 63 deletions test/unit/test_audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,75 +19,14 @@
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#

"""sonar-audit tests"""

import os
from collections.abc import Generator
"""Project audit tests"""

import utilities as tutil
from sonar import errcodes as e
import cli.options as opt
from cli import audit
from sonar import projects
from sonar.audit import rules

CMD = f"sonar-audit.py {tutil.SQS_OPTS}"

AUDIT_DISABLED = """
audit.globalSettings = no
audit.projects = false
audit.qualityGates = no
audit.qualityProfiles = no
audit.users = no
audit.groups = no
audit.portfolios = no
audit.applications = no
audit.logs = no
audit.plugins = no"""


def test_audit_disabled(csv_file: Generator[str]) -> None:
"""test_audit_disabled"""
with open(".sonar-audit.properties", mode="w", encoding="utf-8") as fd:
print(AUDIT_DISABLED, file=fd)
assert tutil.run_cmd(audit.main, f"{CMD} --{opt.REPORT_FILE} {csv_file}") == e.OK
os.remove(".sonar-audit.properties")
assert tutil.csv_nbr_lines(csv_file) == 0


def test_audit_stdout() -> None:
"""test_audit_stdout"""
assert tutil.run_cmd(audit.main, CMD) == e.OK


def test_audit_json(json_file: Generator[str]) -> None:
"""test_audit_json"""
assert tutil.run_cmd(audit.main, f"{CMD} --{opt.REPORT_FILE} {json_file}") == e.OK


def test_audit_proj_key(csv_file: Generator[str]) -> None:
"""test_audit_proj_key"""
assert tutil.run_cmd(audit.main, f"{CMD} --{opt.REPORT_FILE} {csv_file} --{opt.WHAT} projects --{opt.KEY_REGEXP} {tutil.LIVE_PROJECT}") == e.OK


def test_audit_proj_non_existing_key() -> None:
"""test_audit_proj_non_existing_key"""
assert tutil.run_cmd(audit.main, f"{CMD} --{opt.WHAT} projects --{opt.KEY_REGEXP} {tutil.LIVE_PROJECT},bad_key") == e.ARGS_ERROR


def test_audit_cmd_line_settings(csv_file: Generator[str]) -> None:
"""test_audit_cmd_line_settings"""
what_to_audit = ["logs", "projects", "portfolios", "applications", "qualityProfiles", "qualityGates", "users", "groups"]
cli_opt = " ".join([f"-Daudit.{what}=true" for what in what_to_audit])
assert tutil.run_cmd(audit.main, f"{CMD} {cli_opt} --{opt.REPORT_FILE} {csv_file}") == e.OK
assert tutil.csv_nbr_lines(csv_file) > 0

cli_opt = " ".join([f"-Daudit.{what}=false" for what in what_to_audit + ["globalSettings"]])
assert tutil.run_cmd(audit.main, f"{CMD} {cli_opt} --{opt.REPORT_FILE} {csv_file}") == e.OK
assert tutil.csv_nbr_lines(csv_file) == 0


def test_audit_proj_key_pattern(csv_file: Generator[str]) -> None:
def test_audit_proj_key_pattern() -> None:
"""test_audit_cmd_line_settings"""
settings = {"audit.projects": True, "audit.projects.keyPattern": None}
pbs = projects.audit(tutil.SQ, settings, key_list="BANKING.*")
Expand Down
117 changes: 117 additions & 0 deletions test/unit/test_cli_audit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#!/usr/bin/env python3
#
# sonar-tools tests
# Copyright (C) 2024-2025 Olivier Korach
# mailto:olivier.korach AT gmail DOT com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#

"""sonar-audit CLI tests"""

import os
from collections.abc import Generator

import utilities as tutil
from sonar import errcodes as e
import cli.options as opt
from cli import audit

CMD = f"sonar-audit.py {tutil.SQS_OPTS}"

AUDIT_DISABLED = """
audit.globalSettings = no
audit.projects = false
audit.qualityGates = no
audit.qualityProfiles = no
audit.users = no
audit.groups = no
audit.portfolios = no
audit.applications = no
audit.logs = no
audit.plugins = no"""


def test_audit_disabled(csv_file: Generator[str]) -> None:
"""Tests that nothing is output when all audits are disabled"""
with open(".sonar-audit.properties", mode="w", encoding="utf-8") as fd:
print(AUDIT_DISABLED, file=fd)
assert tutil.run_cmd(audit.main, f"{CMD} --{opt.REPORT_FILE} {csv_file}") == e.OK
os.remove(".sonar-audit.properties")
assert tutil.csv_nbr_lines(csv_file) == 0


def test_audit_stdout() -> None:
"""Tests audit to stdout"""
assert tutil.run_cmd(audit.main, CMD) == e.OK


def test_audit_json(json_file: Generator[str]) -> None:
"""Test audit to json file"""
assert tutil.run_cmd(audit.main, f"{CMD} --{opt.REPORT_FILE} {json_file}") == e.OK


def test_audit_proj_key(csv_file: Generator[str]) -> None:
"""Tests that audit can select only specific project keys"""
assert tutil.run_cmd(audit.main, f"{CMD} --{opt.REPORT_FILE} {csv_file} --{opt.WHAT} projects --{opt.KEY_REGEXP} {tutil.LIVE_PROJECT}") == e.OK


def test_audit_proj_non_existing_key() -> None:
"""Tests that error is raised when the project key regexp does not select any project"""
assert tutil.run_cmd(audit.main, f"{CMD} --{opt.WHAT} projects --{opt.KEY_REGEXP} {tutil.LIVE_PROJECT},bad_key") == e.ARGS_ERROR


def test_audit_cmd_line_settings(csv_file: Generator[str]) -> None:
"""Verifies that passing audit settings from command line with -D<key>=<value> works"""
what_to_audit = ["logs", "projects", "portfolios", "applications", "qualityProfiles", "qualityGates", "users", "groups"]
cli_opt = " ".join([f"-Daudit.{what}=true" for what in what_to_audit])
assert tutil.run_cmd(audit.main, f"{CMD} {cli_opt} --{opt.REPORT_FILE} {csv_file}") == e.OK
assert tutil.csv_nbr_lines(csv_file) > 0

cli_opt = " ".join([f"-Daudit.{what}=false" for what in what_to_audit + ["globalSettings"]])
assert tutil.run_cmd(audit.main, f"{CMD} {cli_opt} --{opt.REPORT_FILE} {csv_file}") == e.OK
assert tutil.csv_nbr_lines(csv_file) == 0


def test_filter_severity(csv_file: Generator[str]) -> None:
"""Verify that filtering by severities works"""
assert tutil.run_cmd(audit.main, f"{CMD} --{opt.REPORT_FILE} {csv_file} --{opt.SEVERITIES} MEDIUM,HIGH") == e.OK
assert tutil.csv_nbr_lines(csv_file) > 0
assert tutil.csv_col_is_value(csv_file, "Severity", "MEDIUM", "HIGH")


def test_filter_type(json_file: Generator[str]) -> None:
"""Verify that filtering by severities works"""
assert tutil.run_cmd(audit.main, f"{CMD} --{opt.REPORT_FILE} {json_file} --{opt.TYPES} HOUSEKEEPING,SECURITY") == e.OK
assert tutil.json_field_in_values(json_file, "type", "HOUSEKEEPING", "SECURITY")


def test_filter_problem(csv_file: Generator[str]) -> None:
"""Verify that filtering by problem id works"""
regexp = "(OBJECT.+|QG.+)"
assert tutil.run_cmd(audit.main, f"{CMD} --{opt.REPORT_FILE} {csv_file} --problems {regexp}") == e.OK
assert tutil.csv_col_match(csv_file, "Problem", regexp)


def test_filter_multiple(csv_file: Generator[str]) -> None:
"""Verify that filtering by problem id works"""
regexp = "(OBJECT.+|QG.+)"
assert (
tutil.run_cmd(audit.main, f"{CMD} --{opt.REPORT_FILE} {csv_file} --{opt.TYPES} HOUSEKEEPING --{opt.SEVERITIES} MEDIUM --problems {regexp}")
== e.OK
)
assert tutil.csv_col_is_value(csv_file, "Severity", "MEDIUM")
assert tutil.csv_col_is_value(csv_file, "Type", "HOUSEKEEPING")
assert tutil.csv_col_match(csv_file, "Problem", regexp)
Loading