
Commit af22175

Add script for testing the server under parallel load (#20)
1 parent 3fe7a1c commit af22175

1 file changed

Lines changed: 217 additions & 0 deletions

File tree

misc/load-test.py

@@ -0,0 +1,217 @@
#!/usr/bin/env python3

"""
SeqRepo REST Service load testing. Used for validating concurrency / file descriptor bug fixes.

Also useful for testing the response performance of the seqrepo-rest-service program. Prints
the average request completion rate (requests/second) at the end.

Uses multiprocessing.Process to create parallel workers (count=`-n`) that send requests to a
SeqRepo REST Service endpoint (`-u`). Sends `-m` requests, each for a different accession.
Input accessions are drawn from the local seqrepo directory given by `-s`, and the number of
open files in that directory is monitored with lsof.

If running the seqrepo rest service through Docker, the lsof monitoring will only work if the
seqrepo directory the REST service uses is mounted as a local volume in the `docker run`. It
cannot be on a persistent docker volume or copied in at runtime, because lsof will not see
the open files.

Example docker run for the server, where a local seqrepo directory exists at
/usr/local/share/seqrepo/latest:
```
docker run -it --rm \
    -p 5000:5000 \
    -v /usr/local/share/seqrepo/latest:/seqrepo/latest \
    biocommons/seqrepo-rest-service:0.2.2 \
    seqrepo-rest-service /seqrepo/latest
```

Example command (20 worker processes, 500 requests, monitoring /usr/local/share/seqrepo/latest):
```
python load-test.py -n 20 -s /usr/local/share/seqrepo/latest -m 500 -u 'http://localhost:5000/seqrepo'
```

A successful run exits cleanly, with no exceptions in the load-test.py process or in the
seqrepo rest service process, and the open file count logged by load-test.py stabilizes at a
relatively low level (on the order of tens of files) rather than increasing continuously.
"""

import argparse
import pathlib
import random
import subprocess
import logging
import time
import sys
import queue
import multiprocessing
from typing import Optional, TextIO

from biocommons.seqrepo import SeqRepo
from biocommons.seqrepo.dataproxy import SeqRepoRESTDataProxy

_logger = logging.getLogger()


def log(log_queue: multiprocessing.Queue, line: str):
    log_queue.put(line + "\n")
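
# NOTE: log() appears to be the producer-side counterpart to StdOutPipeWorker below: it
# pushes lines onto a shared queue that the worker drains. It is unused unless the
# commented-out log_worker wiring in main() is enabled.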


def lsof_count(dirname: str) -> int:
    lsof_cmd = [
        "bash", "-c",
        f"lsof +D {dirname} | wc -l"]
    lsof_p = subprocess.Popen(
        lsof_cmd,
        stdout=subprocess.PIPE)
    (stdout, _) = lsof_p.communicate()
    stdout = stdout.decode("utf-8").strip()
    return int(stdout)
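
# NOTE: the value from `lsof +D <dir> | wc -l` is approximate -- it includes lsof's header
# line and counts one row per process/file pair. That is fine here, since only the trend
# (stable vs. continuously growing) matters.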


class LsofWorker(multiprocessing.Process):
    def __init__(self, dirname, check_interval=5):
        """
        check_interval: seconds between open file checks
        """
        self.dirname = dirname
        self.check_interval = check_interval
        super().__init__()

    def run(self):
        try:
            while True:
                ct = lsof_count(self.dirname)
                print(f"{self.dirname} open file count {ct}", flush=True)
                time.sleep(self.check_interval)
        except InterruptedError:
            pass


class MPWorker(multiprocessing.Process):
    close_sentinel_value = -1

    def __init__(self, q: multiprocessing.Queue, seqrepo_uri: str):
        self.q = q
        self.seqrepo_uri = seqrepo_uri
        self.seqrepo_dataproxy = SeqRepoRESTDataProxy(seqrepo_uri)
        self.n = 0
        self.query_bound_start = 0
        self.query_bound_end = 5
        super().__init__()

    def run(self):
        while True:
            try:
                # Non-blocking get; spin on Empty until the filler process catches up
                ac = self.q.get(False)
                if ac == MPWorker.close_sentinel_value:
                    print(f"{self}: Done; processed {self.n} accessions", flush=True)
                    break
                self.seqrepo_dataproxy.get_sequence(
                    ac, self.query_bound_start, self.query_bound_end)
                self.n += 1
            except queue.Empty:
                pass
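
# NOTE: each request fetches only bases [0, 5) of a sequence (query_bound_start/end),
# presumably so that response payloads stay small and the test stresses concurrency and
# file handling rather than bandwidth.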


def queue_filler_target(q, acs, n_workers):
    """
    Callable target for the queue filler. Necessary because multiprocessing.Queue
    uses pipes with a relatively low buffer limit. A background process ensures the
    queue keeps receiving the rest of the input values, plus the close sentinels.
    """
    for ac in acs:
        q.put(ac)
    for _ in range(n_workers):
        q.put(MPWorker.close_sentinel_value)
    print("Done filling input queue", flush=True)
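
# Flow: the filler runs in its own Process so that enqueueing the full input list never
# holds up main(); each of the n_workers MPWorkers drains accessions and exits after
# consuming exactly one close sentinel, leaving the queue empty at the end of the run.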


class StdOutPipeWorker(multiprocessing.Process):
    """
    Used for synchronized logging between the main and sub processes
    """

    def __init__(self, stdout_queue: multiprocessing.Queue, ostream: Optional[TextIO] = None):
        self.stdout_queue = stdout_queue
        self.ostream = ostream if ostream else sys.stdout
        self.stopped = False
        super().__init__()

    def run(self):
        while not self.stopped:
            try:
                val = self.stdout_queue.get(timeout=0.5)
                print(val, file=self.ostream, end="")
            except queue.Empty:
                pass

    def stop(self):
        self.stopped = True
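
# Caveat: stop() sets self.stopped in the calling process only; after the fork/spawn the
# child has its own copy of the flag, so the run() loop never observes the change. A shared
# primitive such as multiprocessing.Event would be needed to stop the worker cleanly, which
# may be why the log_worker wiring in main() is commented out.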


def parse_args(argv):
    ap = argparse.ArgumentParser(description=__doc__)
    ap.add_argument("-n", "--num-workers", type=int, default=1)
    ap.add_argument("-s", "--seqrepo-path", type=pathlib.Path, required=True,
                    help="Local SeqRepo instance to get input values from, and to monitor open file count in")
    ap.add_argument("-u", "--seqrepo-rest-uri", type=str, default="http://localhost:5000/seqrepo")
    ap.add_argument("-m", "--max-accessions", type=int, required=True)
    ap.add_argument("-f", "--fd-cache-size", type=int, default=0)
    opts = ap.parse_args(argv)
    return opts


def main(argv):
    opts = parse_args(argv)

    sr = SeqRepo(root_dir=opts.seqrepo_path, fd_cache_size=opts.fd_cache_size)

    # Sample RefSeq NM_ accessions from the local seqrepo instance as request inputs
    acs = set(a["alias"] for a in sr.aliases.find_aliases(namespace="RefSeq", alias="NM_%"))
    acs = random.sample(sorted(acs), opts.max_accessions or len(acs))

    input_queue = multiprocessing.Queue()

    # log_queue = multiprocessing.Queue(maxsize=10000)
    # log_worker = StdOutPipeWorker(log_queue, sys.stdout)
    # log_worker.start()

    t_filler = multiprocessing.Process(
        target=queue_filler_target, args=(input_queue, acs, opts.num_workers))
    t_filler.start()

    workers = []
    for _ in range(opts.num_workers):
        workers.append(MPWorker(input_queue, opts.seqrepo_rest_uri))

    print("Starting lsof process")
    lsof_p = LsofWorker(opts.seqrepo_path, 1)
    lsof_p.start()

    # Sleep briefly to let the input queue get ahead
    time.sleep(1)
    print("Finished initialization")

    time_start = time.time()
    print("Starting workers")
    for w in workers:
        w.start()

    for w in workers:
        w.join()
    t_filler.join()

    time_end = time.time()
    time_diff = time_end - time_start

    lsof_p.terminate()

    print(f"Retrieved {len(acs)} seq in {time_diff:.1f} seconds ({len(acs)/time_diff:.1f} seq/sec)")

    # log_worker.stop()
    # log_worker.join()


if __name__ == "__main__":
    import coloredlogs
    coloredlogs.install(level="INFO")
    main(argv=sys.argv[1:])
