-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathcurl_compare.py
More file actions
executable file
·269 lines (239 loc) · 10.5 KB
/
Copy pathcurl_compare.py
File metadata and controls
executable file
·269 lines (239 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
#!/usr/bin/env python3
"""
Independent curl oracle for the generated request-validation suite.
For each generated negative test it re-issues the SAME request with curl
(method, URL, headers, body reconstructed from the emitted .spec.ts) and
compares per test:
expected — the status the generator asserts (assertResponseStatus arg)
playwright — the result Playwright observed (from its JSON report, optional)
curl — the status curl observes now
Mismatches are flagged; with --show-body the curl response body is printed for
any test whose curl status != expected. Exits non-zero on any mismatch.
It does NOT import the suite's own code — a true cross-check oracle. To stay
faithful to the suite it does, however, reconstruct the URL by running the
EXACT `buildUrl()` implementation from the support module in node (so path
params, the 3-arg `buildUrl(path, params, query)` form, and `encodeURIComponent`
query encoding all match), and normalises JS object/array literals via node too.
"""
import argparse
import json
import re
import subprocess
import sys
from pathlib import Path
# Matches the opening of a generated test call — `test('<title>',` or
# `test("<title>",` — and captures the title. The \1 back-reference requires the
# closing quote to be the same character as the opening one; re.S lets `.` span
# newlines.
TEST_RE = re.compile(r"test\(\s*(['\"])(?P<title>.*?)\1\s*,", re.S)
# Exact copy of request-validation support/http.ts buildUrl (API_VERSION = 'v2').
# NOTE: in this script the base URL and API version are not hard-coded; they are
# read from process.argv[1] and process.argv[2]. `__ARGS__` is a placeholder for
# the JS argument list of the buildUrl(...) call and has to be substituted
# before the script is run. Unresolved `{param}` path segments become the
# literal "__MISSING_PARAM__"; query entries whose value is undefined are dropped.
BUILD_URL_JS = r"""
const base = process.argv[1];
const API_VERSION = process.argv[2];
function buildUrl(pathTemplate, params, query) {
let url = `${base}/${API_VERSION}${pathTemplate}`.replace(/\{(\w+)}/g, (_, k) => {
const v = params && params[k];
return v == null ? "__MISSING_PARAM__" : String(v);
});
if (query) {
const q = Object.entries(query)
.filter(([, v]) => v !== undefined)
.map(([k, v]) => `${encodeURIComponent(k)}=${encodeURIComponent(String(v))}`)
.join("&");
if (q) url += (url.includes("?") ? "&" : "?") + q;
}
return url;
}
process.stdout.write(buildUrl(__ARGS__));
"""
def node(script: str, *args: str):
    """Run `script` with `node -e` and return what it wrote to stdout.

    Args:
        script: JavaScript source passed to `node -e`.
        *args: Extra positional arguments, visible to the script as
            process.argv[1], process.argv[2], ...

    Returns:
        The captured stdout as a string when node exits with status 0;
        None when node exits non-zero, cannot be started, times out
        (15 s), or produces output that is not valid UTF-8.
    """
    # For `node -e`, the first positional is process.argv[1] (no script-name slot
    # to skip), so pass args straight through — BUILD_URL_JS reads argv[1]/argv[2].
    try:
        out = subprocess.run(
            ["node", "-e", script, *args],
            capture_output=True,
            text=True,
            # node writes UTF-8 regardless of the caller's locale; decoding with
            # the locale default would corrupt non-ASCII URLs / JSON bodies.
            encoding="utf-8",
            timeout=15,
        )
    except (OSError, ValueError, subprocess.SubprocessError):
        # OSError: node missing / not executable; SubprocessError: timeout;
        # ValueError: undecodable output or an invalid argument (e.g. NUL byte).
        return None
    return out.stdout if out.returncode == 0 else None
def node_json(js_literal: str):
    """Evaluate a JS object/array literal in node and return it as Python data.

    The literal is wrapped in `JSON.stringify((...))`, executed through
    `node`, and the printed JSON is parsed. Returns None when node yields no
    output or the output cannot be parsed as JSON.
    """
    script = f"process.stdout.write(JSON.stringify(({js_literal})))"
    raw = node(script)
    if raw is None:
        return None
    try:
        parsed = json.loads(raw)
    except Exception:
        parsed = None
    return parsed
def extract_balanced(src: str, start: int, open_ch: str, close_ch: str) -> str:
    """Return the text between the delimiter at `start` and its matching closer.

    Nesting of `open_ch` / `close_ch` is tracked, and delimiters that appear
    inside string literals are ignored. Single-quoted, double-quoted and
    backtick (template) literals are all recognised, so an apostrophe or a
    bracket inside a template literal no longer derails the scan.

    Args:
        src: Text to scan.
        start: Index of the opening `open_ch` in `src`.
        open_ch: Opening delimiter, e.g. "(" or "{".
        close_ch: Matching closing delimiter.

    Returns:
        The substring strictly between the two delimiters, or "" when no
        matching closer is found before the end of `src`.
    """
    depth = 0
    quote = None  # the quote character of the literal we are inside, if any
    i, n = start, len(src)
    while i < n:
        c = src[i]
        if quote is not None:
            if c == "\\":
                # Skip the escaped character so an escaped quote does not
                # terminate the literal.
                i += 2
                continue
            if c == quote:
                quote = None
        elif c in "'\"`":
            quote = c
        elif c == open_ch:
            depth += 1
        elif c == close_ch:
            depth -= 1
            if depth == 0:
                return src[start + 1:i]
        i += 1
    return ""
def split_tests(src: str):
    """Yield `(title, block)` for every test call that TEST_RE finds in `src`.

    A block starts at the matched `test(` and extends up to the start of the
    next match; the last block extends to the end of `src`.
    """
    matches = list(TEST_RE.finditer(src))
    offsets = [m.start() for m in matches]
    # The end of each block is the start of the following one (or end of text).
    ends = offsets[1:] + [len(src)]
    for m, end in zip(matches, ends):
        yield m.group("title"), src[m.start():end]
def parse_block(block: str, base: str, api_version: str):
    """Reconstruct the HTTP request issued by one generated test block.

    Args:
        block: Source text of a single test, as produced by `split_tests`.
        base: Base URL handed to the buildUrl script as process.argv[1].
        api_version: API version handed to the script as process.argv[2].

    Returns:
        None when the block has no `buildUrl(` call or the URL could not be
        computed by node. Otherwise a dict with the keys:
        `url`, `method` (upper-case, "GET" when none is recognised),
        `headers_kind` (raw text after `headers:`, "{}" when absent),
        `multipart` (parsed field object or None), `body_json` (JSON text or
        None), `expected` (int status from assertResponseStatus, or None),
        `operationId` and `kind` (empty string when absent).
    """
    # --- URL: run the suite's real buildUrl with the emitted args ---
    bidx = block.find("buildUrl(")
    if bidx == -1:
        return None
    args = extract_balanced(block, block.index("(", bidx), "(", ")").strip()
    url = node(BUILD_URL_JS.replace("__ARGS__", args), base, api_version)
    if url is None:
        return None

    # --- method ---
    mm = re.search(r"request\.(get|post|put|patch|delete)\(", block)
    method = mm.group(1).upper() if mm else "GET"

    # --- headers helper / literal ---
    hm = re.search(r"headers:\s*([^\n]+?),?\n", block)
    headers_kind = hm.group(1).strip() if hm else "{}"

    # --- multipart vs json body ---
    multipart = None
    body_json = None
    if "multipart: formData" in block:
        fm = re.search(r"multipartFields[^=]*=\s*", block)
        # find() instead of index(): a block with no "{" after the assignment
        # must leave `multipart` unset rather than abort the run with ValueError.
        obj_start = block.find("{", fm.end()) if fm else -1
        if obj_start != -1:
            obj = extract_balanced(block, obj_start, "{", "}")
            multipart = node_json("{" + obj + "}")
    else:
        bm = re.search(r"const requestBody[^=]*=\s*", block)
        if bm:
            j = bm.end()
            # literal starts at next { or [
            br = min((p for p in (block.find("{", j), block.find("[", j)) if p != -1), default=-1)
            if br != -1:
                opener = block[br]
                closer = "}" if opener == "{" else "]"
                lit = extract_balanced(block, br, opener, closer)
                # Let node normalise the JS literal into strict JSON text.
                body_json = node(
                    "process.stdout.write(JSON.stringify((" + opener + lit + closer + ")))"
                )

    # --- expected status + metadata (quote-agnostic) ---
    am = re.search(r"assertResponseStatus\(\s*testInfo,\s*res,\s*(\d{3})", block)
    expected = int(am.group(1)) if am else None
    op = re.search(r"operationId:\s*['\"]([^'\"]+)['\"]", block)
    kind = re.search(r"scenarioKind:\s*['\"]([^'\"]+)['\"]", block)
    return {
        "url": url, "method": method, "headers_kind": headers_kind,
        "multipart": multipart, "body_json": body_json, "expected": expected,
        "operationId": op.group(1) if op else "", "kind": kind.group(1) if kind else "",
    }
def curl_headers(kind, admin_header, deny_header):
    """Map the `headers:` expression of a test onto curl `-H` header strings.

    `kind` is matched by prefix against the helper names `jsonHeaders`,
    `authHeaders` and `denyProbeHeaders`; failing those, a literal containing
    "Bearer invalid-token" yields that Authorization header. Anything else
    produces no headers. Empty `admin_header` / `deny_header` values are
    omitted from the result.
    """
    helper = kind.strip()
    admin = [admin_header] if admin_header else []
    deny = [deny_header] if deny_header else []
    by_prefix = (
        ("jsonHeaders", ["Content-Type: application/json", *admin]),
        ("authHeaders", admin),
        ("denyProbeHeaders", deny),
    )
    for prefix, result in by_prefix:
        if helper.startswith(prefix):
            return result
    if "Bearer invalid-token" in helper:
        return ["Authorization: Bearer invalid-token"]
    return []  # {} → no auth
def run_curl(method, url, headers, body_json, multipart):
    """Issue one request with curl and return `(status, body)`.

    Args:
        method: HTTP method passed to `-X`.
        url: Fully built request URL; sent as-is (curl globbing is disabled).
        headers: Iterable of "Name: value" strings, each passed with `-H`.
        body_json: JSON text sent with `--data-binary`, or None.
        multipart: Mapping of form fields sent as multipart/form-data, or
            None. Takes precedence over `body_json`.

    Returns:
        `(code, body)` where `code` is the integer HTTP status reported by
        curl (0 when curl reports none) and `body` is the response text.
        `(None, message)` when curl could not be run, and `(None, output)`
        when the status marker is missing from curl's output.
    """
    cmd = [
        "curl", "-s",
        # Without --globoff curl expands {a,b} / [1-3] in the URL as glob
        # patterns, so the request would no longer match the suite's.
        "--globoff",
        "-o", "-", "-w", "\n__HTTP__%{http_code}", "-X", method, url,
    ]
    for h in headers:
        cmd += ["-H", h]
    if multipart is not None:
        for k, v in multipart.items():
            # Strings go out verbatim; other JSON values are rendered the way
            # JS prints them (true/false/null/1), not as Python reprs.
            text = v if isinstance(v, str) else json.dumps(v)
            # --form-string sends the value literally: a leading '@' or '<' and
            # ';type=' are not interpreted as they would be with -F.
            cmd += ["--form-string", f"{k}={text}"]
    elif body_json is not None:
        cmd += ["--data-binary", body_json]
    try:
        # errors="replace": a non-UTF-8 response body must not turn a valid
        # status into a reported curl failure.
        out = subprocess.run(
            cmd, capture_output=True, text=True,
            encoding="utf-8", errors="replace", timeout=30,
        ).stdout
    except Exception as e:
        return None, f"<curl error: {e}>"
    # The -w format appends the marker after the body, so the last occurrence
    # is the real one even if the body happens to contain the same text.
    marker = out.rfind("__HTTP__")
    if marker == -1:
        return None, out
    return int(out[marker + len("__HTTP__"):].strip() or 0), out[:marker]
def load_pw(pw_json):
    """Load per-test outcomes from a Playwright JSON report.

    Args:
        pw_json: Path to the report file; a falsy value means "no report".

    Returns:
        A dict mapping each spec title to `{"ok": <spec ok flag>,
        "received": <int or None>}`, where `received` is the last three-digit
        number found after "Received:" in the spec's error messages. An empty
        dict is returned when no path is given, the file cannot be read or
        parsed, or the top-level JSON value is not an object.
    """
    res = {}
    if not pw_json:
        return res
    try:
        # Context manager so the handle is closed even when parsing fails.
        with open(pw_json, encoding="utf-8") as fh:
            d = json.load(fh)
    except (OSError, ValueError):
        # Best effort: the report is optional, so an unreadable or malformed
        # file simply yields no Playwright column.
        return res
    if not isinstance(d, dict):
        return res

    def walk(suites):
        # Suites nest arbitrarily; collect the specs at every level.
        for s in suites:
            for sp in s.get("specs", []):
                rec = None
                for t in sp.get("tests", []):
                    for r in t.get("results", []):
                        for e in (r.get("errors") or []):
                            mm = re.search(r"Received:.*?(\d{3})", e.get("message") or "")
                            if mm:
                                rec = int(mm.group(1))
                res[sp["title"]] = {"ok": sp.get("ok"), "received": rec}
            walk(s.get("suites", []))

    walk(d.get("suites", []))
    return res
def main():
    """Command-line entry point: replay each generated test with curl and report.

    Reads every `*-validation-api-tests.spec.ts` file in --spec-dir, rebuilds
    each test's request, re-issues it with curl and prints a table comparing
    the expected status, the Playwright outcome (when --pw-json is given) and
    the status curl observed.

    Exit status: 0 when every compared test matches, 1 when at least one
    mismatches, 2 when no test could be compared at all.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--spec-dir", required=True)
    ap.add_argument("--base-url", required=True)
    ap.add_argument("--api-version", default="v2")
    ap.add_argument("--admin-header", default="")
    ap.add_argument("--deny-header", default="")
    ap.add_argument("--pw-json", default="")
    ap.add_argument("--show-body", action="store_true")
    ap.add_argument("--max-body", type=int, default=400)
    args = ap.parse_args()

    pw = load_pw(args.pw_json)
    specs = sorted(Path(args.spec_dir).glob("*-validation-api-tests.spec.ts"))
    rows, mismatches, skipped = [], [], 0
    for spec in specs:
        # Read the spec as UTF-8 explicitly: with the locale default, non-ASCII
        # literals would be mis-decoded on non-UTF-8 systems and the replayed
        # request would differ from the one the suite sends.
        src = spec.read_text(encoding="utf-8")
        for title, block in split_tests(src):
            d = parse_block(block, args.base_url, args.api_version)
            if not d or d["expected"] is None:
                skipped += 1
                continue
            # Deny (auth-deny / 403) scenarios need the probe principal's header.
            # Without --deny-header we'd re-issue them unauthenticated and get a
            # bogus 401-vs-403 "mismatch", so skip them explicitly instead.
            if d["headers_kind"].strip().startswith("denyProbeHeaders") and not args.deny_header:
                skipped += 1
                continue
            headers = curl_headers(d["headers_kind"], args.admin_header, args.deny_header)
            code, body = run_curl(d["method"], d["url"], headers, d["body_json"], d["multipart"])
            pw_rec = pw.get(title, {})
            pw_status = ("pass" if pw_rec.get("ok") else f"FAIL({pw_rec.get('received')})") if pw_rec else "—"
            match = code == d["expected"]
            rows.append((title, d["expected"], pw_status, code, match))
            if not match:
                mismatches.append((title, d["method"], d["url"], d["expected"], code, body))

    print(f"\n{'TEST':<58} {'EXP':>4} {'PW':>10} {'CURL':>5} OK")
    print("-" * 88)
    for title, exp, pw_status, code, match in rows:
        # str(code): the status is None when curl itself failed.
        print(f"{title[:58]:<58} {exp:>4} {pw_status:>10} {str(code):>5} {'✓' if match else '✗'}")
    total, ok = len(rows), sum(1 for r in rows if r[4])
    print("-" * 88)
    print(f"curl vs expected: {ok}/{total} match, {total - ok} mismatch" +
          (f" ({skipped} unparsed/skipped)" if skipped else ""))

    if args.show_body and mismatches:
        print("\n=== MISMATCH DETAIL (curl status != expected) ===")
        for title, method, url, exp, code, body in mismatches:
            print(f"\n• {title}\n {method} {url}\n expected {exp}, curl {code}")
            if body:
                print(" body:", body.strip()[: args.max_body])

    # 0 comparable tests means a broken parser / wrong --spec-dir / everything
    # skipped — that's an error, not a silent pass.
    if total == 0:
        print(f"✗ no comparable tests (parsed 0; {skipped} skipped) — check --spec-dir/auth", file=sys.stderr)
        sys.exit(2)
    sys.exit(1 if (total - ok) else 0)
# Run the comparison only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()