Skip to content

Commit fdece64

Browse files
committed
bench: query latency under concurrent ingest reproduction script
1 parent cd75d02 commit fdece64

1 file changed

Lines changed: 68 additions & 0 deletions

File tree

bench/query_under_ingest.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
#!/usr/bin/env python3
2+
"""Reproduce the prod stall: time the monoscope span-list query while the same
3+
project receives continuous ingest. Usage:
4+
python3 bench/query_under_ingest.py [duration_s] [rows_per_batch] [batches_per_sec]
5+
"""
6+
import os, sys, threading, time
7+
from datetime import datetime, timezone
8+
from pathlib import Path
9+
10+
import psycopg
11+
12+
ROOT = Path(__file__).resolve().parent
13+
sys.path.insert(0, str(ROOT))
14+
from concurrent_load import COLUMNS, _to_pg, load_rows, shift_for_project
15+
16+
URL = f"host=127.0.0.1 port={os.environ.get('PGWIRE_PORT','12345')} user=postgres password=postgres dbname=postgres"
17+
PID = "qlat-scale-p0"
18+
Q = """SELECT jsonb_build_array(id, to_char(timestamp at time zone 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), context___trace_id, name, duration, resource___service___name, parent_id, CAST(EXTRACT(EPOCH FROM (start_time)) * 1000000000 AS BIGINT), errors is not null, to_jsonb(summary), context___span_id, kind)
19+
FROM otel_logs_and_spans
20+
WHERE project_id = %s AND timestamp BETWEEN now() - interval '4 days' AND now() AND (TRUE)
21+
ORDER BY timestamp DESC LIMIT 501"""
22+
23+
stop = threading.Event()
24+
inserted = [0]
25+
26+
27+
def writer(rows_per_batch: int, batches_per_sec: float):
28+
rows = shift_for_project(load_rows(rows_per_batch), PID, __import__("datetime").timedelta(0))
29+
placeholders = "(" + ", ".join(["%s"] * len(COLUMNS)) + ")"
30+
sql = f"INSERT INTO otel_logs_and_spans ({', '.join(COLUMNS)}) VALUES " + ", ".join([placeholders] * len(rows))
31+
with psycopg.connect(URL, autocommit=True) as c, c.cursor() as cur:
32+
while not stop.is_set():
33+
t0 = time.perf_counter()
34+
now = datetime.now(timezone.utc)
35+
for r in rows:
36+
r["timestamp"] = now.isoformat()
37+
r["date"] = now.date().isoformat()
38+
cur.execute(sql, [_to_pg(col, r.get(col)) for r in rows for col in COLUMNS])
39+
inserted[0] += len(rows)
40+
stop.wait(max(0.0, 1.0 / batches_per_sec - (time.perf_counter() - t0)))
41+
42+
43+
def main():
44+
duration = int(sys.argv[1]) if len(sys.argv) > 1 else 60
45+
rows_per_batch = int(sys.argv[2]) if len(sys.argv) > 2 else 100
46+
batches_per_sec = float(sys.argv[3]) if len(sys.argv) > 3 else 2.0
47+
t = threading.Thread(target=writer, args=(rows_per_batch, batches_per_sec), daemon=True)
48+
t.start()
49+
lats = []
50+
t_end = time.time() + duration
51+
with psycopg.connect(URL, autocommit=True) as c, c.cursor() as cur:
52+
while time.time() < t_end:
53+
t0 = time.perf_counter()
54+
cur.execute(Q, (PID,))
55+
cur.fetchall()
56+
ms = (time.perf_counter() - t0) * 1000
57+
lats.append(ms)
58+
print(f"query: {ms:7.1f}ms (ingested so far: {inserted[0]})", flush=True)
59+
time.sleep(1)
60+
stop.set()
61+
t.join(timeout=5)
62+
lats.sort()
63+
n = len(lats)
64+
print(f"\nn={n} p50={lats[n // 2]:.0f}ms p95={lats[int(n * 0.95)]:.0f}ms max={lats[-1]:.0f}ms rate={inserted[0] / duration:.0f} rows/s")
65+
66+
67+
if __name__ == "__main__":
68+
main()

0 commit comments

Comments
 (0)