Skip to content

Commit ef1d59d

Browse files
committed
autotailor: Apply review feedback for data stream validation
- Move XML namespace dict to module-level DS_NAMESPACES constant - Split _parse_datastream into _extract_profiles/values/rules/groups - Add selector validation for -V/--var-select option - Fix terminology: "datastream" -> "data stream" in all user-facing text - Add --no-validate option to the man page
1 parent 1170379 commit ef1d59d

4 files changed

Lines changed: 134 additions & 52 deletions

File tree

tests/utils/autotailor_integration_test.sh

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,17 @@ assert_exists 1 '/Benchmark/TestResult/rule-result[@idref="xccdf_com.example.www
112112
assert_exists 1 '/Benchmark/TestResult/rule-result[@idref="xccdf_com.example.www_rule_R3"]/result[text()="notselected"]'
113113
assert_exists 1 '/Benchmark/TestResult/rule-result[@idref="xccdf_com.example.www_rule_R4"]/result[text()="notselected"]'
114114

115+
# invalid selector for V1 should fail with a descriptive error
116+
! python3 $autotailor --id-namespace "com.example.www" --var-select V1=invalid $ds $original_profile 2>$stdout
117+
grep "Selector 'invalid' does not exist" $stdout
118+
119+
# invalid selector for V2 should fail with available selectors listed
120+
! python3 $autotailor --id-namespace "com.example.www" --var-select V2=invalid $ds $original_profile 2>$stdout
121+
grep "Available selectors" $stdout
122+
123+
# --no-validate bypasses selector validation
124+
python3 $autotailor --id-namespace "com.example.www" --no-validate --var-select V1=invalid $ds $original_profile > $tailoring
125+
115126
# use JSON tailoring (P1)
116127
python3 $autotailor $ds --id-namespace "com.example.www" --json-tailoring $json_tailoring > $tailoring
117128
$OSCAP xccdf eval --profile JSON_P1 --progress --tailoring-file $tailoring --results $result $ds

tests/utils/test_autotailor.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,3 +229,51 @@ def test_validator_suggestions():
229229
error_msg = str(e.value)
230230
assert "Did you mean one of these?" in error_msg
231231
assert "xccdf_com.example.www_rule_R1" in error_msg
232+
233+
234+
def test_validate_selector():
235+
"""Test that validate_selector rejects selectors not present in the data stream."""
236+
ds_path = pathlib.Path(__file__).parent.joinpath("data_stream.xml")
237+
validator = autotailor.DataStreamValidator(str(ds_path))
238+
239+
# V1 has selector "thirty"; V2 has "some" and "other"
240+
validator.validate_selector("xccdf_com.example.www_value_V1", "thirty")
241+
validator.validate_selector("xccdf_com.example.www_value_V2", "some")
242+
validator.validate_selector("xccdf_com.example.www_value_V2", "other")
243+
244+
# Invalid selector for V1
245+
with pytest.raises(ValueError) as e:
246+
validator.validate_selector("xccdf_com.example.www_value_V1", "invalid")
247+
error_msg = str(e.value)
248+
assert "Selector 'invalid' does not exist for Value 'xccdf_com.example.www_value_V1'" in error_msg
249+
assert "thirty" in error_msg
250+
251+
# Invalid selector for V2 with a close-enough typo triggers a suggestion
252+
with pytest.raises(ValueError) as e:
253+
validator.validate_selector("xccdf_com.example.www_value_V2", "ther")
254+
error_msg = str(e.value)
255+
assert "Selector 'ther' does not exist for Value 'xccdf_com.example.www_value_V2'" in error_msg
256+
assert "other" in error_msg
257+
258+
259+
def test_profile_selector_validation():
260+
"""Test that Profile validates selectors via -V/--var-select through refine_value."""
261+
ds_path = pathlib.Path(__file__).parent.joinpath("data_stream.xml")
262+
validator = autotailor.DataStreamValidator(str(ds_path))
263+
264+
p = autotailor.Profile(validator=validator)
265+
p.reverse_dns = "com.example.www"
266+
267+
# Valid selector passes
268+
p.change_selectors(["V1=thirty"])
269+
p.change_selectors(["V2=some"])
270+
271+
# Invalid selector raises
272+
with pytest.raises(ValueError) as e:
273+
p.change_selectors(["V1=invalid"])
274+
assert "Selector 'invalid' does not exist for Value 'xccdf_com.example.www_value_V1'" in str(e.value)
275+
276+
# Invalid value ID still raises before reaching selector check
277+
with pytest.raises(ValueError) as e:
278+
p.change_selectors(["NONEXISTENT=thirty"])
279+
assert "Value ID 'xccdf_com.example.www_value_NONEXISTENT' does not exist" in str(e.value)

utils/autotailor

Lines changed: 68 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ ROLES = ["full", "unscored", "unchecked"]
3838
SEVERITIES = ["unknown", "info", "low", "medium", "high"]
3939
ATTRIBUTES = ["role", "severity"]
4040

41+
DS_NAMESPACES = {
42+
'ds': DS_NS,
43+
'xccdf': NS
44+
}
45+
4146

