forked from corazawaf/libcoraza
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsimple_get.py
More file actions
381 lines (309 loc) · 14.8 KB
/
simple_get.py
File metadata and controls
381 lines (309 loc) · 14.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
#!/usr/bin/env python3
"""simple_get.py - Python SWIG example for libcoraza.

Exercises all exported functions that can be tested without a native
callback (coraza_add_debug_log_callback / coraza_add_error_callback
require language-specific function-pointer support and are excluded from
the default SWIG wrapper).  The Python-level setters
coraza_set_error_callback / coraza_set_debug_log_callback, which accept
Python callables, are exercised in test_callbacks.

Build and run from the repository root::

    make
    make -C examples/python
    make -C examples/python run
"""
import os
import sys
import tempfile
# Ensure the SWIG-generated coraza.py / _coraza.so are found.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import coraza as _c # noqa: E402
# Phase-1 rule that denies with status 403 when REMOTE_ADDR matches 127.0.0.1.
_DENY_RULE = " ".join(
    (
        'SecRule REMOTE_ADDR "@ipMatch 127.0.0.1"',
        "\"id:1,phase:1,deny,log,msg:'block',status:403\"",
    )
)

# Phase-1 rule with the non-blocking "pass" action for REMOTE_ADDR 10.0.0.1.
_PASS_RULE = " ".join(
    (
        'SecRule REMOTE_ADDR "@ipMatch 10.0.0.1"',
        "\"id:2,phase:1,pass,log,msg:'allow'\"",
    )
)
def _check(cond: object, msg: str) -> None:
    """Raise ``AssertionError(msg)`` when *cond* is falsy.

    Used instead of the ``assert`` statement so the checks still run when
    Python is started with ``-O`` (which strips ``assert``).

    Args:
        cond: Value evaluated for truthiness.
        msg: Message carried by the raised ``AssertionError``.

    Raises:
        AssertionError: if *cond* is falsy.
    """
    if not cond:
        raise AssertionError(msg)
