Skip to content

Commit

Permalink
feat: more granular pro engine config metrics (#202)
Browse files Browse the repository at this point in the history
Creates a new metrics type which is closely aligned with osemgrep's
Engine_type.t which allows for more precise tracking of the engine
feature matrix. This is needed compared to a simple enumeration since we
have more orthogonal features than previously. For instance, we can now
perform analysis on at least two axes (within pro):
    (1) secrets validation [enabled <-> disabled];
    (2) dataflow [none <-> ... <-> interfile]
Creating a new enum entry for each possible combination is not
future-proof (and is ugly), so we need a more complex type here.
  • Loading branch information
kopecs authored Dec 20, 2023
1 parent eb5a93e commit 9f1c503
Show file tree
Hide file tree
Showing 2 changed files with 305 additions and 0 deletions.
31 changes: 31 additions & 0 deletions semgrep_metrics.atd
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,35 @@ type pro_features = {
?diffDepth <ocaml mutable>: int option;
}

(* Kind of analysis performed by the engine.
 * NOTE(review): the variant names suggest increasing scope, from within a
 * single procedure up to across files; confirm the intended ordering.
 *)
type analysis_type <ocaml attr="deriving show"> = [
| Intraprocedural
| Interprocedural
| Interfile
]

(* Per-product config stored in engine_config.code_config.
 * Declared as unit, so it currently carries no settings.
 *)
type code_config <ocaml attr="deriving show"> = unit

(* Possible values of secrets_config.permitted_origins. *)
type secrets_origin <ocaml attr="deriving show"> = [ Any | Semgrep ]
(* Per-product config stored in engine_config.secrets_config. *)
type secrets_config
<ocaml attr="deriving show"> = {
(* NOTE(review): presumably which origins are accepted for secrets; the
 * exact meaning is not visible here, confirm.
 *)
permitted_origins: secrets_origin;
}

(* Per-product config stored in engine_config.supply_chain_config.
 * Declared as unit, so it currently carries no settings.
 *)
type supply_chain_config <ocaml attr="deriving show"> = unit

(* Since v1.54.0 *)
(* Engine configuration reported in metrics: the analysis type, a pro_langs
 * flag, and one optional config per product.
 *)
type engine_config
<ocaml attr="deriving show"> = {
analysis_type: analysis_type;
(* NOTE(review): presumably whether pro language support was enabled;
 * confirm against the producer of this field.
 *)
pro_langs: bool;
(* `Some c` where `c` is the config if the product was run.
* `None` if it was not run.
*)
?code_config: code_config option;
?secrets_config: secrets_config option;
?supply_chain_config: supply_chain_config option;
}

type misc = {
(* coupling: features is commented a lot in semgrep/PRIVACY.md *)
features <ocaml mutable>: string list;
Expand All @@ -166,6 +195,8 @@ type misc = {
?ruleHashesWithFindings <ocaml mutable>: (string * int) list <json repr="object"> option;
(* TODO: should be OSS | Pro, see semgrep_output_v1.atd engine_kind type *)
~engineRequested <python default="'OSS'"> <ocaml mutable>: string;
(* Since Semgrep 1.54.0 *)
?engineConfig <ocaml mutable>: engine_config option;
(* Since Semgrep 1.49.0 *)
?interfileLanguagesUsed: string list option;
}
274 changes: 274 additions & 0 deletions semgrep_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,30 @@ def to_json_string(self, **kw: Any) -> str:
return json.dumps(self.to_json(), **kw)


@dataclass
class SupplyChainConfig:
    """Original type: supply_chain_config = { ... }

    Record without fields: any JSON object is accepted on input (its
    contents are ignored) and an empty JSON object is produced on output.

    NOTE(review): the ATD definition in this change declares
    ``supply_chain_config`` as ``unit``, while this class reads and writes a
    JSON object -- confirm the generated code is in sync with the ATD file.
    """

    @classmethod
    def from_json(cls, x: Any) -> 'SupplyChainConfig':
        """Build an instance from decoded JSON.

        Anything that is not a dict is handed to ``_atd_bad_json`` (defined
        elsewhere in this file; presumably raises -- confirm).
        """
        if isinstance(x, dict):
            return cls()
        else:
            _atd_bad_json('SupplyChainConfig', x)

    def to_json(self) -> Any:
        """Return the JSON-compatible form: always an empty dict."""
        res: Dict[str, Any] = {}
        return res

    @classmethod
    def from_json_string(cls, x: str) -> 'SupplyChainConfig':
        """Parse the JSON text ``x`` and convert it with ``from_json``."""
        return cls.from_json(json.loads(x))

    def to_json_string(self, **kw: Any) -> str:
        """Serialize to JSON text; ``kw`` is forwarded to ``json.dumps``."""
        return json.dumps(self.to_json(), **kw)


@dataclass
class Sha256:
"""Original type: sha256"""
Expand All @@ -305,6 +329,100 @@ def to_json_string(self, **kw: Any) -> str:
return json.dumps(self.to_json(), **kw)


@dataclass
class Any_:
    """Original type: secrets_origin = [ ... | Any | ... ]

    The class carries a trailing underscore while the JSON tag stays
    ``'Any'``; the bare name ``Any`` is already used in this file as a type
    annotation.
    """

    @property
    def kind(self) -> str:
        """Name of the class representing this variant."""
        return 'Any_'

    @staticmethod
    def to_json() -> Any:
        """Return the JSON tag of this variant (the string ``'Any'``)."""
        return 'Any'

    def to_json_string(self, **kw: Any) -> str:
        """Serialize to JSON text; ``kw`` is forwarded to ``json.dumps``."""
        return json.dumps(self.to_json(), **kw)


@dataclass
class Semgrep:
    """Original type: secrets_origin = [ ... | Semgrep | ... ]"""

    @property
    def kind(self) -> str:
        """Name of the class representing this variant."""
        return 'Semgrep'

    @staticmethod
    def to_json() -> Any:
        """Return the JSON tag of this variant (the string ``'Semgrep'``)."""
        return 'Semgrep'

    def to_json_string(self, **kw: Any) -> str:
        """Serialize to JSON text; ``kw`` is forwarded to ``json.dumps``."""
        return json.dumps(self.to_json(), **kw)


@dataclass
class SecretsOrigin:
    """Original type: secrets_origin = [ ... ]

    Wrapper around one of the variant classes ``Any_`` or ``Semgrep``.
    """

    value: Union[Any_, Semgrep]

    @property
    def kind(self) -> str:
        """Name of the class representing this variant."""
        return self.value.kind

    @classmethod
    def from_json(cls, x: Any) -> 'SecretsOrigin':
        """Build the wrapper from a JSON tag (``'Any'`` or ``'Semgrep'``).

        Unknown strings and non-string values are handed to
        ``_atd_bad_json`` (defined elsewhere in this file; presumably raises
        -- confirm).
        """
        if isinstance(x, str):
            if x == 'Any':
                return cls(Any_())
            if x == 'Semgrep':
                return cls(Semgrep())
            _atd_bad_json('SecretsOrigin', x)
        _atd_bad_json('SecretsOrigin', x)

    def to_json(self) -> Any:
        """Return the JSON form of the wrapped variant."""
        return self.value.to_json()

    @classmethod
    def from_json_string(cls, x: str) -> 'SecretsOrigin':
        """Parse the JSON text ``x`` and convert it with ``from_json``."""
        return cls.from_json(json.loads(x))

    def to_json_string(self, **kw: Any) -> str:
        """Serialize to JSON text; ``kw`` is forwarded to ``json.dumps``."""
        return json.dumps(self.to_json(), **kw)


@dataclass
class SecretsConfig:
    """Original type: secrets_config = { ... }

    Record with a single required field, ``permitted_origins``.
    """

    permitted_origins: SecretsOrigin

    @classmethod
    def from_json(cls, x: Any) -> 'SecretsConfig':
        """Build an instance from a decoded JSON object.

        A missing ``permitted_origins`` key is reported through
        ``_atd_missing_json_field`` and a non-dict input through
        ``_atd_bad_json`` (both defined elsewhere in this file; presumably
        they raise -- confirm).
        """
        if isinstance(x, dict):
            return cls(
                permitted_origins=SecretsOrigin.from_json(x['permitted_origins']) if 'permitted_origins' in x else _atd_missing_json_field('SecretsConfig', 'permitted_origins'),
            )
        else:
            _atd_bad_json('SecretsConfig', x)

    def to_json(self) -> Any:
        """Return the JSON-compatible dict for this record."""
        res: Dict[str, Any] = {}
        res['permitted_origins'] = self.permitted_origins.to_json()
        return res

    @classmethod
    def from_json_string(cls, x: str) -> 'SecretsConfig':
        """Parse the JSON text ``x`` and convert it with ``from_json``."""
        return cls.from_json(json.loads(x))

    def to_json_string(self, **kw: Any) -> str:
        """Serialize to JSON text; ``kw`` is forwarded to ``json.dumps``."""
        return json.dumps(self.to_json(), **kw)


@dataclass
class RuleStats:
"""Original type: rule_stats = { ... }"""
Expand Down Expand Up @@ -502,6 +620,158 @@ def to_json_string(self, **kw: Any) -> str:
return json.dumps(self.to_json(), **kw)


@dataclass
class CodeConfig:
    """Original type: code_config = { ... }

    Record without fields: any JSON object is accepted on input (its
    contents are ignored) and an empty JSON object is produced on output.

    NOTE(review): the ATD definition in this change declares ``code_config``
    as ``unit``, while this class reads and writes a JSON object -- confirm
    the generated code is in sync with the ATD file.
    """

    @classmethod
    def from_json(cls, x: Any) -> 'CodeConfig':
        """Build an instance from decoded JSON.

        Anything that is not a dict is handed to ``_atd_bad_json`` (defined
        elsewhere in this file; presumably raises -- confirm).
        """
        if isinstance(x, dict):
            return cls()
        else:
            _atd_bad_json('CodeConfig', x)

    def to_json(self) -> Any:
        """Return the JSON-compatible form: always an empty dict."""
        res: Dict[str, Any] = {}
        return res

    @classmethod
    def from_json_string(cls, x: str) -> 'CodeConfig':
        """Parse the JSON text ``x`` and convert it with ``from_json``."""
        return cls.from_json(json.loads(x))

    def to_json_string(self, **kw: Any) -> str:
        """Serialize to JSON text; ``kw`` is forwarded to ``json.dumps``."""
        return json.dumps(self.to_json(), **kw)


@dataclass
class Intraprocedural:
    """Original type: analysis_type = [ ... | Intraprocedural | ... ]"""

    @property
    def kind(self) -> str:
        """Name of the class representing this variant."""
        return 'Intraprocedural'

    @staticmethod
    def to_json() -> Any:
        """Return the JSON tag of this variant."""
        return 'Intraprocedural'

    def to_json_string(self, **kw: Any) -> str:
        """Serialize to JSON text; ``kw`` is forwarded to ``json.dumps``."""
        return json.dumps(self.to_json(), **kw)


@dataclass
class Interprocedural:
    """Original type: analysis_type = [ ... | Interprocedural | ... ]"""

    @property
    def kind(self) -> str:
        """Name of the class representing this variant."""
        return 'Interprocedural'

    @staticmethod
    def to_json() -> Any:
        """Return the JSON tag of this variant."""
        return 'Interprocedural'

    def to_json_string(self, **kw: Any) -> str:
        """Serialize to JSON text; ``kw`` is forwarded to ``json.dumps``."""
        return json.dumps(self.to_json(), **kw)


@dataclass
class Interfile:
    """Original type: analysis_type = [ ... | Interfile | ... ]"""

    @property
    def kind(self) -> str:
        """Name of the class representing this variant."""
        return 'Interfile'

    @staticmethod
    def to_json() -> Any:
        """Return the JSON tag of this variant."""
        return 'Interfile'

    def to_json_string(self, **kw: Any) -> str:
        """Serialize to JSON text; ``kw`` is forwarded to ``json.dumps``."""
        return json.dumps(self.to_json(), **kw)


@dataclass
class AnalysisType:
    """Original type: analysis_type = [ ... ]

    Wrapper around one of the variant classes ``Intraprocedural``,
    ``Interprocedural`` or ``Interfile``.
    """

    value: Union[Intraprocedural, Interprocedural, Interfile]

    @property
    def kind(self) -> str:
        """Name of the class representing this variant."""
        return self.value.kind

    @classmethod
    def from_json(cls, x: Any) -> 'AnalysisType':
        """Build the wrapper from a JSON tag naming one of the variants.

        Unknown strings and non-string values are handed to
        ``_atd_bad_json`` (defined elsewhere in this file; presumably raises
        -- confirm).
        """
        if isinstance(x, str):
            if x == 'Intraprocedural':
                return cls(Intraprocedural())
            if x == 'Interprocedural':
                return cls(Interprocedural())
            if x == 'Interfile':
                return cls(Interfile())
            _atd_bad_json('AnalysisType', x)
        _atd_bad_json('AnalysisType', x)

    def to_json(self) -> Any:
        """Return the JSON form of the wrapped variant."""
        return self.value.to_json()

    @classmethod
    def from_json_string(cls, x: str) -> 'AnalysisType':
        """Parse the JSON text ``x`` and convert it with ``from_json``."""
        return cls.from_json(json.loads(x))

    def to_json_string(self, **kw: Any) -> str:
        """Serialize to JSON text; ``kw`` is forwarded to ``json.dumps``."""
        return json.dumps(self.to_json(), **kw)


@dataclass
class EngineConfig:
    """Original type: engine_config = { ... }

    ``analysis_type`` and ``pro_langs`` are required. The three
    ``*_config`` fields are optional: they default to ``None`` and are left
    out of the JSON output when ``None``.
    """

    analysis_type: AnalysisType
    pro_langs: bool
    code_config: Optional[CodeConfig] = None
    secrets_config: Optional[SecretsConfig] = None
    supply_chain_config: Optional[SupplyChainConfig] = None

    @classmethod
    def from_json(cls, x: Any) -> 'EngineConfig':
        """Build an instance from a decoded JSON object.

        Missing required keys are reported through
        ``_atd_missing_json_field`` and a non-dict input through
        ``_atd_bad_json`` (both defined elsewhere in this file; presumably
        they raise -- confirm). Absent optional keys become ``None``.
        """
        if isinstance(x, dict):
            return cls(
                analysis_type=AnalysisType.from_json(x['analysis_type']) if 'analysis_type' in x else _atd_missing_json_field('EngineConfig', 'analysis_type'),
                pro_langs=_atd_read_bool(x['pro_langs']) if 'pro_langs' in x else _atd_missing_json_field('EngineConfig', 'pro_langs'),
                code_config=CodeConfig.from_json(x['code_config']) if 'code_config' in x else None,
                secrets_config=SecretsConfig.from_json(x['secrets_config']) if 'secrets_config' in x else None,
                supply_chain_config=SupplyChainConfig.from_json(x['supply_chain_config']) if 'supply_chain_config' in x else None,
            )
        else:
            _atd_bad_json('EngineConfig', x)

    def to_json(self) -> Any:
        """Return the JSON-compatible dict; ``None`` optionals are omitted."""
        res: Dict[str, Any] = {}
        res['analysis_type'] = self.analysis_type.to_json()
        res['pro_langs'] = _atd_write_bool(self.pro_langs)
        if self.code_config is not None:
            res['code_config'] = self.code_config.to_json()
        if self.secrets_config is not None:
            res['secrets_config'] = self.secrets_config.to_json()
        if self.supply_chain_config is not None:
            res['supply_chain_config'] = self.supply_chain_config.to_json()
        return res

    @classmethod
    def from_json_string(cls, x: str) -> 'EngineConfig':
        """Parse the JSON text ``x`` and convert it with ``from_json``."""
        return cls.from_json(json.loads(x))

    def to_json_string(self, **kw: Any) -> str:
        """Serialize to JSON text; ``kw`` is forwarded to ``json.dumps``."""
        return json.dumps(self.to_json(), **kw)


@dataclass
class Misc:
"""Original type: misc = { ... }"""
Expand All @@ -512,6 +782,7 @@ class Misc:
numIgnored: Optional[int] = None
ruleHashesWithFindings: Optional[List[Tuple[str, int]]] = None
engineRequested: str = field(default_factory=lambda: 'OSS')
engineConfig: Optional[EngineConfig] = None
interfileLanguagesUsed: Optional[List[str]] = None

@classmethod
Expand All @@ -524,6 +795,7 @@ def from_json(cls, x: Any) -> 'Misc':
numIgnored=_atd_read_int(x['numIgnored']) if 'numIgnored' in x else None,
ruleHashesWithFindings=_atd_read_assoc_object_into_list(_atd_read_int)(x['ruleHashesWithFindings']) if 'ruleHashesWithFindings' in x else None,
engineRequested=_atd_read_string(x['engineRequested']) if 'engineRequested' in x else 'OSS',
engineConfig=EngineConfig.from_json(x['engineConfig']) if 'engineConfig' in x else None,
interfileLanguagesUsed=_atd_read_list(_atd_read_string)(x['interfileLanguagesUsed']) if 'interfileLanguagesUsed' in x else None,
)
else:
Expand All @@ -541,6 +813,8 @@ def to_json(self) -> Any:
if self.ruleHashesWithFindings is not None:
res['ruleHashesWithFindings'] = _atd_write_assoc_list_to_object(_atd_write_int)(self.ruleHashesWithFindings)
res['engineRequested'] = _atd_write_string(self.engineRequested)
if self.engineConfig is not None:
res['engineConfig'] = (lambda x: x.to_json())(self.engineConfig)
if self.interfileLanguagesUsed is not None:
res['interfileLanguagesUsed'] = _atd_write_list(_atd_write_string)(self.interfileLanguagesUsed)
return res
Expand Down

0 comments on commit 9f1c503

Please sign in to comment.