10
10
_name_filter = re .compile (r"key|token|secret|pass|docker_login" , re .IGNORECASE )
11
11
12
12
13
- def _get_secrets_str () -> tuple [ list [str ], str ]:
13
+ def _get_secrets () -> list [str ]:
14
14
secrets : list = [
15
- value for name , value in os .environ .items () if value and name not in _not_secrets and _name_filter .search (name )
16
- ]
17
- redacted = "<redacted>"
18
- return secrets , redacted
19
-
20
-
21
- def _get_secrets_bytes () -> tuple [list [bytes ], bytes ]:
22
- secrets : list = [
23
- value .encode ()
15
+ value .strip ()
24
16
for name , value in os .environ .items ()
25
- if value and name not in _not_secrets and _name_filter .search (name )
17
+ if value . strip () and name not in _not_secrets and _name_filter .search (name )
26
18
]
27
- redacted = b"<redacted>"
19
+ return secrets
28
20
29
- return secrets , redacted
30
21
31
-
32
- def _instrument_write_methods_str (f ) -> None :
33
- # get list of secrets at each call, because environ may be updated
22
+ def _instrument_write_methods_str (f , secrets : list [str ]) -> None :
34
23
original_write = f .write
35
- secrets , redacted = _get_secrets_str ()
36
24
secret_regex = re .compile ("|" .join (re .escape (s ) for s in secrets ))
37
25
38
26
def write (data ):
39
- data = re .sub (secret_regex , redacted , data )
27
+ data = re .sub (secret_regex , "< redacted>" , data )
40
28
original_write (data )
41
29
42
30
f .write = write
43
31
44
32
45
- def _instrument_write_methods_bytes (f ) -> None :
33
+ def _instrument_write_methods_bytes (f , secrets : list [ str ] ) -> None :
46
34
original_write = f .write
47
- secrets , redacted = _get_secrets_bytes ()
48
- secret_regex = re .compile (b"|" .join (re .escape (s ) for s in secrets ))
35
+ secret_regex = re .compile (b"|" .join (re .escape (s .encode ()) for s in secrets ))
49
36
50
37
def write (data ):
51
- data = re .sub (secret_regex , redacted , data )
38
+ data = re .sub (secret_regex , b"< redacted>" , data )
52
39
original_write (data )
53
40
54
41
f .write = write
@@ -57,11 +44,14 @@ def write(data):
57
44
def _instrumented_open (file , mode = "r" , * args , ** kwargs ): # noqa: ANN002
58
45
f = _original_open (file , mode , * args , ** kwargs )
59
46
60
- if "w" in mode or "a" in mode :
47
+ # get list of secrets at each call, because environ may be updated
48
+ secrets = _get_secrets ()
49
+
50
+ if ("w" in mode or "a" in mode ) and len (secrets ) > 0 :
61
51
if "b" in mode :
62
- _instrument_write_methods_bytes (f )
52
+ _instrument_write_methods_bytes (f , secrets )
63
53
else :
64
- _instrument_write_methods_str (f )
54
+ _instrument_write_methods_str (f , secrets )
65
55
66
56
return f
67
57
0 commit comments