# ---------------------------------------------------------------------------
# test_lifecycle
# Covers: coraza_new_waf_config, coraza_rules_add, coraza_rules_add_file,
# coraza_new_waf, coraza_rules_count, coraza_free_waf_config,
# coraza_new_transaction_with_id, coraza_process_connection,
# coraza_add_request_header, coraza_add_get_args,
# coraza_process_uri, coraza_process_request_headers,
# coraza_append_request_body, coraza_process_request_body,
# coraza_process_response_headers, coraza_add_response_header,
# coraza_append_response_body, coraza_process_response_body,
# coraza_update_status_code, coraza_process_logging,
# coraza_intervention, coraza_free_intervention,
# coraza_free_transaction, coraza_new_transaction, coraza_free_waf
# ---------------------------------------------------------------------------
def test_lifecycle():
    """Drive one denied request through the WAF and transaction lifecycle.

    Builds a WAF from an inline deny rule plus a rule loaded from a
    temporary file, runs a transaction from 127.0.0.1 through the request,
    response and logging calls, verifies that a 403 intervention is
    reported, and releases every handle.

    Raises:
        AssertionError: if any call returns an unexpected value.
    """
    # coraza_new_waf_config
    cfg = _c.coraza_new_waf_config()
    _check(cfg != 0, "coraza_new_waf_config returned 0")

    # coraza_rules_add
    ret = _c.coraza_rules_add(cfg, _DENY_RULE)
    _check(ret == 0, f"coraza_rules_add failed: {ret}")

    # coraza_rules_add_file — write a second rule to a temp file
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".conf", delete=False
    ) as tf:
        tf.write(_PASS_RULE + "\n")
        rules_file = tf.name
    try:
        ret = _c.coraza_rules_add_file(cfg, rules_file)
        _check(ret == 0, f"coraza_rules_add_file failed: {ret}")

        # coraza_new_waf — must be created while the rules file still exists
        waf = _c.coraza_new_waf(cfg)
        _check(waf != 0, "coraza_new_waf returned 0")
    finally:
        # delete=False above means the file must be removed by hand.
        os.unlink(rules_file)

    # coraza_rules_count
    count = _c.coraza_rules_count(waf)
    _check(count >= 2, f"expected >= 2 rules, got {count}")

    # coraza_free_waf_config
    ret = _c.coraza_free_waf_config(cfg)
    _check(ret == 0, f"coraza_free_waf_config failed: {ret}")

    # coraza_new_transaction_with_id
    tx = _c.coraza_new_transaction_with_id(waf, "python-simple-get")
    _check(tx != 0, "coraza_new_transaction_with_id returned 0")

    # coraza_process_connection
    ret = _c.coraza_process_connection(tx, "127.0.0.1", 55555, "localhost", 80)
    _check(ret == 0, f"coraza_process_connection failed: {ret}")

    # coraza_add_request_header
    # Use len(x.encode()) for byte length — required for correctness with
    # non-ASCII values.
    hname, hvalue = "Host", "localhost"
    ret = _c.coraza_add_request_header(
        tx, hname, len(hname.encode()), hvalue, len(hvalue.encode())
    )
    _check(ret == 0, f"coraza_add_request_header failed: {ret}")

    # coraza_add_get_args
    ret = _c.coraza_add_get_args(tx, "foo", "bar")
    _check(ret == 0, f"coraza_add_get_args failed: {ret}")

    # coraza_process_uri
    ret = _c.coraza_process_uri(tx, "/someurl?foo=bar", "GET", "HTTP/1.1")
    _check(ret == 0, f"coraza_process_uri failed: {ret}")

    # coraza_process_request_headers (returns CORAZA_INTERRUPTION for the
    # deny rule)
    ret = _c.coraza_process_request_headers(tx)
    _check(
        ret == 1,
        f"coraza_process_request_headers: expected CORAZA_INTERRUPTION (1), got {ret}",
    )

    # coraza_append_request_body (bytes typemap: single argument)
    body = b"hello=world"
    ret = _c.coraza_append_request_body(tx, body)
    _check(ret == 0, f"coraza_append_request_body failed: {ret}")

    # coraza_process_request_body
    # NOTE(review): ">= 0" presumably tolerates the interruption code (1)
    # once the deny rule has fired — confirm against the C API.
    ret = _c.coraza_process_request_body(tx)
    _check(ret >= 0, f"coraza_process_request_body failed: {ret}")

    # coraza_process_response_headers
    ret = _c.coraza_process_response_headers(tx, 200, "HTTP/1.1")
    _check(ret >= 0, f"coraza_process_response_headers failed: {ret}")

    # coraza_add_response_header
    cname, cvalue = "Content-Type", "text/plain"
    ret = _c.coraza_add_response_header(
        tx, cname, len(cname.encode()), cvalue, len(cvalue.encode())
    )
    _check(ret == 0, f"coraza_add_response_header failed: {ret}")

    # coraza_append_response_body (bytes typemap: single argument)
    resp_body = b"OK"
    ret = _c.coraza_append_response_body(tx, resp_body)
    _check(ret == 0, f"coraza_append_response_body failed: {ret}")

    # coraza_process_response_body
    ret = _c.coraza_process_response_body(tx)
    _check(ret >= 0, f"coraza_process_response_body failed: {ret}")

    # coraza_update_status_code (return value is not inspected here)
    _c.coraza_update_status_code(tx, 200)

    # coraza_process_logging
    ret = _c.coraza_process_logging(tx)
    _check(ret == 0, f"coraza_process_logging failed: {ret}")

    # coraza_intervention — the deny rule on 127.0.0.1 must have fired
    it = _c.coraza_intervention(tx)
    _check(it is not None, "expected an intervention but got None")
    _check(it.status == 403, f"expected status 403, got {it.status}")
    print(f" Intervention: action={it.action!r} status={it.status} data={it.data!r}")

    # coraza_free_intervention
    ret = _c.coraza_free_intervention(it)
    _check(ret == 0, f"coraza_free_intervention failed: {ret}")

    # coraza_free_transaction
    ret = _c.coraza_free_transaction(tx)
    _check(ret == 0, f"coraza_free_transaction failed: {ret}")

    # coraza_new_transaction (non-ID variant)
    tx2 = _c.coraza_new_transaction(waf)
    _check(tx2 != 0, "coraza_new_transaction returned 0")
    # Check the release of the second transaction as well, matching the
    # handling of the first one above.
    ret = _c.coraza_free_transaction(tx2)
    _check(ret == 0, f"coraza_free_transaction(tx2) failed: {ret}")

    # coraza_free_waf
    ret = _c.coraza_free_waf(waf)
    _check(ret == 0, f"coraza_free_waf failed: {ret}")

    print(" test_lifecycle: PASS")
