|
| 1 | +"""The module used to generate incident data based on intrusion sets.""" |
| 2 | +import logging |
| 3 | +import math |
| 4 | +import random |
| 5 | +from math import ceil |
| 6 | +from typing import List |
| 7 | + |
| 8 | +import numpy |
| 9 | +from scipy.stats import betabinom |
| 10 | +from scipy.stats.sampling import DiscreteAliasUrn |
| 11 | + |
| 12 | +from attribution_tools.parsers import AttackPattern, IntrusionSet, Malware, Tool |
| 13 | + |
# getLogger() without a name returns the root logger, so the ERROR threshold
# set here is the one inherited by every logger that has no level of its own.
logger = logging.getLogger()
logger.setLevel(logging.ERROR)
| 16 | + |
| 17 | + |
def generate_incident_size(lbound: int, ubound: int) -> int:
    """Draw a random incident size from the inclusive range [lbound, ubound].

    The offset above ``lbound`` follows a beta-binomial distribution with
    ``n = ceil(ubound - lbound)`` trials and fixed shape parameters
    ``alpha = 1.5`` and ``beta = 10.0``.

    Args:
        lbound: Smallest size that can be returned.
        ubound: Largest size that can be returned; must be greater than
            ``lbound``.

    Returns:
        ``lbound`` plus the sampled offset (a built-in ``int`` when
        ``lbound`` is an ``int``).

    Raises:
        AssertionError: If ``ubound - lbound`` is not positive.
    """
    alpha, beta = 1.5, 10.0

    region_size = ceil(ubound - lbound)
    # Raised explicitly (instead of using an ``assert`` statement) so the
    # check still runs under ``python -O``; the exception type is unchanged.
    if region_size <= 0:
        raise AssertionError(f"Wrong bound arguments: {lbound}, {ubound}")

    # The support of a beta-binomial with n trials is exactly 0..n.  Deriving
    # the grid from ppf(0.0)..ppf(1.0) is off by one: scipy returns -1 for
    # ppf(0) of a discrete distribution and arange() excludes its stop value,
    # which shifts every probability by one slot and drops pmf(n).
    support = numpy.arange(region_size + 1)
    pmf_values = betabinom.pmf(support, region_size, alpha, beta).tolist()

    # DiscreteAliasUrn samples an index into the probability vector; because
    # the vector starts at offset 0, the index is the offset itself.
    generator = DiscreteAliasUrn(pmf_values, random_state=numpy.random.default_rng())
    offset = int(generator.rvs(size=1)[0])

    return lbound + offset
| 34 | + |
| 35 | + |
class IncidentGenerator:
    """Creates the content of an incident from the entities of an intrusion set.

    The incident is a flat list of ``semantic_id`` values sampled from the
    intrusion set's attack patterns, tools, malwares and remaining entities.
    """

    # lower and upper bound passed to generate_incident_size
    N_SIZE_MIN, N_SIZE_MAX = 10, 50
    # share of the incident size used for attack patterns
    FRAC_ATTACK_PATTERN = 0.5
    # share of the incident size used for tools
    FRAC_TOOLS = 0.2
    # share of the incident size used for malwares
    FRAC_MALWARE = 0.2
    # share of the incident size used for all remaining entity kinds
    FRAC_OTHER = 0.1

    def generate(self, source: IntrusionSet) -> list:
        """Return the sampled semantic ids that make up one incident.

        The incident size is drawn with ``generate_incident_size`` and then
        capped by the number of entities in ``source`` (the cap itself is
        never lower than ``N_SIZE_MIN``).
        """
        available = (
            len(source.attack_patterns)
            + len(source.malwares)
            + len(source.tools)
            + len(source.indicators)
            + len(source.identities)
            + len(source.locations)
            + len(source.vulnerabilities)
        )
        size_cap = max(available, self.N_SIZE_MIN)

        drawn_size = generate_incident_size(self.N_SIZE_MIN, self.N_SIZE_MAX)
        incident_size = min(drawn_size, size_cap)

        incident = []
        incident += self.sample_attack_patterns(source.attack_patterns, incident_size)
        incident += self.sample_tools(source.tools, incident_size)
        incident += self.sample_malwares(source.malwares, incident_size)
        remaining = source.indicators + source.vulnerabilities + source.identities + source.locations
        incident += self.sample_others(remaining, incident_size)
        return incident

    def sample_attack_patterns(self, source: List[AttackPattern], max_incident_size) -> List[str]:
        """Return semantic ids of randomly picked attack patterns.

        Picks are made with replacement and then de-duplicated, so the result
        may hold fewer ids than the computed quota.
        """
        if not source:
            return []
        quota = math.ceil(max_incident_size * self.FRAC_ATTACK_PATTERN)
        picked = set(random.choices(source, k=quota))
        return [pattern.semantic_id for pattern in picked]

    def sample_tools(self, source: List[Tool], max_incident_size) -> List[str]:
        """Return semantic ids of randomly picked tools.

        Picks are made with replacement and then de-duplicated, so the result
        may hold fewer ids than the computed quota.
        """
        if not source:
            return []
        quota = math.ceil(max_incident_size * self.FRAC_TOOLS)
        picked = set(random.choices(source, k=quota))
        return [tool.semantic_id for tool in picked]

    def sample_malwares(self, source: List[Malware], max_incident_size) -> List[str]:
        """Return semantic ids of randomly picked malwares.

        Picks are made with replacement and then de-duplicated, so the result
        may hold fewer ids than the computed quota.
        """
        if not source:
            return []
        quota = math.ceil(max_incident_size * self.FRAC_MALWARE)
        picked = set(random.choices(source, k=quota))
        return [malware.semantic_id for malware in picked]

    def sample_others(self, source: list, max_incident_size) -> List[str]:
        """Return semantic ids of randomly picked other STIX2 entities.

        Unlike the other sample_* methods this one draws without replacement
        (``random.sample``), limited to the number of entities in ``source``.
        """
        if not source:
            return []
        quota = math.ceil(max_incident_size * self.FRAC_OTHER)
        picked = random.sample(source, min(len(source), quota))
        return [entity.semantic_id for entity in picked]
0 commit comments