-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathload_tester.py
More file actions
342 lines (296 loc) Β· 15.3 KB
/
Copy pathload_tester.py
File metadata and controls
342 lines (296 loc) Β· 15.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
"""
load_tester.py
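Multi-cycle load test harness (the entry point runs NUM_CYCLES = 4 cycles; the
first two are described in detail below). Measures warm/cold hit rates across
Warmup -> Ramp -> Spike -> Cooldown -> Drain -> Warmup -> Ramp -> Spike phases.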
The Ramp phase generates a detectable pre-spike signal so that the
EWMA+CUSUM predictor can proactively scale up BEFORE the spike peak,
demonstrating its advantage over purely reactive scaling.
CSCI 599: Network Systems for Cloud Computing
University of Southern California
--- Traffic Profile (Scenario A: Bursty-Ramp, primary experiment) ---
Cycle 1:
  Warmup   : 2 RPS x 8 s   (predictor learns low baseline: EWMA ~ 2)
  Ramp     : 20 RPS x 4 s  (pre-spike ramp; CUSUM fires at ~sec 3 ->
                            proactively scales to ~15 workers)
Spike : 30 RPS x 5 s (peak traffic; EWMA+CUSUM workers pre-warmed
before spike; Reactive still catching up)
Cooldown : 2 RPS x 5 s (traffic drops; idle workers start aging)
  Drain    : 0 RPS x 8 s   (all workers idle > 6s -> scavenger kills all;
                            clean slate for Cycle 2)
Cycle 2:
  Warmup   : 2 RPS x 35 s  (long warmup: EWMA decays from ~15 back to ~2.3,
                            restoring CUSUM sensitivity. 35s ~ 17 windows; a decay
                            factor of 0.8^17 brings EWMA from 15 -> 2.3; same
                            trigger math as C1)
Ramp : 20 RPS x 4 s (CUSUM fires again; pre-warms for spike)
Spike : 30 RPS x 5 s (verify C2 is as good as C1 with fully decayed EWMA)
Expected observations:
  C1-Spike - EWMA+CUSUM: 0 cold starts (CUSUM fires at ramp window 2, pre-warms 15 workers)
             Reactive:   ~20 cold starts (no look-ahead, scales only after seeing spike)
  C2-Spike - EWMA+CUSUM: 0 cold starts (35s warmup decays EWMA to ~2.3, CUSUM fires again)
             Reactive:   ~20 cold starts (same as C1, no pre-warming)
Key comparison (short vs long interval):
  Short C2 warmup (5s):  EWMA stays ~6 -> CUSUM blind -> cold=32 (worse than Reactive)
  Long C2 warmup (35s):  EWMA decays to ~2.3 -> CUSUM fires -> cold~0 (matches C1)
Run modes:
./server EWMA+CUSUM (our method)
./server reactive Reactive baseline
./server static 5 Static baseline (fixed N workers)
"""
import argparse
import socket
import time
import threading
import os
from datetime import datetime
# Address of the gateway under test; every request connects to HOST:PORT.
HOST = '127.0.0.1'
PORT = 8080

# Minimal 1x1 PNG (Base64). Sent verbatim as the text/plain body of every request.
IMAGE_B64 = (
    "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwC"
    "AAAAC0lEQVR42mNkYAAAAAYAAjCB0C8AAAAASUVORK5CYII="
)

# Complete HTTP/1.1 request, built and UTF-8 encoded once at import time so that
# each request thread only has to write these bytes to its socket.
# "Connection: close" asks the server to close the connection after replying;
# the client reads until EOF to find the end of the response.
HTTP_REQUEST = (
    f"POST /invoke HTTP/1.1\r\n"
    f"Host: {HOST}:{PORT}\r\n"
    f"Content-Length: {len(IMAGE_B64)}\r\n"
    f"Content-Type: text/plain\r\n"
    f"Connection: close\r\n\r\n"
    f"{IMAGE_B64}"
).encode('utf-8')

# -- Shared result storage ----------------------------------------------------
# _lock guards both lists below; they are appended to from the request threads.
_lock = threading.Lock()
_results = []  # list of dicts: {phase, rtt_ms, cold, timeout, error}
_log_lines = []  # one formatted, timestamped log line per recorded request
def _record(phase, rtt_ms=None, cold=False, timeout=False, error=False, note=""):
    """Store the outcome of one request in the shared result lists.

    Exactly one outcome is recorded, chosen in this order:

    * ``rtt_ms`` given  -> a completed request (tagged COLD or WARM via ``cold``)
    * ``timeout`` true  -> a timed-out request
    * anything else     -> an error (``error`` itself is not inspected)

    Args:
        phase: Phase label the request belongs to (e.g. ``"C1-Spike"``).
        rtt_ms: Round-trip time in milliseconds, or None if none was measured.
        cold: Whether a completed request is classified as a cold start.
        timeout: Whether the request timed out.
        error: Accepted for call-site readability; the error branch is the
            fallback when neither ``rtt_ms`` nor ``timeout`` applies.
        note: Free-form text appended to completed and error log lines
            (not written for timeouts).
    """
    # Timestamping and both appends happen under the lock so the result entry
    # and its log line are added together, in the same relative order.
    with _lock:
        stamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
        prefix = f"[{stamp}] {phase:20s}"
        if rtt_ms is not None:
            outcome = {"phase": phase, "rtt_ms": rtt_ms, "cold": cold}
            kind = "COLD" if cold else "WARM"
            text = f"{prefix} {kind:4s} {rtt_ms:8.1f} ms {note}"
        elif timeout:
            outcome = {"phase": phase, "timeout": True}
            text = f"{prefix} TIMEOUT"
        else:
            outcome = {"phase": phase, "error": True}
            text = f"{prefix} ERROR {note}"
        _results.append(outcome)
        _log_lines.append(text)
def send_single_request(phase, timeout_s=8.0):
    """Send one POST /invoke request and record its outcome under *phase*.

    Opens a fresh TCP connection to HOST:PORT, writes the pre-built
    HTTP_REQUEST, and reads until the server closes the connection. The
    elapsed time is then classified as a cold or warm hit and stored via
    ``_record``. Nothing is returned and no exception escapes: every failure
    is recorded as a timeout or an error instead.

    Args:
        phase: Phase label the result is filed under.
        timeout_s: Socket timeout in seconds. It applies to each blocking
            socket operation (connect / send / recv) individually, not to the
            request as a whole.
    """
    # perf_counter is monotonic, so the measured RTT cannot be distorted by
    # wall-clock adjustments (NTP steps, manual clock changes) mid-request.
    t0 = time.perf_counter()
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(timeout_s)
            s.connect((HOST, PORT))
            s.sendall(HTTP_REQUEST)
            # The response body is never inspected, so only count the bytes
            # instead of accumulating them in memory.
            received = 0
            while True:
                chunk = s.recv(4096)
                if not chunk:
                    break
                received += len(chunk)
        rtt = (time.perf_counter() - t0) * 1000
    except socket.timeout:
        _record(phase, timeout=True)
    except ConnectionRefusedError:
        _record(phase, error=True, note="Connection refused")
    except Exception as e:
        # Deliberate catch-all: a load-generator thread must never die
        # silently; any other failure is logged as an error for this phase.
        _record(phase, error=True, note=str(e))
    else:
        if received == 0:
            # The peer closed the connection without sending a single byte.
            # Counting that as a (very fast) warm hit would inflate the warm
            # rate, so it is recorded as an error.
            _record(phase, error=True, note="Empty response")
            return
        # Per the original note: warm worker ~502ms, cold fallback ~800ms;
        # 700ms is the cut-off used to tell the two apart.
        cold = rtt > 700
        _record(phase, rtt_ms=rtt, cold=cold)
def run_phase(phase_label, target_rps, duration_s):
    """Fire requests at ``target_rps`` for ``duration_s`` seconds, then wait.

    Requests are paced on the wall clock, independent of response latency:
    request *i* is launched at ``start + i / target_rps`` on the monotonic
    clock, each in its own daemon thread. Scheduling against absolute
    deadlines (rather than sleeping a fixed interval after every launch)
    keeps thread start-up overhead from accumulating and lowering the
    delivered rate below the target.

    History: the original per-second join design caused a feedback loop:
    cold-start responses (0.8s) inflated each "second" to ~1.8s wall-clock,
    halving the effective RPS delivered to the server and preventing CUSUM
    from accumulating to its threshold.

    Args:
        phase_label: Label under which every request of this phase is recorded.
        target_rps: Requests per second to send. Zero (or a negative value)
            sends nothing and simply idles for ``duration_s``.
        duration_s: Length of the sending window in seconds.
    """
    print(f"\n [{phase_label}] {target_rps} RPS x {duration_s}s", flush=True)
    rate = max(target_rps, 0)
    span = max(duration_s, 0)
    total = int(rate * span)
    # Guard the division: a 0-RPS phase has no requests to space out.
    interval = 1.0 / rate if rate else 0.0
    threads = []
    start = time.monotonic()
    for i in range(total):
        # Sleep only for the time left until this request's slot; if we are
        # already late, send immediately to catch up.
        wait = start + i * interval - time.monotonic()
        if wait > 0:
            time.sleep(wait)
        t = threading.Thread(target=send_single_request,
                             args=(phase_label,), daemon=True)
        t.start()
        threads.append(t)
    # Hold the phase open for its full nominal duration (this covers the gap
    # after the last request, and the whole window for a 0-RPS phase).
    tail = start + span - time.monotonic()
    if tail > 0:
        time.sleep(tail)
    print(f" {total} requests sent. Waiting for responses...", flush=True)
    for t in threads:
        # 15s > cold fallback (0.8s) + dispatch queue wait; was 12s, which was
        # too short for the 4-cycle run. Threads still running afterwards are
        # left behind (they are daemons) and just show up as missing in `done`.
        t.join(timeout=15.0)
    with _lock:
        done = sum(1 for r in _results if r.get("phase") == phase_label)
    print(f" done={done}/{total}", flush=True)
def drain_phase(duration_s):
    """Send no traffic for ``duration_s`` seconds, printing progress each second.

    The pause is intended to let all workers exceed the server's idle timeout
    so its scavenger reaps them; this function itself only sleeps and prints.

    Args:
        duration_s: Whole number of seconds to stay idle.
    """
    banner = f"\n [Drain] 0 RPS x {duration_s}s (waiting for scavenger to reap all workers)"
    print(banner, flush=True)
    for elapsed in range(1, duration_s + 1):
        time.sleep(1)
        print(f" draining... {elapsed}/{duration_s}s", flush=True)
def percentile(data, p):
    """Return the *p*-th percentile of *data* using linear interpolation.

    The values are sorted and the percentile is read at fractional rank
    ``(n - 1) * p / 100``, interpolating linearly between the two
    neighbouring values.

    Args:
        data: Iterable of numbers; it is not modified.
        p: Percentile to compute, in the closed range [0, 100].

    Returns:
        The interpolated percentile, or ``nan`` when *data* is empty.

    Raises:
        ValueError: If *p* is outside [0, 100]. Previously a negative *p*
            silently returned the maximum (via negative indexing) and
            ``p > 100`` failed with an IndexError.
    """
    if not 0 <= p <= 100:
        raise ValueError(f"percentile p must be in [0, 100], got {p!r}")
    ordered = sorted(data)
    # Check emptiness after sorting so iterators/generators are handled too.
    if not ordered:
        return float('nan')
    rank = (len(ordered) - 1) * p / 100
    lo = int(rank)
    hi = min(lo + 1, len(ordered) - 1)  # clamp so p == 100 stays in range
    return ordered[lo] + (ordered[hi] - ordered[lo]) * (rank - lo)
def phase_stats(phase_label):
    """Aggregate the recorded results of one phase.

    Args:
        phase_label: Phase label to select, e.g. ``"C2-Spike"``.

    Returns:
        A 6-tuple ``(latencies, cold, warm, timeouts, errors, total)``:
        the list of measured RTTs in ms, the number of cold and warm hits,
        the number of timeouts and errors, and the number of recorded
        entries for the phase.
    """
    # Only the snapshot is taken under the lock; counting works on the copy.
    with _lock:
        matching = [rec for rec in _results if rec.get("phase") == phase_label]

    latencies = []
    cold = 0
    timeouts = 0
    errors = 0
    for rec in matching:
        if "rtt_ms" in rec:
            latencies.append(rec["rtt_ms"])
        if rec.get("cold"):
            cold += 1
        if rec.get("timeout"):
            timeouts += 1
        if rec.get("error"):
            errors += 1

    # Every completed request is either cold or warm.
    warm = len(latencies) - cold
    return latencies, cold, warm, timeouts, errors, len(matching)
def print_phase(label, latencies, cold, warm, timeouts, errors, total):
    """Print the console summary for one phase.

    Always prints a heading and the request counters; the latency line
    (P50 / P95 / P99 / max) is printed only when at least one RTT exists.

    Args:
        label: Phase label shown in the heading.
        latencies: Measured RTTs in milliseconds.
        cold: Number of cold hits.
        warm: Number of warm hits.
        timeouts: Number of timed-out requests.
        errors: Number of failed requests.
        total: Number of recorded entries for the phase.
    """
    print(f"\n ββ {label} ββββββββββββββββββββββββββββββββββββββ")
    print(f" Requests : {total} (warm={warm}, cold={cold}, timeout={timeouts}, err={errors})")
    if not latencies:
        return
    p50, p95, p99 = (percentile(latencies, q) for q in (50, 95, 99))
    peak = max(latencies)
    print(f" P50={p50:.1f}ms P95={p95:.1f}ms P99={p99:.1f}ms max={peak:.1f}ms")
# -- Main ----------------------------------------------------------------------
if __name__ == "__main__":
    # Script entry point: parse the CLI, run NUM_CYCLES traffic cycles against
    # HOST:PORT, print per-phase and cross-cycle spike summaries, and write the
    # full request log plus summary to logs/test_<timestamp>.log.

    def _bounded_int(minimum):
        """Build an argparse ``type`` that accepts only integers >= *minimum*."""
        def _parse(text):
            try:
                value = int(text)
            except ValueError:
                raise argparse.ArgumentTypeError(f"expected an integer, got {text!r}")
            if value < minimum:
                raise argparse.ArgumentTypeError(
                    f"must be an integer >= {minimum}, got {value}")
            return value
        return _parse

    parser = argparse.ArgumentParser(description="Edge FaaS Bursty-Ramp load tester")
    # Warmups may be 0 s (phase sends nothing); rates must be positive because
    # the pacing interval is 1 / RPS.
    parser.add_argument("--warmup-c234", type=_bounded_int(0), default=35,
                        help="Warmup duration (seconds) for cycles C2..C4. "
                             "Default 35s. Used by the warmup-sweep experiment "
                             "to vary the inter-burst interval at 5, 10, 20, 35, 60, 120 s.")
    parser.add_argument("--warmup-c1", type=_bounded_int(0), default=8,
                        help="Warmup duration (seconds) for C1 (fresh-start cycle). Default 8s.")
    parser.add_argument("--spike-rps", type=_bounded_int(1), default=30,
                        help="Peak RPS during the spike phase (default 30). "
                             "Used by the scale-invariance ablation (B5): re-run at "
                             "100 or 300 RPS to verify adaptive CUSUM works without "
                             "retuning drift/threshold across RPS magnitudes.")
    parser.add_argument("--ramp-rps", type=_bounded_int(1), default=None,
                        help="Peak RPS during the ramp phase. Defaults to floor(2/3 * spike-rps) "
                             "with a minimum of 1, preserving the ramp-as-pre-spike-signal "
                             "proportion. Set explicitly if you need a different ramp:spike ratio.")
    parser.add_argument("--no-ramp", action="store_true",
                        help="Skip the Ramp phase entirely (step workload, C3). Cycle becomes "
                             "Warmup -> Spike -> Cooldown -> Drain. Used to demonstrate the "
                             "boundary case where CUSUM has no pre-spike signal to detect, "
                             "expected to remove the predictor's advantage over Reactive.")
    args = parser.parse_args()

    WARMUP_C1 = args.warmup_c1
    WARMUP_C234 = args.warmup_c234
    SPIKE_RPS = args.spike_rps
    # Default ramp = 2/3 of spike, rounded down (default workload: ramp=20, spike=30).
    # Floor at 1 RPS so the ramp phase always sends at least one request.
    RAMP_RPS = args.ramp_rps if args.ramp_rps is not None else max(1, (2 * SPIKE_RPS) // 3)
    NO_RAMP = args.no_ramp

    os.makedirs("logs", exist_ok=True)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_path = f"logs/test_{ts}.log"

    # Workload variant tag for the banner: makes the log file
    # immediately self-describing across B5 / C3 ablations.
    if NO_RAMP:
        variant = "Step (no ramp)"
    elif SPIKE_RPS != 30:
        variant = f"Bursty-Ramp scale={SPIKE_RPS}/{RAMP_RPS} RPS"
    else:
        variant = "Bursty-Ramp"

    print("=" * 64)
    print(f" Edge FaaS Gateway β {variant} Four-Cycle Test")
    print(f" Target: {HOST}:{PORT}")
    print(f" Log : {log_path}")
    print(f" Warmup: C1={WARMUP_C1}s, C2-C4={WARMUP_C234}s")
    print(f" Spike : {SPIKE_RPS} RPS x 5s Ramp: "
          + ("(disabled)" if NO_RAMP else f"{RAMP_RPS} RPS x 4s"))
    print("=" * 64)
    if NO_RAMP:
        print("\n Scenario B: Step (no-ramp ablation, 4 cycles)")
        print(f" C1: warmup(2,{WARMUP_C1}s) -> spike({SPIKE_RPS},5s) -> cooldown(2,5s) -> drain(8s)")
        print(f" C2-C4: warmup(2,{WARMUP_C234}s) -> spike({SPIKE_RPS},5s) -> cooldown(2,5s) -> drain(8s)")
        print(" Rationale: predictor has no ramp signal, so CUSUM cannot fire ahead of spike.")
        print(" Tests the boundary where Reactive should match or beat CUSUM.")
    else:
        print("\n Scenario A: Bursty-Ramp (4 cycles)")
        print(f" C1: warmup(2,{WARMUP_C1}s) -> ramp({RAMP_RPS},4s) -> spike({SPIKE_RPS},5s) -> cooldown(2,5s) -> drain(8s)")
        print(f" C2-C4: warmup(2,{WARMUP_C234}s) -> ramp({RAMP_RPS},4s) -> spike({SPIKE_RPS},5s) -> cooldown(2,5s) -> drain(8s)")
        print(" Rationale: ARIMA needs 2-3 cycles of history to learn ramp->spike pattern.")
        print(" EWMA+CUSUM/Reactive/Static unaffected by extra cycles.")
    print()
    print(" cold = RTT > 700ms (~800ms cold fallback); warm = RTT <= 700ms (~502ms)")
    time.sleep(1)

    NUM_CYCLES = 4
    # Ordered phase labels that send traffic (Drain sends none, so it has no stats).
    PHASES = []
    for i in range(1, NUM_CYCLES + 1):
        c = f"C{i}"
        if NO_RAMP:
            PHASES += [f"{c}-Warmup", f"{c}-Spike", f"{c}-Cooldown"]
        else:
            PHASES += [f"{c}-Warmup", f"{c}-Ramp", f"{c}-Spike", f"{c}-Cooldown"]

    def run_cycle(cycle_num: int):
        """Run one Warmup -> [Ramp] -> Spike -> Cooldown -> [Drain] cycle."""
        c = f"C{cycle_num}"
        warmup_s = WARMUP_C1 if cycle_num == 1 else WARMUP_C234
        print(f"\n{'-'*64}")
        print(f" CYCLE {cycle_num} (warmup={warmup_s}s)")
        print(f"{'-'*64}")
        run_phase(f"{c}-Warmup", target_rps=2, duration_s=warmup_s)
        if not NO_RAMP:
            run_phase(f"{c}-Ramp", target_rps=RAMP_RPS, duration_s=4)
        run_phase(f"{c}-Spike", target_rps=SPIKE_RPS, duration_s=5)
        run_phase(f"{c}-Cooldown", target_rps=2, duration_s=5)
        # No drain after the final cycle: nothing follows it.
        if cycle_num < NUM_CYCLES:
            drain_phase(duration_s=8)

    try:
        for i in range(1, NUM_CYCLES + 1):
            run_cycle(i)
    except KeyboardInterrupt:
        # Fall through so a partial run still gets its summary and log file.
        print("\n[Interrupted]")

    # Compute every phase's stats exactly once, so the console summary and the
    # log file report identical numbers even if a late request thread records
    # a result in between.
    stats_by_phase = {label: phase_stats(label) for label in PHASES}

    # -- Per-phase summary -----------------------------------------------------
    print("\n" + "=" * 64)
    print(" RESULTS")
    print("=" * 64)
    for label in PHASES:
        lat, cold, warm, to, err, total = stats_by_phase[label]
        print_phase(label, lat, cold, warm, to, err, total)

    # -- Spike comparison across all cycles (primary metric) -------------------
    print("\n" + "=" * 64)
    print(" SPIKE COMPARISON ACROSS CYCLES (primary metric)")
    print("=" * 64)
    spike_labels = [f"C{i}-Spike" for i in range(1, NUM_CYCLES + 1)]
    header = f" {'Metric':<22}" + "".join(f" {lbl:>12}" for lbl in spike_labels)
    print(header)
    print(f" {'-'*22}" + "".join(f" {'-'*12}" for _ in spike_labels))
    # Each stats tuple is (latencies, cold, warm, timeouts, errors, total).
    all_stats = [stats_by_phase[lbl] for lbl in spike_labels]
    rows = [
        ("Cold starts", [s[1] for s in all_stats]),
        ("Warm hits", [s[2] for s in all_stats]),
        ("Warm rate", [f"{s[2]/s[5]*100:.1f}%" if s[5] else "N/A" for s in all_stats]),
        ("P50 latency", [f"{percentile(s[0],50):.1f}ms" if s[0] else "N/A" for s in all_stats]),
        ("P99 latency", [f"{percentile(s[0],99):.1f}ms" if s[0] else "N/A" for s in all_stats]),
    ]
    for name, vals in rows:
        print(f" {name:<22}" + "".join(f" {str(v):>12}" for v in vals))
    print()

    # -- Write full log file ---------------------------------------------------
    # Snapshot under the lock: request threads that outlived their join timeout
    # may still be appending.
    with _lock:
        log_snapshot = list(_log_lines)
    # Explicit encoding: error notes come from str(exception) and may contain
    # non-ASCII text that the platform default codec cannot encode.
    with open(log_path, "w", encoding="utf-8") as f:
        f.write(f"Edge FaaS {variant} Test ({NUM_CYCLES} cycles) - {ts}\n")
        if NO_RAMP:
            f.write(f"Scenario: C1:warmup(2,{WARMUP_C1}s)->spike({SPIKE_RPS},5s)->cooldown->drain\n")
            f.write(f" C2-C4: warmup(2,{WARMUP_C234}s)->spike({SPIKE_RPS},5s)->cooldown->drain\n")
        else:
            f.write(f"Scenario: C1:warmup(2,{WARMUP_C1}s)->ramp({RAMP_RPS},4s)->spike({SPIKE_RPS},5s)->cooldown->drain\n")
            f.write(f" C2-C4: warmup(2,{WARMUP_C234}s)->ramp->spike->cooldown->drain\n")
        f.write(f"Target: {HOST}:{PORT}\n\n")
        f.writelines(line + "\n" for line in log_snapshot)
        f.write("\n--- Summary ---\n")
        for label in PHASES:
            lat, cold, warm, to, err, total = stats_by_phase[label]
            f.write(f"\n{label}: total={total} warm={warm} cold={cold} "
                    f"timeout={to} err={err} [threshold=700ms]\n")
            if lat:
                f.write(f" P50={percentile(lat,50):.1f}ms P95={percentile(lat,95):.1f}ms "
                        f"P99={percentile(lat,99):.1f}ms max={max(lat):.1f}ms\n")
    print(f" Full log saved -> {log_path}")