# ---------------------------------------------------------------------------
# test_request_body_from_file
# Covers: coraza_request_body_from_file
# ---------------------------------------------------------------------------
def test_request_body_from_file():
    """Load a request body into a transaction from a file on disk.

    Creates a WAF holding only the pass rule, opens a POST transaction from
    10.0.0.1, supplies the body through coraza_request_body_from_file and
    then runs the remaining processing calls, each of which must return 0.

    Raises:
        AssertionError: if any checked call returns an unexpected value.
    """
    cfg = _c.coraza_new_waf_config()
    _check(_c.coraza_rules_add(cfg, _PASS_RULE) == 0, "coraza_rules_add failed")
    waf = _c.coraza_new_waf(cfg)
    _check(_c.coraza_free_waf_config(cfg) == 0, "coraza_free_waf_config failed")

    tx = _c.coraza_new_transaction(waf)
    _check(
        _c.coraza_process_connection(tx, "10.0.0.1", 12345, "localhost", 80) == 0,
        "coraza_process_connection failed",
    )
    _check(
        _c.coraza_process_uri(tx, "/upload", "POST", "HTTP/1.1") == 0,
        "coraza_process_uri failed",
    )
    # ">= 0" follows the convention used for the phase calls in
    # test_lifecycle, where a "failed" result is a negative value.
    _check(
        _c.coraza_process_request_headers(tx) >= 0,
        "coraza_process_request_headers failed",
    )

    # delete=False: the file has to outlive the "with" block so the native
    # call can open it by name; it is removed in the "finally" below.
    with tempfile.NamedTemporaryFile(delete=False) as tf:
        tf.write(b"body content from file")
        body_file = tf.name
    try:
        ret = _c.coraza_request_body_from_file(tx, body_file)
        _check(ret == 0, f"coraza_request_body_from_file failed: {ret}")
    finally:
        os.unlink(body_file)

    _check(_c.coraza_process_request_body(tx) == 0, "coraza_process_request_body failed")
    _check(
        _c.coraza_process_response_headers(tx, 200, "HTTP/1.1") == 0,
        "coraza_process_response_headers failed",
    )
    _check(_c.coraza_process_response_body(tx) == 0, "coraza_process_response_body failed")
    _check(_c.coraza_process_logging(tx) == 0, "coraza_process_logging failed")
    _check(_c.coraza_free_transaction(tx) == 0, "coraza_free_transaction failed")
    _check(_c.coraza_free_waf(waf) == 0, "coraza_free_waf failed")

    print(" test_request_body_from_file: PASS")
# ---------------------------------------------------------------------------
# test_rules_merge
# Covers: coraza_rules_merge
# NOTE: coraza_rules_merge is currently a stub (always returns 0, does not
# actually merge rules). This test only verifies the call does not crash.
# ---------------------------------------------------------------------------
def test_rules_merge():
    """Call coraza_rules_merge on two WAFs and expect a 0 return code.

    The first WAF holds the pass rule, the second is built from an empty
    configuration.  Only the return code of the merge is verified; the
    resulting rule set is not inspected.

    Raises:
        AssertionError: if any checked call returns a non-zero value.
    """
    cfg1 = _c.coraza_new_waf_config()
    _check(_c.coraza_rules_add(cfg1, _PASS_RULE) == 0, "coraza_rules_add failed")
    waf1 = _c.coraza_new_waf(cfg1)
    _check(_c.coraza_free_waf_config(cfg1) == 0, "coraza_free_waf_config failed")

    # Second WAF: no rules added to its configuration.
    cfg2 = _c.coraza_new_waf_config()
    waf2 = _c.coraza_new_waf(cfg2)
    _check(_c.coraza_free_waf_config(cfg2) == 0, "coraza_free_waf_config failed")

    ret = _c.coraza_rules_merge(waf1, waf2)
    _check(ret == 0, f"coraza_rules_merge failed: {ret}")

    _check(_c.coraza_free_waf(waf1) == 0, "coraza_free_waf(waf1) failed")
    _check(_c.coraza_free_waf(waf2) == 0, "coraza_free_waf(waf2) failed")

    print(" test_rules_merge: PASS")
