Skip to content
Merged
28 changes: 14 additions & 14 deletions sonar/branches.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ def export(self, export_settings: types.ConfigSettings) -> types.ObjectJsonRepr:
:return: The branch new code period definition
:rtype: str
"""
from sonar import issues
from sonar.issues import count as issue_count
from sonar.hotspots import count as hotspot_count

log.debug("Exporting %s", str(self))
data = {settings.NEW_CODE_PERIOD: self.new_code()}
Expand All @@ -240,20 +241,19 @@ def export(self, export_settings: types.ConfigSettings) -> types.ObjectJsonRepr:
loc_distrib = {m.split("=")[0]: int(m.split("=")[1]) for m in lang_distrib.split(";")}
loc_distrib["total"] = self.loc()
data["ncloc"] = loc_distrib
if export_settings.get("MODE", "") == "MIGRATION":
tpissues = self.count_third_party_issues()
issue_data = {"thirdParty": tpissues if len(tpissues) > 0 else 0}
if self.endpoint.version() >= (10, 0, 0):
issue_data["falsePositives"] = issues.count(
self.endpoint, components=self.concerned_object.key, branch=self.name, issueStatuses="FALSE_POSITIVE"
)
issue_data["accepted"] = issues.count(self.endpoint, components=self.concerned_object.key, branch=self.name, issueStatuses="ACCEPTED")
else:
issue_data["falsePositives"] = issues.count(
self.endpoint, componentKeys=self.concerned_object.key, branch=self.name, resolutions="FALSE-POSITIVE"
)
issue_data["wontFix"] = issues.count(self.endpoint, componentKeys=self.concerned_object.key, branch=self.name, resolutions="WONTFIX")
data["issues"] = issue_data
params = self.search_params()
data["issues"] = {
"thirdParty": tpissues if len(tpissues) > 0 else 0,
"falsePositives": issue_count(self.endpoint, issueStatuses=["FALSE_POSITIVE"], **params),
}
status = "accepted" if self.endpoint.version() >= (10, 2, 0) else "wontFix"
data["issues"][status] = issue_count(self.endpoint, issueStatuses=[status.upper()], **params)
data["hotspots"] = {
"acknowledged": hotspot_count(self.endpoint, resolution=["ACKNOWLEDGED"], **params),
"safe": hotspot_count(self.endpoint, resolution=["SAFE"], **params),
"fixed": hotspot_count(self.endpoint, resolution=["FIXED"], **params),
}
log.debug("%s has these notable issues %s", str(self), str(data["issues"]))
data = util.remove_nones(data)
return None if len(data) == 0 else data
Expand Down
26 changes: 19 additions & 7 deletions sonar/hotspots.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,9 +387,10 @@ def search(endpoint: pf.Platform, filters: types.ApiParams = None) -> dict[str,
new_params = get_search_filters(endpoint=endpoint, params=filters)
new_params = util.dict_remap(original_dict=new_params, remapping=_FILTERS_HOTSPOTS_REMAPPING)
filters_iterations = split_search_filters(new_params)
ps = 500 if "ps" not in new_params else new_params["ps"]
for inline_filters in filters_iterations:
p = 1
inline_filters["ps"] = 500
inline_filters["ps"] = ps
log.info("Searching hotspots with sanitized filters %s", str(inline_filters))
while True:
inline_filters["p"] = p
Expand All @@ -403,7 +404,7 @@ def search(endpoint: pf.Platform, filters: types.ApiParams = None) -> dict[str,
nbr_hotspots = 0
return {}
raise e
nbr_pages = (nbr_hotspots + 499) // 500
nbr_pages = (nbr_hotspots + ps - 1) // ps
log.debug("Number of hotspots: %d - Page: %d/%d", nbr_hotspots, inline_filters["p"], nbr_pages)
if nbr_hotspots > 10000:
raise TooManyHotspotsError(
Expand Down Expand Up @@ -433,6 +434,7 @@ def get_object(endpoint: pf.Platform, key: str, data: dict[str] = None, from_exp

def get_search_filters(endpoint: pf.Platform, params: types.ApiParams) -> types.ApiParams:
"""Returns the filtered list of params that are allowed for api/hotspots/search"""
log.debug("Sanitizing hotspot search criteria %s", str(params))
if params is None:
return {}
criterias = util.remove_nones(params.copy())
Expand All @@ -441,11 +443,12 @@ def get_search_filters(endpoint: pf.Platform, params: types.ApiParams) -> types.
criterias["status"] = util.allowed_values_string(criterias["status"], STATUSES)
if "resolution" in criterias:
criterias["resolution"] = util.allowed_values_string(criterias["resolution"], RESOLUTIONS)
log.warning("hotspot 'status' criteria incompatible with 'resolution' criteria, ignoring 'status'")
criterias["status"] = "REVIEWED"
if endpoint.version() >= (10, 2, 0):
criterias = util.dict_remap(original_dict=criterias, remapping={PROJECT_FILTER_OLD: PROJECT_FILTER})
return util.dict_subset(criterias, SEARCH_CRITERIAS)
if endpoint.version() <= (10, 2, 0):
criterias = util.dict_remap(original_dict=criterias, remapping={PROJECT_FILTER: PROJECT_FILTER_OLD})
criterias = util.dict_subset(criterias, SEARCH_CRITERIAS)
log.debug("Sanitized hotspot search criteria %s", str(criterias))
return criterias


def split_filter(params: types.ApiParams, criteria: str) -> list[types.ApiParams]:
Expand Down Expand Up @@ -475,7 +478,7 @@ def split_search_filters(params: types.ApiParams) -> list[types.ApiParams]:
def post_search_filter(hotspots_dict: dict[str, Hotspot], filters: types.ApiParams) -> dict[str, Hotspot]:
"""Filters a dict of hotspots with provided filters"""
filtered_findings = hotspots_dict.copy()
log.info("Post filtering findings with %s", str(filters))
log.debug("Post filtering findings with %s", str(filters))
if "createdAfter" in filters:
min_date = util.string_to_date(filters["createdAfter"])
if "createdBefore" in filters:
Expand All @@ -491,3 +494,12 @@ def post_search_filter(hotspots_dict: dict[str, Hotspot], filters: types.ApiPara
filtered_findings.pop(key, None)

return filtered_findings


def count(endpoint: pf.Platform, **kwargs) -> int:
"""Returns number of hotspots of a search"""
params = {} if not kwargs else kwargs.copy()
params["ps"] = 1
nbr_hotspots = len(search(endpoint=endpoint, filters=params))
log.debug("Hotspot counts with filters %s returned %d hotspots", str(kwargs), nbr_hotspots)
return nbr_hotspots
96 changes: 46 additions & 50 deletions sonar/issues.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@
COMPONENT_FILTER_OLD = "componentKeys"
COMPONENT_FILTER = "components"

OLD_STATUS = "resolutions"
NEW_STATUS = "issueStatuses"

OLD_FP = "FALSE-POSITIVE"
NEW_FP = "FALSE_POSITIVE"

_SEARCH_CRITERIAS = (
COMPONENT_FILTER_OLD,
COMPONENT_FILTER,
Expand Down Expand Up @@ -77,18 +83,17 @@
"author",
"issues",
"languages",
"resolutions",
OLD_STATUS,
"resolved",
"rules",
"scopes",
# 10.2 new filter
"impactSeverities",
# 10.4 new filter
"issueStatuses",
NEW_STATUS,
)

_FILTERS_10_2_REMAPPING = {"severities": "impactSeverities"}
_FILTERS_10_4_REMAPPING = {"statuses": "issueStatuses"}

TYPES = ("BUG", "VULNERABILITY", "CODE_SMELL")
SEVERITIES = ("BLOCKER", "CRITICAL", "MAJOR", "MINOR", "INFO")
Expand All @@ -102,7 +107,7 @@
"impactSoftwareQualities": IMPACT_SOFTWARE_QUALITIES,
"impactSeverities": IMPACT_SEVERITIES,
"statuses": STATUSES,
"resolutions": RESOLUTIONS,
OLD_STATUS: RESOLUTIONS,
}

_TOO_MANY_ISSUES_MSG = "Too many issues, recursing..."
Expand Down Expand Up @@ -336,7 +341,7 @@ def is_wont_fix(self) -> bool:
:return: Whether the issue is won't fix
:rtype: bool
"""
return self.resolution == "WONT-FIX"
return self.resolution == "WONTFIX"