4247
def quote(string):
4348
return "\"" + string + "\""
@@ -56,60 +61,62 @@ def is_valid_xccdf_id(string):
5661

5762

5863
class DataStreamValidator:
59-
"""Validates IDs against the SCAP datastream."""
64+
"""Validates IDs against the SCAP data stream."""
6065

6166
def __init__(self, datastream_path):
6267
self.datastream_path = datastream_path
6368
self.profile_ids = set()
6469
self.value_ids = set()
70+
self.value_selectors = {}
6571
self.rule_ids = set()
6672
self.group_ids = set()
6773
self._parse_datastream()
6874

75+
def _extract_profiles(self, benchmark):
76+
for profile in benchmark.findall('.//xccdf:Profile', DS_NAMESPACES):
77+
profile_id = profile.get('id')
78+
if profile_id:
79+
self.profile_ids.add(profile_id)
80+
81+
def _extract_values(self, benchmark):
82+
for value in benchmark.findall('.//xccdf:Value', DS_NAMESPACES):
83+
value_id = value.get('id')
84+
if not value_id:
85+
continue
86+
self.value_ids.add(value_id)
87+
selectors = set()
88+
for val_elem in value.findall('xccdf:value', DS_NAMESPACES):
89+
selector = val_elem.get('selector')
90+
if selector:
91+
selectors.add(selector)
92+
self.value_selectors[value_id] = selectors
93+
94+
def _extract_rules(self, benchmark):
95+
for rule in benchmark.findall('.//xccdf:Rule', DS_NAMESPACES):
96+
rule_id = rule.get('id')
97+
if rule_id:
98+
self.rule_ids.add(rule_id)
99+
100+
def _extract_groups(self, benchmark):
101+
for group in benchmark.findall('.//xccdf:Group', DS_NAMESPACES):
102+
group_id = group.get('id')
103+
if group_id:
104+
self.group_ids.add(group_id)
105+
69106
def _parse_datastream(self):
70-
"""Parse the datastream to extract all valid IDs."""
71107
try:
72108
tree = ET.parse(self.datastream_path)
73109
root = tree.getroot()
74-
75-
# Register namespaces
76-
namespaces = {
77-
'ds': DS_NS,
78-
'xccdf': NS
79-
}
80-
81-
# Find all Benchmark elements (may be in data-stream-collection or standalone)
82-
benchmarks = root.findall('.//xccdf:Benchmark', namespaces)
83-
110+
benchmarks = root.findall('.//xccdf:Benchmark', DS_NAMESPACES)
84111
for benchmark in benchmarks:
85-
# Extract Profile IDs
86-
for profile in benchmark.findall('.//xccdf:Profile', namespaces):
87-
profile_id = profile.get('id')
88-
if profile_id:
89-
self.profile_ids.add(profile_id)
90-
91-
# Extract Value IDs
92-
for value in benchmark.findall('.//xccdf:Value', namespaces):
93-
value_id = value.get('id')
94-
if value_id:
95-
self.value_ids.add(value_id)
96-
97-
# Extract Rule IDs
98-
for rule in benchmark.findall('.//xccdf:Rule', namespaces):
99-
rule_id = rule.get('id')
100-
if rule_id:
101-
self.rule_ids.add(rule_id)
102-
103-
# Extract Group IDs
104-
for group in benchmark.findall('.//xccdf:Group', namespaces):
105-
group_id = group.get('id')
106-
if group_id:
107-
self.group_ids.add(group_id)
108-
112+
self._extract_profiles(benchmark)
113+
self._extract_values(benchmark)
114+
self._extract_rules(benchmark)
115+
self._extract_groups(benchmark)
109116
except ET.ParseError as e:
110-
raise ValueError(f"Failed to parse datastream '{self.datastream_path}': {e}")
117+
raise ValueError(f"Failed to parse data stream '{self.datastream_path}': {e}")
111118
except FileNotFoundError:
112-
raise ValueError(f"Datastream file not found: '{self.datastream_path}'")
119+
raise ValueError(f"Data stream file not found: '{self.datastream_path}'")
113120

114121
def _suggest_similar(self, invalid_id, valid_ids, n=3):
115122
"""Suggest similar valid IDs using fuzzy matching."""
@@ -120,8 +127,7 @@ class DataStreamValidator:
120127
return matches
121128

122129
def _create_validation_error(self, id_type, invalid_id, valid_ids):
123-
"""Create a detailed error message with suggestions."""
124-
msg = f"{id_type} ID '{invalid_id}' does not exist in the datastream."
130+
msg = f"{id_type} ID '{invalid_id}' does not exist in the data stream."
125131

126132
suggestions = self._suggest_similar(invalid_id, valid_ids)
127133
if suggestions:
@@ -130,29 +136,40 @@ class DataStreamValidator:
130136
msg += f"\n - {suggestion}"
131137

132138
if not valid_ids:
133-
msg += f"\n\nNo {id_type.lower()}s found in the datastream."
139+
msg += f"\n\nNo {id_type.lower()}s found in the data stream."
134140
else:
135-
msg += f"\n\nAvailable {id_type.lower()}s can be listed by examining the datastream file."
141+
msg += f"\n\nAvailable {id_type.lower()}s can be listed by examining the data stream file."
136142