# ---------------------------------------------------------------------------
# test_callbacks
# Covers: coraza_set_error_callback, coraza_set_debug_log_callback,
# coraza_matched_rule_get_error_log, coraza_matched_rule_get_severity
# ---------------------------------------------------------------------------
def test_callbacks():
    """Register Python error and debug-log callbacks and trigger the deny rule.

    Both callbacks are attached to the configuration before the WAF is
    created.  A transaction from 127.0.0.1 is then processed, and the test
    requires that the error callback recorded at least one matched rule.
    The number of debug messages is printed but not asserted.

    Raises:
        AssertionError: if a checked call fails or no matched rule was
            delivered to the error callback.
    """
    matched_logs = []
    debug_msgs = []

    def on_error(rule_handle):
        # Record (severity, error log text) for each matched rule reported.
        log = _c.coraza_matched_rule_get_error_log(rule_handle)
        sev = _c.coraza_matched_rule_get_severity(rule_handle)
        matched_logs.append((sev, log))

    def on_debug_log(level, message, fields):
        # "fields" is accepted as part of the callback signature but unused.
        debug_msgs.append((level, message))

    cfg = _c.coraza_new_waf_config()
    _check(_c.coraza_rules_add(cfg, _DENY_RULE) == 0, "coraza_rules_add failed")
    _check(_c.coraza_set_error_callback(cfg, on_error) == 0,
           "coraza_set_error_callback failed")
    _check(_c.coraza_set_debug_log_callback(cfg, on_debug_log) == 0,
           "coraza_set_debug_log_callback failed")
    waf = _c.coraza_new_waf(cfg)
    _check(_c.coraza_free_waf_config(cfg) == 0, "coraza_free_waf_config failed")

    tx = _c.coraza_new_transaction(waf)
    _check(
        _c.coraza_process_connection(tx, "127.0.0.1", 12345, "localhost", 80) == 0,
        "coraza_process_connection failed",
    )
    _check(
        _c.coraza_process_uri(tx, "/test", "GET", "HTTP/1.1") == 0,
        "coraza_process_uri failed",
    )
    # The deny rule matches this client, so a non-zero (interruption) result
    # is acceptable here; only a negative value is treated as a failure.
    _check(
        _c.coraza_process_request_headers(tx) >= 0,
        "coraza_process_request_headers failed",
    )
    _check(_c.coraza_process_logging(tx) == 0, "coraza_process_logging failed")
    _check(_c.coraza_free_transaction(tx) == 0, "coraza_free_transaction failed")
    _check(_c.coraza_free_waf(waf) == 0, "coraza_free_waf failed")

    _check(len(matched_logs) > 0,
           "expected at least one matched rule via error callback")
    print(f" Matched rules via callback: {matched_logs}")
    # Debug messages depend on Coraza's internal log level (default: ERROR).
    # We verify the callback was accepted without asserting message count.
    print(f" Debug messages received: {len(debug_msgs)}")

    print(" test_callbacks: PASS")
# ---------------------------------------------------------------------------
# test_intervention_redirect
# Covers: coraza_intervention .data field (redirect URL), coraza_free_intervention
# Validates the char *data struct field that was previously missing from coraza.i.
# ---------------------------------------------------------------------------
def test_intervention_redirect():
    """Check that a redirect rule yields an intervention carrying the URL.

    Installs a phase-1 rule that redirects when the ``trigger`` argument
    equals ``yes``, sends a matching GET request, and verifies the
    intervention's ``status`` (302), ``action`` ("redirect") and ``data``
    (the redirect URL) fields.

    Raises:
        AssertionError: if a checked call fails or the intervention does not
            have the expected fields.
    """
    redirect_rule = (
        'SecRule ARGS:trigger "@streq yes" '
        '"id:10,phase:1,status:302,redirect:http://example.com"'
    )
    cfg = _c.coraza_new_waf_config()
    _check(_c.coraza_rules_add(cfg, redirect_rule) == 0, "coraza_rules_add failed")
    waf = _c.coraza_new_waf(cfg)
    _check(_c.coraza_free_waf_config(cfg) == 0, "coraza_free_waf_config failed")

    tx = _c.coraza_new_transaction(waf)
    _check(
        _c.coraza_process_connection(tx, "10.0.0.1", 12345, "localhost", 80) == 0,
        "coraza_process_connection failed",
    )
    _check(_c.coraza_add_get_args(tx, "trigger", "yes") == 0,
           "coraza_add_get_args failed")
    _check(
        _c.coraza_process_uri(tx, "/?trigger=yes", "GET", "HTTP/1.1") == 0,
        "coraza_process_uri failed",
    )
    # The redirect rule is expected to interrupt here, so any non-negative
    # result is accepted; the intervention itself is verified below.
    _check(
        _c.coraza_process_request_headers(tx) >= 0,
        "coraza_process_request_headers failed",
    )

    it = _c.coraza_intervention(tx)
    _check(it is not None, "expected a redirect intervention but got None")
    _check(it.status == 302, f"expected status 302, got {it.status}")
    _check(it.action == "redirect", f"expected action 'redirect', got {it.action!r}")
    _check(it.data == "http://example.com",
           f"expected data 'http://example.com', got {it.data!r}")

    _check(_c.coraza_free_intervention(it) == 0, "coraza_free_intervention failed")
    _check(_c.coraza_free_transaction(tx) == 0, "coraza_free_transaction failed")
    _check(_c.coraza_free_waf(waf) == 0, "coraza_free_waf failed")

    print(" test_intervention_redirect: PASS")
