Skip to content

Commit 6dc1ba3

Browse files
authored
feat(audit): defensive redaction pass on log + manifest writes (#148)
Closes hardening item #5 from "Secret Handling & Credential Surface Hardening" in docs/developer/SDK-ROADMAP.md. Adds a recursive walker that masks every string in the audit trail at serialization time, plus fixes a pre-existing bug where masking record.msg corrupted %s format strings whose placeholders matched the token: pattern. argus/audit/secrets.py: - New mask_secrets_in_obj(obj) walker. Recurses through dicts, lists, tuples; applies mask_secrets to every string value; leaves keys and non-string scalars untouched; returns a new structure (does NOT mutate the input). argus/audit/logger.py: - JsonLogFormatter.format: stop mutating record.msg. Mask the rendered record.getMessage() instead — catches secrets passed as printf-style args (logger.info("token: %s", real_token)) which the prior approach missed because record.msg held the format string, not the rendered output. Then walk the assembled JSON entry through mask_secrets_in_obj so extra fields a contributor might add to the formatter also get masked. - ColoredConsoleFormatter.format: same fix — mask the rendered message, not record.msg. Without this fix, "token: %s" matched the token=/token: pattern and was rewritten to "token: <REDACTED>", then record.getMessage() raised TypeError trying to substitute args into a format string with no %s placeholder. Bug had been silently masked because no test exercised the printf path. argus/audit/manifest.py: - AuditManifest.save: walk asdict(self) through mask_secrets_in_obj before json.dumps. Defense-in-depth: today's manifest schema doesn't include credential fields, but if a future field captures a docker_cmd, env dict, or credential-shaped argv it gets masked before hitting argus-audit.json. Design note (vs. roadmap text): The roadmap entry suggested reusing core/redact.redact_high_risk_patterns (the vendor-prefix-only set used by Finding.__post_init__). The existing audit/secrets.mask_secrets already covers that surface plus broader patterns appropriate for log lines (token=, password=, Bearer, URL creds, sk-keys). Extending audit/secrets keeps the redactor co-located with its callers — easier to reason about and no cross-module hop at hot-path serialization time. Test coverage (19 new): - argus/tests/audit/test_secrets.py::TestMaskSecretsInObj — 10 tests: root scalar, dict value, nested dict, list, tuple, scalar passthrough, no-mutation guard, deeply nested mix, dict-key preservation, unknown type passthrough. - argus/tests/audit/test_logger.py::TestJsonLogSecretLeakProtection — 4 tests: format-string secret, record.args secret (the regression fix), extra-field secret, non-secret strings preserved unchanged. - argus/tests/audit/test_manifest.py::TestManifestSecretLeakProtection — 5 tests: phase error, artifact path, nested dict at depth 4, input not mutated after save, clean-manifest false-positive guard. .ai/architecture.yaml: new audit/ entry in both SDK structure blocks documenting the redaction posture, the walker, and the rendered-message masking rationale. Full suite: 3126 passed (+19 new), 2 skipped. Co-authored-by: eFAILution <eFAILution@users.noreply.github.com>
1 parent 864c162 commit 6dc1ba3

8 files changed

Lines changed: 331 additions & 9 deletions

File tree

.ai/architecture.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ components:
4949
"linters/": "Linter modules implementing Scanner protocol (LINTER_REGISTRY auto-merges into SCANNER_REGISTRY)"
5050
"reporters/": "Output reporters (terminal, markdown, sarif, json, github, gitlab, junit). Discovered via the ``argus.reporters`` Python entry-point group (built-ins declared in pyproject.toml; third-party packages register additional formats without forking — see docs/contributing-reporters.md and ADR-023)."
5151
"preflight/": "CI preflight: provider detection, living issue reporting (GitHub/GitLab), network deps, scanner tool-readiness checks (tool_check.py)"
52+
"audit/": "Structured audit trail for every scan run. logger.py emits JSONL log records (one per line) into ``argus-results/.../argus.log``; manifest.py writes the per-run AuditManifest summary into ``argus-audit.json``. secrets.py provides mask_secrets (regex masking for token=, password=, Bearer, URL-creds, GitHub PATs, AWS access keys, sk-keys) and the mask_secrets_in_obj recursive walker. Both write paths run mask_secrets_in_obj at serialization time as defense-in-depth — if a future contributor accidentally captures a docker_cmd / env dict / credential-shaped argv into a manifest field or log entry, the redaction pass catches it before the file lands. Both logger formatters mask the rendered ``record.getMessage()`` rather than ``record.msg`` to avoid corrupting %s format strings whose placeholders match secret-shaped patterns."
5253

5354
- name: argus-linters
5455
location: "argus/linters/"

argus/audit/logger.py

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,12 @@ def __init__(self, use_color: bool = True):
5555
self._use_color = use_color
5656

5757
def format(self, record: logging.LogRecord) -> str:
58-
record.msg = mask_secrets(str(record.msg))
59-
58+
# Mask the rendered output, NOT record.msg — see the note in
59+
# JsonLogFormatter.format() for why masking the format string
60+
# breaks ``%s`` substitution.
6061
ts = self.formatTime(record, "%H:%M:%S")
6162
name = record.name.replace("argus.", "")
62-
msg = record.getMessage()
63+
msg = mask_secrets(record.getMessage())
6364

6465
if not self._use_color:
6566
return f"{ts} {record.levelname:<8} {name} {msg}"
@@ -78,7 +79,13 @@ class JsonLogFormatter(logging.Formatter):
7879
"""
7980

8081
def format(self, record: logging.LogRecord) -> str:
81-
record.msg = mask_secrets(str(record.msg))
82+
# NOTE: do NOT mask ``record.msg`` directly. The msg is a
83+
# printf-style format string; if its non-whitespace section
84+
# matches a secret-masking pattern (e.g., ``"token: %s"`` matches
85+
# ``r"token[=:]\s*[^\s]+"``), masking turns the format string
86+
# into a literal with no placeholders, and the subsequent
87+
# ``record.getMessage()`` raises TypeError on ``%s`` substitution.
88+
# Mask the *rendered* message string below instead.
8289
entry: dict = {
8390
"timestamp": datetime.fromtimestamp(
8491
record.created, tz=timezone.utc
@@ -87,13 +94,25 @@ def format(self, record: logging.LogRecord) -> str:
8794
"module": record.name,
8895
"function": record.funcName,
8996
"line": record.lineno,
90-
"message": record.getMessage(),
97+
# ``getMessage()`` re-renders the format string with
98+
# ``record.args`` interpolated back in. If a caller passed
99+
# a secret as a printf-style arg
100+
# (``logger.info("token: %s", real_token)``), that secret
101+
# never touched ``record.msg`` — only ``record.args`` — so
102+
# masking ``record.msg`` above wouldn't catch it. We mask
103+
# the *rendered* result to cover both paths.
104+
"message": mask_secrets(record.getMessage()),
91105
}
92106
# Include extra fields if present (scanner name, phase, etc.)
93107
for key in ("scanner", "phase", "image", "duration_ms"):
94108
if hasattr(record, key):
95109
entry[key] = getattr(record, key)
96-
return json.dumps(entry, default=str)
110+
# Defense-in-depth: walk the assembled entry and re-mask any
111+
# string a contributor might add to the ``extra`` set above
112+
# without remembering to mask first. ``mask_secrets_in_obj``
113+
# is idempotent — already-masked values pass through unchanged.
114+
from argus.audit.secrets import mask_secrets_in_obj
115+
return json.dumps(mask_secrets_in_obj(entry), default=str)
97116

98117

99118
# ---------------------------------------------------------------------------

argus/audit/manifest.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,26 @@ class AuditManifest:
7575
artifacts: list[dict] = field(default_factory=list)
7676

7777
def save(self, output_dir: str | Path) -> Path:
78-
"""Write the manifest to ``argus-audit.json``."""
78+
"""Write the manifest to ``argus-audit.json``.
79+
80+
Every string in the manifest passes through
81+
``mask_secrets_in_obj`` before serialization — defense-in-depth
82+
so a future regression that captures a ``docker_cmd``,
83+
environment dict, or credential-shaped argv into a manifest
84+
field can't leak. Today the manifest schema doesn't include
85+
such fields; the redaction pass is here so it stays that way
86+
regardless of contributor vigilance.
87+
"""
88+
from argus.audit.secrets import mask_secrets_in_obj
89+
7990
dest = Path(output_dir)
8091
dest.mkdir(parents=True, exist_ok=True)
8192
filepath = dest / "argus-audit.json"
8293
filepath.write_text(
83-
json.dumps(asdict(self), indent=2, default=str),
94+
json.dumps(
95+
mask_secrets_in_obj(asdict(self)),
96+
indent=2, default=str,
97+
),
8498
encoding="utf-8",
8599
)
86100
return filepath

argus/audit/secrets.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,36 @@ def mask_secrets(message: str) -> str:
5555
for pattern, replacement in _PATTERNS:
5656
message = pattern.sub(replacement, message)
5757
return message
58+
59+
60+
def mask_secrets_in_obj(obj):
61+
"""Recursively apply ``mask_secrets`` to every string in ``obj``.
62+
63+
Walks dicts, lists, and tuples; leaves non-string scalars and
64+
unknown types untouched. Returns a new structure (does NOT mutate
65+
the input) so the original objects remain available to callers
66+
that haven't fully migrated to the masked view.
67+
68+
The defense-in-depth use case: callers serialize structured data
69+
(audit manifests, JSON log entries, anything else that could end
70+
up on disk) through this walker before writing. Even if a future
71+
contributor accidentally feeds a credential into a manifest field
72+
or a logger.info(..., real_secret) call, the pattern set in
73+
``_PATTERNS`` catches the value before it reaches the filesystem.
74+
75+
Dict keys are not masked — a key that is itself a secret is an
76+
extreme outlier in practice and key-masking would force every
77+
JSON consumer to re-derive the key set. Values cover the realistic
78+
leak surface.
79+
"""
80+
if isinstance(obj, str):
81+
return mask_secrets(obj)
82+
if isinstance(obj, dict):
83+
return {k: mask_secrets_in_obj(v) for k, v in obj.items()}
84+
if isinstance(obj, list):
85+
return [mask_secrets_in_obj(v) for v in obj]
86+
if isinstance(obj, tuple):
87+
return tuple(mask_secrets_in_obj(v) for v in obj)
88+
# Scalars (int, float, bool, None) and unknown types pass through
89+
# unchanged; the caller's json.dumps(default=str) handles encoding.
90+
return obj

argus/tests/audit/test_logger.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,3 +229,68 @@ def test_existing_logger_honors_later_verbose(self):
229229
h for h in logger.handlers if isinstance(h, logging.StreamHandler)
230230
)
231231
assert console_handler.level == logging.DEBUG
232+
233+
234+
class TestJsonLogSecretLeakProtection:
235+
"""End-to-end: planted secrets in log calls never reach the on-disk
236+
argus.log file. Defense-in-depth — closes hardening item (5).
237+
"""
238+
239+
def _read_log_lines(self, output_dir: Path) -> list[str]:
240+
log_path = output_dir / "argus.log"
241+
return log_path.read_text().strip().split("\n")
242+
243+
def test_secret_in_format_string_masked(self, tmp_path):
244+
"""Caller embeds the secret directly in the format string."""
245+
output_dir = tmp_path / "logs"
246+
logger = get_logger("argus.test.leak.fmt", output_dir=output_dir)
247+
logger.info("auth header: Bearer eyJhbGc.secret-payload.signature")
248+
for h in logger.handlers:
249+
h.flush()
250+
251+
for line in self._read_log_lines(output_dir):
252+
assert "eyJhbGc.secret-payload.signature" not in line
253+
254+
def test_secret_in_record_args_masked(self, tmp_path):
255+
"""Caller passes the secret as a printf-style arg.
256+
257+
Pre-fix, the formatter masked ``record.msg`` (the format
258+
string) but ``record.getMessage()`` re-rendered the message
259+
with ``record.args`` interpolated back in — secret bypassed
260+
the mask entirely. The mask now runs on the rendered output.
261+
"""
262+
output_dir = tmp_path / "logs"
263+
logger = get_logger("argus.test.leak.args", output_dir=output_dir)
264+
secret_token = "ghp_abcdef1234567890abcdef1234567890"
265+
logger.info("registry token: %s", secret_token)
266+
for h in logger.handlers:
267+
h.flush()
268+
269+
for line in self._read_log_lines(output_dir):
270+
assert secret_token not in line
271+
272+
def test_secret_in_extra_field_masked(self, tmp_path):
273+
"""Caller passes the secret via the ``extra`` kwarg."""
274+
output_dir = tmp_path / "logs"
275+
logger = get_logger("argus.test.leak.extra", output_dir=output_dir)
276+
logger.info(
277+
"scan starting",
278+
extra={"scanner": "scanner-with-AKIA1234567890ABCDEF-creds"},
279+
)
280+
for h in logger.handlers:
281+
h.flush()
282+
283+
for line in self._read_log_lines(output_dir):
284+
assert "AKIA1234567890ABCDEF" not in line
285+
286+
def test_non_secret_strings_pass_through_unchanged(self, tmp_path):
287+
"""Confirm we're not masking ordinary strings."""
288+
output_dir = tmp_path / "logs"
289+
logger = get_logger("argus.test.leak.clean", output_dir=output_dir)
290+
logger.info("scanning %s with %d workers", "argus.yml", 4)
291+
for h in logger.handlers:
292+
h.flush()
293+
294+
line = self._read_log_lines(output_dir)[0]
295+
assert "argus.yml" in line
296+
assert "4 workers" in line

argus/tests/audit/test_manifest.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,3 +190,92 @@ def test_no_summary_leaves_empty(self, tmp_path):
190190
finalize_manifest(m, summary=None, output_dir=tmp_path)
191191
assert m.findings_summary == {}
192192
assert m.scanners_executed == []
193+
194+
195+
class TestManifestSecretLeakProtection:
196+
"""End-to-end: planted secrets in manifest fields never reach disk.
197+
198+
The manifest schema today doesn't include credential fields. This
199+
test guards against a future regression that adds one (a
200+
``docker_cmd`` capture, an env dict, error messages from phases
201+
that quote the failing command) — the redaction pass at save()
202+
time catches it before the file lands.
203+
"""
204+
205+
def test_secret_in_phase_error_redacted(self, tmp_path):
206+
m = AuditManifest(scan_id="leak-test", argus_version="1.0.0")
207+
# Future regression: phase capture includes the failing
208+
# subprocess argv, which includes a credential.
209+
m.phases.append({
210+
"name": "container_pull",
211+
"status": "failure",
212+
"error": "docker login failed with token=ghp_secret_1234567890abcdef",
213+
})
214+
215+
filepath = m.save(tmp_path)
216+
content = filepath.read_text()
217+
218+
assert "ghp_secret_1234567890abcdef" not in content
219+
# Phase metadata that isn't a secret survives
220+
assert "container_pull" in content
221+
assert "failure" in content
222+
223+
def test_secret_in_artifact_path_redacted(self, tmp_path):
224+
m = AuditManifest(scan_id="leak-test", argus_version="1.0.0")
225+
# Pathological: an artifact path that embedded a token
226+
# (e.g., a URL-shaped path with creds in it).
227+
m.artifacts.append({
228+
"name": "remote-fetch.log",
229+
"path": "https://user:AKIA1234567890ABCDEF@bucket.s3.amazonaws.com/x",
230+
})
231+
232+
filepath = m.save(tmp_path)
233+
content = filepath.read_text()
234+
235+
assert "AKIA1234567890ABCDEF" not in content
236+
237+
def test_nested_dict_redacted(self, tmp_path):
238+
"""Secret several levels deep — walker must recurse."""
239+
m = AuditManifest(scan_id="leak-test", argus_version="1.0.0")
240+
m.phases.append({
241+
"name": "auth",
242+
"env_snapshot": {
243+
"credentials": {
244+
"registry": "Bearer eyJhbGc.realtoken.signature",
245+
"non_secret": "/usr/bin",
246+
},
247+
},
248+
})
249+
250+
filepath = m.save(tmp_path)
251+
content = filepath.read_text()
252+
253+
assert "eyJhbGc.realtoken.signature" not in content
254+
# Non-secret nested value survives
255+
assert "/usr/bin" in content
256+
257+
def test_input_dict_not_mutated_after_save(self, tmp_path):
258+
"""Caller's manifest object stays unmodified after save()."""
259+
m = AuditManifest(scan_id="leak-test", argus_version="1.0.0")
260+
m.phases.append({
261+
"name": "p",
262+
"error": "token=ghp_aaaa1111bbbb2222cccc3333dddd4444",
263+
})
264+
original_error = m.phases[0]["error"]
265+
266+
m.save(tmp_path)
267+
268+
# In-memory view of the manifest unchanged — the redaction
269+
# only affects what hits disk.
270+
assert m.phases[0]["error"] == original_error
271+
272+
def test_clean_manifest_writes_no_redaction_tokens(self, tmp_path):
273+
"""A manifest with zero secret-shaped data has no false-positive
274+
``<REDACTED>`` markers."""
275+
m = AuditManifest(scan_id="clean-test", argus_version="1.0.0")
276+
m.phases.append({"name": "init", "status": "success"})
277+
278+
filepath = m.save(tmp_path)
279+
content = filepath.read_text()
280+
281+
assert "<REDACTED>" not in content

argus/tests/audit/test_secrets.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,3 +83,104 @@ def test_case_insensitive_token(self):
8383
def test_case_insensitive_bearer(self):
8484
result = mask_secrets("bearer abc123def456ghi789jkl")
8585
assert "abc123def456ghi789jkl" not in result
86+
87+
88+
class TestMaskSecretsInObj:
89+
"""Recursive walker — defense-in-depth for audit-trail writes."""
90+
91+
def test_masks_string_at_root(self):
92+
from argus.audit.secrets import mask_secrets_in_obj
93+
result = mask_secrets_in_obj("token=ghp_supersecret123456789")
94+
assert REDACTED in result
95+
assert "ghp_supersecret" not in result
96+
97+
def test_masks_string_in_dict_value(self):
98+
from argus.audit.secrets import mask_secrets_in_obj
99+
result = mask_secrets_in_obj({
100+
"config_path": "argus.yml",
101+
"auth_header": "Bearer eyJhbGc.signature",
102+
})
103+
assert result["config_path"] == "argus.yml"
104+
assert REDACTED in result["auth_header"]
105+
assert "eyJhbGc" not in result["auth_header"]
106+
107+
def test_masks_nested_dict(self):
108+
from argus.audit.secrets import mask_secrets_in_obj
109+
result = mask_secrets_in_obj({
110+
"phase": "scan",
111+
"env": {
112+
"REGISTRY_TOKEN": "AKIA1234567890ABCDEF",
113+
"PATH": "/usr/bin",
114+
},
115+
})
116+
assert REDACTED in result["env"]["REGISTRY_TOKEN"]
117+
assert result["env"]["PATH"] == "/usr/bin"
118+
assert result["phase"] == "scan"
119+
120+
def test_masks_list_of_strings(self):
121+
from argus.audit.secrets import mask_secrets_in_obj
122+
result = mask_secrets_in_obj([
123+
"docker run",
124+
"password=hunter2",
125+
"myapp:latest",
126+
])
127+
assert result[0] == "docker run"
128+
assert REDACTED in result[1]
129+
assert "hunter2" not in result[1]
130+
assert result[2] == "myapp:latest"
131+
132+
def test_masks_through_tuple(self):
133+
from argus.audit.secrets import mask_secrets_in_obj
134+
result = mask_secrets_in_obj(("normal", "token=sk-abc123def456ghi789"))
135+
assert isinstance(result, tuple)
136+
assert result[0] == "normal"
137+
assert REDACTED in result[1]
138+
139+
def test_scalars_pass_through(self):
140+
from argus.audit.secrets import mask_secrets_in_obj
141+
assert mask_secrets_in_obj(42) == 42
142+
assert mask_secrets_in_obj(3.14) == 3.14
143+
assert mask_secrets_in_obj(True) is True
144+
assert mask_secrets_in_obj(None) is None
145+
146+
def test_does_not_mutate_input(self):
147+
from argus.audit.secrets import mask_secrets_in_obj
148+
original = {"creds": {"token": "ghp_supersecret123456"}}
149+
result = mask_secrets_in_obj(original)
150+
# Caller's original is untouched
151+
assert original["creds"]["token"] == "ghp_supersecret123456"
152+
# Returned copy is masked
153+
assert "ghp_supersecret" not in result["creds"]["token"]
154+
155+
def test_deeply_nested_mix(self):
156+
"""Realistic shape: dict of lists of dicts of strings."""
157+
from argus.audit.secrets import mask_secrets_in_obj
158+
result = mask_secrets_in_obj({
159+
"phases": [
160+
{"name": "init", "command": ["argus", "scan"]},
161+
{"name": "auth", "command": ["docker", "login", "-p",
162+
"ghp_secret_1234567890abcdef"]},
163+
],
164+
})
165+
# Non-secret strings preserved
166+
assert result["phases"][0]["command"] == ["argus", "scan"]
167+
# Secret-shaped string masked even at depth 4
168+
assert "ghp_secret" not in result["phases"][1]["command"][3]
169+
assert REDACTED in result["phases"][1]["command"][3]
170+
171+
def test_dict_keys_not_masked(self):
172+
"""Keys pass through unchanged — masking them would break consumers."""
173+
from argus.audit.secrets import mask_secrets_in_obj
174+
result = mask_secrets_in_obj({"ghp_keyname": "regular value"})
175+
assert "ghp_keyname" in result # key intact
176+
assert result["ghp_keyname"] == "regular value"
177+
178+
def test_unknown_object_passes_through(self):
179+
"""Custom types we don't recognize pass through unchanged."""
180+
from argus.audit.secrets import mask_secrets_in_obj
181+
182+
class Custom:
183+
pass
184+
185+
obj = Custom()
186+
assert mask_secrets_in_obj(obj) is obj

0 commit comments

Comments
 (0)