137143
return msg
138144

139145
def validate_profile(self, profile_id):
140-
"""Validate a profile ID exists in the datastream."""
141146
if profile_id not in self.profile_ids:
142147
raise ValueError(self._create_validation_error("Profile", profile_id, self.profile_ids))
143148

144149
def validate_value(self, value_id):
145-
"""Validate a value ID exists in the datastream."""
146150
if value_id not in self.value_ids:
147151
raise ValueError(self._create_validation_error("Value", value_id, self.value_ids))
148152

153+
def validate_selector(self, value_id, selector):
154+
selectors = self.value_selectors.get(value_id, set())
155+
if selector not in selectors:
156+
msg = f"Selector '{selector}' does not exist for Value '{value_id}' in the data stream."
157+
if selectors:
158+
suggestions = self._suggest_similar(selector, selectors)
159+
if suggestions:
160+
msg += "\n\nDid you mean one of these?"
161+
for suggestion in suggestions:
162+
msg += f"\n - {suggestion}"
163+
msg += f"\n\nAvailable selectors: {', '.join(sorted(selectors))}"
164+
else:
165+
msg += "\n\nNo selectors found for this value in the data stream."
166+
raise ValueError(msg)
167+
149168
def validate_rule(self, rule_id):
150-
"""Validate a rule ID exists in the datastream."""
151169
if rule_id not in self.rule_ids:
152170
raise ValueError(self._create_validation_error("Rule", rule_id, self.rule_ids))
153171

154172
def validate_group(self, group_id):
155-
"""Validate a group ID exists in the datastream."""
156173
if group_id not in self.group_ids:
157174
raise ValueError(self._create_validation_error("Group", group_id, self.group_ids))
158175

@@ -251,6 +268,8 @@ class Profile:
251268
Profile._validate_value_refinement_params(value_id, attribute, value)
252269
if self.validator:
253270
self.validator.validate_value(value_id)
271+
if attribute == 'selector':
272+
self.validator.validate_selector(value_id, value)
254273
self._prevent_duplicate_value_refinement(attribute, value_id, value)
255274
self._value_refinements[value_id][attribute] = value
256275

@@ -610,8 +629,8 @@ def get_parser():
610629
"Absolute paths are converted to basename, relative paths are preserved.")
611630
parser.add_argument(
612631
"--no-validate", action="store_true",
613-
help="Skip validation of IDs against the datastream. This significantly speeds up "
614-
"execution on large datastreams but may produce invalid tailoring files if incorrect "
632+
help="Skip validation of IDs against the data stream. This significantly speeds up "
633+
"execution on large data streams but may produce invalid tailoring files if incorrect "
615634
"IDs are provided. Use with caution.")
616635
return parser
617636

@@ -626,13 +645,12 @@ if __name__ == "__main__":
626645
parser.error("one of the following arguments has to be provided: "
627646
"BASE_PROFILE_ID or --json-tailoring JSON_TAILORING_FILENAME")
628647

629-
# Create validator to check IDs against the datastream (unless --no-validate is specified)
630648
validator = None
631649
if not args.no_validate:
632650
try:
633651
validator = DataStreamValidator(args.datastream)
634652
except (ValueError, FileNotFoundError) as e:
635-
print(f"Error loading datastream: {e}", file=sys.stderr)
653+
print(f"Error loading data stream: {e}", file=sys.stderr)
636654
sys.exit(1)
637655

638656
t = Tailoring(validator=validator)

utils/autotailor.8

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ A tailoring file adds a new profile, which is supposed to extend a profile that
88
Tailoring can add, remove or refine rules, and it also can redefine contents of XCCDF variables.
99

1010
The tool requires data stream location and ID of the base profile as inputs.
11-
Note however, that the referenced data stream is not opened, and the validity of tailoring is not checked against it.
12-
The tool doesn't prevent you from extending non-existent profiles, selecting non-existent rules, and so on.
11+
By default, the tool parses the referenced data stream and validates provided IDs (profiles, rules, values, groups, and selectors) against it.
12+
Validation can be skipped using the \fB--no-validate\fR option, which significantly speeds up execution on large data streams at the cost of no longer catching invalid IDs.
1313

1414
.SH SYNOPSIS
1515
autotailor [OPTION...] DATASTREAM_FILE [BASE_PROFILE_ID]
@@ -81,6 +81,11 @@ Use local path for the benchmark href instead of absolute file:// URI. This opti
8181
When this option is specified, absolute paths are converted to basename only, while relative paths are preserved as provided.
8282
By default, the tool uses absolute file:// URIs for backward compatibility.
8383
.RE
84+
.TP
85+
\fB--no-validate\fR
86+
.RS
87+
Skip validation of IDs against the data stream. This significantly speeds up execution on large data streams but may produce invalid tailoring files if incorrect IDs are provided. Use with caution.
88+
.RE
8489

8590
.SH USAGE
8691
.SS Modify a variable value

0 commit comments

Comments
 (0)