# ---------------------------------------------------------------------------
# test_waf_creation_error
# Covers: coraza_new_waf char**er typemap — bad config raises RuntimeError.
# ---------------------------------------------------------------------------
def test_waf_creation_error():
    """Check that coraza_new_waf raises RuntimeError for a bad configuration.

    Raises:
        AssertionError: if no RuntimeError is raised or the configuration
            cannot be freed afterwards.
    """
    cfg = _c.coraza_new_waf_config()
    # Reference a non-existent rules file to force WAF creation to fail.
    # The return value is deliberately not checked: the failure under test
    # is the one reported by coraza_new_waf.
    _c.coraza_rules_add_file(cfg, "/nonexistent/path/rules.conf")
    try:
        _c.coraza_new_waf(cfg)
    except RuntimeError:
        pass  # expected outcome
    else:
        raise AssertionError("expected RuntimeError from coraza_new_waf but none was raised")
    _check(_c.coraza_free_waf_config(cfg) == 0, "coraza_free_waf_config failed")

    print(" test_waf_creation_error: PASS")
# ---------------------------------------------------------------------------
# test_invalid_inputs
# Covers: Python typemap validation and PyCallable_Check hardening.
# ---------------------------------------------------------------------------
def test_invalid_inputs():
    """Check that the wrapper rejects wrongly typed arguments with TypeError.

    Covers three cases: a ``str`` passed where a request body of ``bytes``
    is required, and a non-callable passed to each of the two callback
    setters.

    Raises:
        AssertionError: if a call accepts the invalid argument, or a
            configuration cannot be freed.
    """

    def expect_type_error(func, args, message):
        # Call func(*args) and fail with *message* unless it raises TypeError.
        try:
            func(*args)
        except TypeError:
            return
        raise AssertionError(message)

    # coraza_append_request_body must reject non-bytes input.
    cfg = _c.coraza_new_waf_config()
    waf = _c.coraza_new_waf(cfg)
    _c.coraza_free_waf_config(cfg)
    tx = _c.coraza_new_transaction(waf)
    try:
        expect_type_error(
            _c.coraza_append_request_body,
            (tx, "not bytes"),
            "expected TypeError for str input but none was raised",
        )
    finally:
        # Release the native handles even when the expectation fails.
        _c.coraza_free_transaction(tx)
        _c.coraza_free_waf(waf)

    # coraza_set_error_callback must reject non-callables.
    cfg = _c.coraza_new_waf_config()
    expect_type_error(
        _c.coraza_set_error_callback,
        (cfg, "not a callable"),
        "expected TypeError for non-callable but none was raised",
    )
    _check(_c.coraza_free_waf_config(cfg) == 0, "coraza_free_waf_config failed")

    # coraza_set_debug_log_callback must reject non-callables.
    cfg = _c.coraza_new_waf_config()
    expect_type_error(
        _c.coraza_set_debug_log_callback,
        (cfg, 42),
        "expected TypeError for non-callable but none was raised",
    )
    _check(_c.coraza_free_waf_config(cfg) == 0, "coraza_free_waf_config failed")

    print(" test_invalid_inputs: PASS")
if __name__ == "__main__":
    # Run every test in order; nothing catches AssertionError here, so the
    # first failing check aborts the script with a traceback.
    print("Running libcoraza Python SWIG tests...")
    test_lifecycle()
    test_request_body_from_file()
    test_rules_merge()
    test_callbacks()
    test_intervention_redirect()
    test_waf_creation_error()
    test_invalid_inputs()
    print("All tests passed.")