1
- """Injection makes use of plugins."""
1
+ """Injectors exercise plugins."""
2
2
3
3
import os
4
4
import re
18
18
)
19
19
20
20
21
+ # pylint: disable-next=too-few-public-methods
22
+ class TowerNamespace :
23
+ """Dummy class."""
24
+
25
+
26
+ TowerNamespaceValueType = TowerNamespace | GenericOptionalPrimitiveType
27
+ ExtraVarsType = dict [str , str ] | list [str ] | str
28
+
29
+
21
30
HIDDEN_PASSWORD = '*' * 10
22
31
SENSITIVE_ENV_VAR_NAMES = 'API|TOKEN|KEY|SECRET|PASS'
23
32
@@ -61,7 +70,7 @@ def build_safe_env(
61
70
:returns: Sanitized environment variables.
62
71
"""
63
72
safe_env = dict (env )
64
- for env_k , env_val in safe_env .items ():
73
+ for env_k , env_v in safe_env .items ():
65
74
is_special = (
66
75
env_k == 'AWS_ACCESS_KEY_ID'
67
76
or (
@@ -72,23 +81,29 @@ def build_safe_env(
72
81
)
73
82
if is_special :
74
83
continue
75
- elif HIDDEN_PASSWORD_RE .search (env_k ):
84
+ if HIDDEN_PASSWORD_RE .search (env_k ):
76
85
safe_env [env_k ] = HIDDEN_PASSWORD
77
- elif isinstance (env_val , str ) and HIDDEN_URL_PASSWORD_RE .match (env_val ):
86
+ elif isinstance (env_v , str ) and HIDDEN_URL_PASSWORD_RE .match (env_v ):
78
87
safe_env [env_k ] = HIDDEN_URL_PASSWORD_RE .sub (
79
- HIDDEN_PASSWORD , env_val ,
88
+ HIDDEN_PASSWORD , env_v ,
80
89
)
81
90
return safe_env
82
91
83
92
84
93
def secret_fields (cred_type : ManagedCredentialType ) -> list [str ]:
94
+ """List of fields that are sensitive from the credential type.
95
+
96
+ :param cred_type: Where the secret field descriptions live
97
+ :return: list of secret field names
98
+ """
85
99
return [
86
100
str (field ['id' ])
87
101
for field in cred_type .inputs .get ('fields' , [])
88
102
if field .get ('secret' , False ) is True
89
103
]
90
104
91
105
106
+ # pylint: disable-next=too-many-arguments,too-many-positional-arguments,too-many-locals,too-many-branches,too-many-statements
92
107
def inject_credential (
93
108
cred_type : ManagedCredentialType ,
94
109
credential : Credential ,
@@ -97,6 +112,7 @@ def inject_credential(
97
112
args : list [GenericOptionalPrimitiveType ],
98
113
private_data_dir : str ,
99
114
) -> None :
115
+ # pylint: disable=unidiomatic-typecheck
100
116
"""Inject credential data.
101
117
102
118
Inject credential data into the environment variables and
@@ -133,21 +149,19 @@ def inject_credential(
133
149
safe_env .update (build_safe_env (injected_env ))
134
150
return
135
151
136
- class TowerNamespace :
137
- """Dummy class."""
138
-
139
152
tower_namespace = TowerNamespace ()
140
153
141
154
# maintain a normal namespace for building the ansible-playbook
142
155
# arguments (env and args)
143
- namespace : dict [str , TowerNamespace | GenericOptionalPrimitiveType ] = {
156
+ namespace : dict [str , TowerNamespaceValueType ] = {
144
157
'tower' : tower_namespace ,
145
158
}
146
159
147
160
# maintain a sanitized namespace for building the DB-stored arguments
148
161
# (safe_env)
149
- safe_namespace : dict [str , TowerNamespace | GenericOptionalPrimitiveType ] = {
150
- 'tower' : tower_namespace , }
162
+ safe_namespace : dict [str , TowerNamespaceValueType ] = {
163
+ 'tower' : tower_namespace ,
164
+ }
151
165
152
166
# build a normal namespace with secret values decrypted (for
153
167
# ansible-playbook) and a safe namespace with secret values hidden (for
@@ -170,8 +184,9 @@ class TowerNamespace:
170
184
171
185
for field in cred_type .inputs .get ('fields' , []):
172
186
field_id = str (field ['id' ])
187
+ field_type_is_bool = field ['type' ] == 'boolean'
173
188
# default missing boolean fields to False
174
- if field [ 'type' ] == 'boolean' and field_id not in credential .get_input_keys ():
189
+ if field_type_is_bool and field_id not in credential .get_input_keys ():
175
190
namespace [field_id ] = False
176
191
safe_namespace [field_id ] = False
177
192
# make sure private keys end with a \n
@@ -193,20 +208,24 @@ class TowerNamespace:
193
208
** namespace ,
194
209
) # type: ignore[misc]
195
210
env_dir = os .path .join (private_data_dir , 'env' )
196
- _ , path = tempfile .mkstemp (dir = env_dir )
197
- with open (path , 'w' ) as f :
211
+ path = tempfile .mkstemp (dir = env_dir )[ 1 ]
212
+ with open (path , 'w' ) as f : # pylint: disable=unspecified-encoding
198
213
f .write (data )
199
214
os .chmod (path , stat .S_IRUSR | stat .S_IWUSR )
200
215
container_path = get_incontainer_path (path , private_data_dir )
201
216
202
217
# determine if filename indicates single file or many
203
218
if file_label .find ('.' ) == - 1 :
204
- tower_namespace .filename = container_path
219
+ tower_namespace .filename = container_path # pylint: disable=attribute-defined-outside-init
205
220
else :
206
221
if not hasattr (tower_namespace , 'filename' ):
207
- tower_namespace .filename = TowerNamespace ()
222
+ tower_namespace .filename = TowerNamespace (
223
+ ) # pylint: disable=attribute-defined-outside-init
208
224
file_label = file_label .split ('.' )[1 ]
209
- setattr (tower_namespace .filename , file_label , container_path )
225
+ setattr (
226
+ tower_namespace .filename ,
227
+ file_label ,
228
+ container_path ) # pylint: disable=attribute-defined-outside-init
210
229
211
230
for env_var , tmpl in cred_type .injectors .get ('env' , {}).items ():
212
231
if env_var in ENV_BLOCKLIST :
@@ -220,23 +239,25 @@ class TowerNamespace:
220
239
# awx-manage inventory_update does not support extra_vars via -e
221
240
def build_extra_vars (
222
241
node : dict [str , str | list [str ]] | list [str ] | str ,
223
- ) -> dict [ str , str ] | list [ str ] | str :
242
+ ) -> ExtraVarsType :
224
243
if isinstance (node , dict ):
225
244
return {
226
- build_extra_vars (k ): build_extra_vars (v ) for k ,
245
+ build_extra_vars (entry ): build_extra_vars (v ) for entry ,
227
246
v in node .items ()
228
247
}
229
- elif isinstance (node , list ):
230
- return [build_extra_vars (x ) for x in node ]
231
- else :
232
- return sandbox_env .from_string (node ).render (** namespace )
233
-
234
- def build_extra_vars_file (vars , private_dir : str ) -> str :
248
+ if isinstance (node , list ):
249
+ return [build_extra_vars (entry ) for entry in node ]
250
+ return sandbox_env .from_string (node ).render (** namespace )
251
+
252
+ def build_extra_vars_file (
253
+ extra_vars : ExtraVarsType ,
254
+ private_dir : str ,
255
+ ) -> str :
235
256
handle , path = tempfile .mkstemp (
236
257
dir = os .path .join (private_dir , 'env' ),
237
258
)
238
259
f = os .fdopen (handle , 'w' )
239
- f .write (yaml_safe_dump (vars ))
260
+ f .write (yaml_safe_dump (extra_vars ))
240
261
f .close ()
241
262
os .chmod (path , stat .S_IRUSR )
242
263
return path
@@ -249,4 +270,5 @@ def build_extra_vars_file(vars, private_dir: str) -> str:
249
270
if extra_vars :
250
271
path = build_extra_vars_file (extra_vars , private_data_dir )
251
272
container_path = get_incontainer_path (path , private_data_dir )
252
- args .extend (['-e' , '@%s' % container_path ])
273
+ args .extend (['-e' , '@%s' % container_path ]
274
+ ) # pylint: disable=consider-using-f-string
0 commit comments