1
1
"""Python client for the Yeti API."""
2
2
3
3
import json
4
+ import logging
4
5
from typing import Any , Sequence
5
6
6
7
import requests
24
25
YetiLinkObject = dict [str , Any ]
25
26
26
27
28
+ logger = logging .getLogger (__name__ )
29
+ handler = logging .StreamHandler ()
30
+ formatter = logging .Formatter ("%(asctime)s - %(levelname)s - %(message)s" )
31
+ handler .setFormatter (formatter )
32
+ logger .addHandler (handler )
33
+
34
+
27
35
class YetiApi :
28
36
"""API object to interact with the Yeti API.
29
37
@@ -40,13 +48,21 @@ def __init__(self, url_root: str):
40
48
}
41
49
self ._url_root = url_root
42
50
51
+ self ._auth_function = ""
52
+ self ._auth_function_map = {
53
+ "auth_api_key" : self .auth_api_key ,
54
+ }
55
+
56
+ self ._apikey = None
57
+
43
58
def do_request (
44
59
self ,
45
60
method : str ,
46
61
url : str ,
47
62
json_data : dict [str , Any ] | None = None ,
48
63
body : bytes | None = None ,
49
64
headers : dict [str , Any ] | None = None ,
65
+ retries : int = 3 ,
50
66
) -> bytes :
51
67
"""Issues a request to the given URL.
52
68
@@ -56,6 +72,7 @@ def do_request(
56
72
json: The JSON payload to include in the request.
57
73
body: The body to include in the request.
58
74
headers: Extra headers to include in the request.
75
+ retries: The number of times to retry the request.
59
76
60
77
Returns:
61
78
The response from the API; a bytes object.
@@ -85,17 +102,30 @@ def do_request(
85
102
raise ValueError (f"Unsupported method: { method } " )
86
103
response .raise_for_status ()
87
104
except requests .exceptions .HTTPError as e :
105
+ if e .response .status_code == 401 :
106
+ if retries == 0 :
107
+ raise errors .YetiAuthError (str (e )) from e
108
+ self .refresh_auth ()
109
+ return self .do_request (
110
+ method , url , json_data , body , headers , retries - 1
111
+ )
112
+
88
113
raise errors .YetiApiError (e .response .status_code , e .response .text )
89
114
90
115
return response .content
91
116
92
- def auth_api_key (self , apikey : str ) -> None :
117
+ def auth_api_key (self , apikey : str | None = None ) -> None :
93
118
"""Authenticates a session using an API key."""
94
119
# Use long-term refresh API token to get an access token
120
+ if apikey is not None :
121
+ self ._apikey = apikey
122
+ if not self ._apikey :
123
+ raise ValueError ("No API key provided." )
124
+
95
125
response = self .do_request (
96
126
"POST" ,
97
127
f"{ self ._url_root } { API_TOKEN_ENDPOINT } " ,
98
- headers = {"x-yeti-apikey" : apikey },
128
+ headers = {"x-yeti-apikey" : self . _apikey },
99
129
)
100
130
101
131
access_token = json .loads (response ).get ("access_token" )
@@ -107,6 +137,14 @@ def auth_api_key(self, apikey: str) -> None:
107
137
authd_session .headers .update ({"authorization" : f"Bearer { access_token } " })
108
138
self .client = authd_session
109
139
140
+ self ._auth_function = "auth_api_key"
141
+
142
+ def refresh_auth (self ):
143
+ if self ._auth_function :
144
+ self ._auth_function_map [self ._auth_function ]()
145
+ else :
146
+ logger .warning ("No auth function set, cannot refresh auth." )
147
+
110
148
def search_indicators (
111
149
self ,
112
150
name : str | None = None ,
@@ -261,7 +299,6 @@ def new_dfiq_from_yaml(self, dfiq_type: str, dfiq_yaml: str) -> YetiObject:
261
299
params = {
262
300
"dfiq_type" : dfiq_type ,
263
301
"dfiq_yaml" : dfiq_yaml ,
264
- "update_indicators" : True ,
265
302
}
266
303
response = self .do_request (
267
304
"POST" , f"{ self ._url_root } /api/v2/dfiq/from_yaml" , json_data = params
@@ -278,13 +315,25 @@ def patch_dfiq_from_yaml(
278
315
params = {
279
316
"dfiq_type" : dfiq_type ,
280
317
"dfiq_yaml" : dfiq_yaml ,
281
- "update_indicators" : True ,
282
318
}
283
319
response = self .do_request (
284
320
"PATCH" , f"{ self ._url_root } /api/v2/dfiq/{ yeti_id } " , json_data = params
285
321
)
286
322
return json .loads (response )
287
323
324
+ def patch_dfiq (self , dfiq_object : dict [str , Any ]) -> YetiObject :
325
+ """Patches a DFIQ object in Yeti."""
326
+ params = {
327
+ "dfiq_type" : dfiq_object ["type" ],
328
+ "dfiq_object" : dfiq_object ,
329
+ }
330
+ response = self .do_request (
331
+ "PATCH" ,
332
+ f"{ self ._url_root } /api/v2/dfiq/{ dfiq_object ['id' ]} " ,
333
+ json_data = params ,
334
+ )
335
+ return json .loads (response )
336
+
288
337
def download_dfiq_archive (self , dfiq_type : str | None = None ) -> bytes :
289
338
"""Downloads an archive containing all DFIQ data from Yeti.
290
339
0 commit comments