def is_accepted(self) -> bool:
"""
Expand All @@ -350,7 +355,7 @@ def is_false_positive(self) -> bool:
:return: Whether the issue is a false positive
:rtype: bool
"""
return self.resolution == "FALSE-POSITIVE"
return self.resolution in ("FALSE-POSITIVE", "FALSE_POSITIVE")

def strictly_identical_to(self, another_finding: Issue, ignore_component: bool = False) -> bool:
"""
Expand Down Expand Up @@ -447,7 +452,7 @@ def __apply_event(self, event: str, settings: ConfigSettings) -> bool:
else:
self.reopen()
# self.add_comment(f"Issue re-open {origin}", settings[SYNC_ADD_COMMENTS])
elif event_type == "FALSE-POSITIVE":
elif event_type in ("FALSE-POSITIVE", "FALSE_POSITIVE"):
self.mark_as_false_positive()
# self.add_comment(f"False positive {origin}", settings[SYNC_ADD_COMMENTS])
elif event_type == "WONT-FIX":
Expand Down Expand Up @@ -747,8 +752,6 @@ def search(endpoint: pf.Platform, params: ApiParams = None, raise_error: bool =
filters = pre_search_filters(endpoint=endpoint, params=params)
# if endpoint.version() >= (10, 2, 0):
# new_params = util.dict_remap_and_stringify(new_params, _FILTERS_10_2_REMAPPING)
if endpoint.version() >= (10, 4, 0):
filters = _change_filters_for_10_4(filters)

log.debug("Search filters = %s", str(filters))
if not filters:
Expand Down Expand Up @@ -822,11 +825,10 @@ def count(endpoint: pf.Platform, **kwargs) -> int:
params = {} if not kwargs else kwargs.copy()
params["ps"] = 1
try:
log.debug("Count params = %s", str(params))
nbr_issues = len(search(endpoint=endpoint, params=params))
except TooManyIssuesError as e:
nbr_issues = e.nbr_issues
log.debug("Issue search %s would return %d issues", str(kwargs), nbr_issues)
log.debug("Count issues with filters %s returned %d issues", str(kwargs), nbr_issues)
return nbr_issues


Expand All @@ -852,7 +854,7 @@ def count_by_rule(endpoint: pf.Platform, **kwargs) -> dict[str, int]:
if d["val"] not in rulecount:
rulecount[d["val"]] = 0
rulecount[d["val"]] += d["count"]
log.debug("Rule counts = %s", util.json_dump(rulecount))
# log.debug("Rule counts = %s", util.json_dump(rulecount))
return rulecount


Expand All @@ -868,42 +870,36 @@ def pre_search_filters(endpoint: pf.Platform, params: ApiParams) -> ApiParams:
"""Returns the filtered list of params that are allowed for api/issue/search"""
if not params:
return {}
filters = util.dict_subset(util.remove_nones(params.copy()), _SEARCH_CRITERIAS)
if endpoint.version() >= (10, 2, 0):
if COMPONENT_FILTER_OLD in filters:
filters[COMPONENT_FILTER] = filters.pop(COMPONENT_FILTER_OLD)
if "types" in filters:
__MAP = {"BUG": "RELIABILITY", "CODE_SMELL": "MAINTAINABILITY", "VULNERABILITY": "SECURITY", "SECURITY_HOTSPOT": "SECURITY"}
filters["impactSoftwareQualities"] = [__MAP[t] for t in filters.pop("types")]
if len(filters["impactSoftwareQualities"]) == 0:
filters.pop("impactSoftwareQualities")
if "severities" in filters:
__MAP = {"BLOCKER": "HIGH", "CRITICAL": "HIGH", "MAJOR": "MEDIUM", "MINOR": "LOW", "INFO": "LOW"}
filters["impactSeverities"] = [__MAP[t] for t in filters.pop("severities")]
if len(filters["impactSeverities"]) == 0:
filters.pop("impactSeverities")
for k, v in FILTERS_MAP.items():
if k in filters:
filters[k] = util.allowed_values_string(filters[k], v)
if filters.get("languages", None) is not None:
filters["languages"] = util.list_to_csv(filters["languages"])

log.debug("Sanitizing issue search filters %s", str(params))
version = endpoint.version()
filters = util.dict_remap(original_dict=params.copy(), remapping={"project": COMPONENT_FILTER})
filters = util.dict_subset(util.remove_nones(filters), _SEARCH_CRITERIAS)
if version < (10, 2, 0):
# Starting from 10.2 - "componentKeys" was renamed "components"
filters = util.dict_remap(original_dict=filters, remapping={COMPONENT_FILTER: COMPONENT_FILTER_OLD})
else:
# Starting from 10.2 - Issue types were replaced by software qualities, and severities replaced by impacts
__MAP = {"BUG": "RELIABILITY", "CODE_SMELL": "MAINTAINABILITY", "VULNERABILITY": "SECURITY", "SECURITY_HOTSPOT": "SECURITY"}
filters["impactSoftwareQualities"] = util.list_re_value(filters.pop("types", None), __MAP)
if len(filters["impactSoftwareQualities"]) == 0:
filters.pop("impactSoftwareQualities")
__MAP = {"BLOCKER": "HIGH", "CRITICAL": "HIGH", "MAJOR": "MEDIUM", "MINOR": "LOW", "INFO": "LOW"}
filters["impactSeverities"] = util.list_re_value(filters.pop("severities", None), __MAP)
if len(filters["impactSeverities"]) == 0:
filters.pop("impactSeverities")

if version < (10, 4, 0):
log.debug("Sanitizing issue search filters - fixing resolutions")
filters = util.dict_remap(original_dict=filters, remapping={NEW_STATUS: OLD_STATUS})
if OLD_STATUS in filters:
filters[OLD_STATUS] = util.list_re_value(filters[OLD_STATUS], mapping={NEW_FP: OLD_FP})
else:
# Starting from 10.4 - "resolutions" was renamed "issuesStatuses", "FALSE-POSITIVE" was renamed "FALSE_POSITIVE"
filters = util.dict_remap(original_dict=filters, remapping={OLD_STATUS: NEW_STATUS})
if NEW_STATUS in filters:
filters[NEW_STATUS] = util.list_re_value(filters[NEW_STATUS], mapping={OLD_FP: NEW_FP})

filters = {k: util.allowed_values_string(v, FILTERS_MAP[k]) if k in FILTERS_MAP else v for k, v in filters.items()}
filters = {k: util.list_to_csv(v) for k, v in filters.items() if v}
log.debug("Sanitized issue search filters %s", str(filters))
return filters


def _change_filters_for_10_4(filters: ApiParams) -> ApiParams:
"""Adjust filters for new 10.4 issues/search API parameters"""
if not filters:
return None
new_filters = util.dict_remap(filters.copy(), _FILTERS_10_4_REMAPPING)
statuses = []
for f in "resolutions", "issueStatuses":
if f in new_filters:
statuses += util.csv_to_list(new_filters[f])
new_filters.pop("resolutions", None)
if len(statuses) > 0:
if "FALSE-POSITIVE" in statuses:
statuses.remove("FALSE-POSITIVE")
statuses.append("FALSE_POSITIVE")
new_filters["issueStatuses"] = util.list_to_csv(statuses)
return new_filters
23 changes: 14 additions & 9 deletions sonar/projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,8 @@ def export(self, export_settings: types.ConfigSettings, settings_list: dict[str,
:return: All project configuration settings
:rtype: dict
"""
from sonar import issues
from sonar.issues import count as issue_count
from sonar.hotspots import count as hotspot_count

