-
Notifications
You must be signed in to change notification settings - Fork 315
Expand file tree
/
Copy pathapi.py
More file actions
310 lines (252 loc) · 9.82 KB
/
api.py
File metadata and controls
310 lines (252 loc) · 9.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
# pylint: disable=wrong-import-order
import logging
from typing import Mapping, Optional, Sequence, Tuple, Union
from warnings import warn
from packaging.version import Version
from requests import Session
__all__ = [
"ZabbixAPI",
"ZabbixAPIException",
"ZabbixAPIMethod",
"ZabbixAPIObject",
"ZabbixAPIObjectClass",
]
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
ZABBIX_5_4_0 = Version("5.4.0")
ZABBIX_6_4_0 = Version("6.4.0")
class ZabbixAPIException(Exception):
"""Generic Zabbix API exception
Codes:
-32700: invalid JSON. An error occurred on the server while
parsing the JSON text (typo, wrong quotes, etc.)
-32600: received JSON is not a valid JSON-RPC Request
-32601: requested remote-procedure does not exist
-32602: invalid method parameters
-32603: Internal JSON-RPC error
-32400: System error
-32300: Transport error
-32500: Application error
"""
def __init__(self, *args, **kwargs):
super().__init__(*args)
self.error = kwargs.get("error", None)
# pylint: disable=too-many-instance-attributes
class ZabbixAPI:
# pylint: disable=too-many-arguments
def __init__(
self,
server: str = "http://localhost/zabbix",
session: Optional[Session] = None,
use_authenticate: bool = False,
timeout: Optional[Union[float, int, Tuple[int, int]]] = None,
detect_version: bool = True,
):
"""
:param server: Base URI for zabbix web interface (omitting /api_jsonrpc.php)
:param session: optional pre-configured requests.Session instance
:param use_authenticate: Use old (Zabbix 1.8) style authentication
:param timeout: optional connect and read timeout in seconds, default: None
If you're using Requests >= 2.4 you can set it as
tuple: "(connect, read)" which is used to set individual
connect and read timeouts.
:param detect_version: autodetect Zabbix API version
"""
self.session = session or Session()
# Default headers for all requests
self.session.headers.update(
{
"Content-Type": "application/json-rpc",
"User-Agent": "python/pyzabbix",
"Cache-Control": "no-cache",
}
)
self.use_authenticate = use_authenticate
self.use_api_token = False
self.auth = ""
self.id = 0 # pylint: disable=invalid-name
self.timeout = timeout
if not server.endswith("/api_jsonrpc.php"):
server = server.rstrip("/") + "/api_jsonrpc.php"
self.url = server
logger.info("JSON-RPC Server Endpoint: %s", self.url)
self.version: Optional[Version] = None
self._detect_version = detect_version
def __enter__(self) -> "ZabbixAPI":
return self
# pylint: disable=inconsistent-return-statements
def __exit__(self, exception_type, exception_value, traceback):
if isinstance(exception_value, (ZabbixAPIException, type(None))):
if self.is_authenticated and not self.use_api_token:
# Logout the user if they are authenticated using username + password.
self.user.logout()
return True
return None
def login(
self,
user: str = "",
password: str = "",
api_token: Optional[str] = None,
) -> None:
"""Convenience method for calling user.authenticate
and storing the resulting auth token for further commands.
If use_authenticate is set, it uses the older (Zabbix 1.8)
authentication command
:param password: Password used to login into Zabbix
:param user: Username used to login into Zabbix
:param api_token: API Token to authenticate with
"""
if self._detect_version:
self.version = Version(self.api_version())
logger.info("Zabbix API version is: %s", self.version)
# If the API token is explicitly provided, use this instead.
if api_token is not None:
self.use_api_token = True
self.auth = api_token
return
# If we have an invalid auth token, we are not allowed to send a login
# request. Clear it before trying.
self.auth = ""
if self.use_authenticate:
logger.debug("Using legacy authentication with user=%s", user)
self.auth = self.user.authenticate(user=user, password=password)
else:
if self.version and self.version < Version("5.4.0"):
logger.debug("Using user.login with user=%s for Zabbix < 5.4", user)
self.auth = self.user.login(user=user, password=password)
else:
logger.debug("Using user.login with username=%s for Zabbix >= 5.4", user)
self.auth = self.user.login(username=user, password=password)
def check_authentication(self):
if self.use_api_token:
# We cannot use this call using an API Token
return True
# Convenience method for calling user.checkAuthentication of the current session
return self.user.checkAuthentication(sessionid=self.auth)
@property
def is_authenticated(self) -> bool:
if self.use_api_token:
# We cannot use this call using an API Token
return True
try:
self.user.checkAuthentication(sessionid=self.auth)
except ZabbixAPIException:
return False
return True
def confimport(
self,
confformat: str = "",
source: str = "",
rules: str = "",
) -> dict:
"""Alias for configuration.import because it clashes with
Python's import reserved keyword
:param rules:
:param source:
:param confformat:
"""
warn(
"ZabbixAPI.confimport(format, source, rules) has been deprecated, please use "
"ZabbixAPI.configuration['import'](format=format, source=source, rules=rules) instead",
DeprecationWarning,
2,
)
return self.configuration["import"](
format=confformat,
source=source,
rules=rules,
)
def api_version(self) -> str:
return self.apiinfo.version()
def do_request(
self,
method: str,
params: Optional[Union[Mapping, Sequence]] = None,
) -> dict:
payload = {
"jsonrpc": "2.0",
"method": method,
"params": params or {},
"id": self.id,
}
headers = {}
# We don't have to pass the auth token if asking for
# the apiinfo.version or user.checkAuthentication
anonymous_methods = {
"apiinfo.version",
"user.checkAuthentication",
"user.login",
}
if self.auth and method not in anonymous_methods:
if self.version and self.version >= ZABBIX_6_4_0:
headers["Authorization"] = f"Bearer {self.auth}"
else:
payload["auth"] = self.auth
logger.debug("Sending: %s", payload)
resp = self.session.post(
self.url,
json=payload,
headers=headers,
timeout=self.timeout,
)
logger.debug("Response Code: %s", resp.status_code)
# NOTE: Getting a 412 response code means the headers are not in the
# list of allowed headers.
resp.raise_for_status()
if not resp.text:
raise ZabbixAPIException("Received empty response")
try:
response = resp.json()
except ValueError as exception:
raise ZabbixAPIException(
f"Unable to parse json: {resp.text}"
) from exception
logger.debug("Response Body: %s", response)
self.id += 1
if "error" in response: # some exception
error = response["error"]
# some errors don't contain 'data': workaround for ZBX-9340
if "data" not in error:
error["data"] = "No data"
raise ZabbixAPIException(
f"Error {error['code']}: {error['message']}, {error['data']}",
error["code"],
error=error,
)
return response
def _object(self, attr: str) -> "ZabbixAPIObject":
"""Dynamically create an object class (ie: host)"""
return ZabbixAPIObject(attr, self)
def __getattr__(self, attr: str) -> "ZabbixAPIObject":
return self._object(attr)
def __getitem__(self, attr: str) -> "ZabbixAPIObject":
return self._object(attr)
# pylint: disable=too-few-public-methods
class ZabbixAPIMethod:
def __init__(self, method: str, parent: ZabbixAPI):
self._method = method
self._parent = parent
def __call__(self, *args, **kwargs):
if args and kwargs:
raise TypeError("Found both args and kwargs")
return self._parent.do_request(self._method, args or kwargs)["result"]
# pylint: disable=too-few-public-methods
class ZabbixAPIObject:
def __init__(self, name: str, parent: ZabbixAPI):
self._name = name
self._parent = parent
def _method(self, attr: str) -> ZabbixAPIMethod:
"""Dynamically create a method (ie: get)"""
return ZabbixAPIMethod(f"{self._name}.{attr}", self._parent)
def __getattr__(self, attr: str) -> ZabbixAPIMethod:
return self._method(attr)
def __getitem__(self, attr: str) -> ZabbixAPIMethod:
return self._method(attr)
class ZabbixAPIObjectClass(ZabbixAPIObject):
def __init__(self, *args, **kwargs):
warn(
"ZabbixAPIObjectClass has been renamed to ZabbixAPIObject",
DeprecationWarning,
2,
)
super().__init__(*args, **kwargs)