log.info("Exporting %s", str(self))
try:
Expand Down Expand Up @@ -997,14 +998,18 @@ def export(self, export_settings: types.ConfigSettings, settings_list: dict[str,
"taskHistory": [t._json for t in self.task_history()],
}
tpissues = self.count_third_party_issues()
issue_data = {"thirdParty": tpissues if len(tpissues) > 0 else 0}
if self.endpoint.version() >= (10, 0, 0):
issue_data["falsePositives"] = issues.count(self.endpoint, components=self.key, issueStatuses="FALSE_POSITIVE")
issue_data["accepted"] = issues.count(self.endpoint, components=self.key, issueStatuses="ACCEPTED")
else:
issue_data["falsePositives"] = issues.count(self.endpoint, componentKeys=self.key, resolutions="FALSE-POSITIVE")
issue_data["wontFix"] = issues.count(self.endpoint, componentKeys=self.key, resolutions="WONTFIX")
json_data["issues"] = issue_data
params = self.search_params()
json_data["issues"] = {
"thirdParty": tpissues if len(tpissues) > 0 else 0,
"falsePositives": issue_count(self.endpoint, issueStatuses=["FALSE_POSITIVE"], **params),
}
status = "accepted" if self.endpoint.version() >= (10, 2, 0) else "wontFix"
json_data["issues"][status] = issue_count(self.endpoint, issueStatuses=[status.upper()], **params)
json_data["hotspots"] = {
"acknowledged": hotspot_count(self.endpoint, resolution=["ACKNOWLEDGED"], **params),
"safe": hotspot_count(self.endpoint, resolution=["SAFE"], **params),
"fixed": hotspot_count(self.endpoint, resolution=["FIXED"], **params),
}
log.debug("%s has these notable issues %s", str(self), str(json_data["issues"]))

settings_dict = settings.get_bulk(endpoint=self.endpoint, component=self, settings_list=settings_list, include_not_set=False)
Expand Down
27 changes: 19 additions & 8 deletions sonar/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,19 +209,21 @@ def csv_to_list(string: str, separator: str = ",") -> list[str]:
return [s.strip() for s in string.split(separator)]


def list_to_csv(array: Union[None, str, list[str]], separator: str = ",", check_for_separator: bool = False) -> Optional[str]:
def list_to_csv(array: Union[None, str, int, float, list[str]], separator: str = ",", check_for_separator: bool = False) -> Optional[str]:
"""Converts a list of strings to CSV"""
if isinstance(array, str):
return csv_normalize(array, separator) if " " in array else array
if array is None:
return None
if check_for_separator:
# Don't convert to string if one array item contains the string separator
s = separator.strip()
for item in array:
if s in item:
return array
return separator.join([v.strip() for v in array])
if isinstance(array, (list, set, tuple)):
if check_for_separator:
# Don't convert to string if one array item contains the string separator
s = separator.strip()
for item in array:
if s in item:
return array
return separator.join([v.strip() for v in array])
return str(array)


def csv_normalize(string: str, separator: str = ",") -> str:
Expand Down Expand Up @@ -587,6 +589,15 @@ def dict_remap(original_dict: dict[str, str], remapping: dict[str, str]) -> dict
return remapped_filters


def list_re_value(a_list: list[str], mapping: dict[str, str]) -> list[str]:
"""Adjust findings search filters based on Sonar version"""
if not a_list or len(a_list) == 0:
return []
for old, new in mapping.items():
a_list = [new if v == old else v for v in a_list]
return a_list


def dict_stringify(original_dict: dict[str, str]) -> dict[str, str]:
"""Covert dict list values into CSV string"""
if not original_dict:
Expand Down
Loading