Skip to content

Commit 7776685

Browse files
committed
[card-cache] optimizations to avoid deadlocks
- deadlocks happened during cleanups (it was inefficient and load tests simulating cleanup + heavy load situations showed that the deadlock is real) - Locking at the `context` level. Each time we wanna clean up, we create new context and all directories/processes are within that context. - Each time we want to cleanup the cache, we switch the `context` where processes are gettting written/spawned. Context also sets the read/write directory of the caches. - So that during cleanup it only locks to change the `context` and then it can take it's sweet time to cleanup the dictionary in memory and the directory. - Also making default running of service for each card to 60 seconds (helps make things snappier) - optimizations to avoid **any** concurrent access of shared resource - Added `timings` dict in card cache to optimize loading cycles (Ensured that it is set based on a per-card basis) - Changed defaults for minimum amount to time to wait for cards in the cache process.
1 parent e0b1499 commit 7776685

3 files changed

Lines changed: 200 additions & 114 deletions

File tree

services/ui_backend_service/data/cache/card_cache_manager.py

Lines changed: 166 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import uuid
77
from asyncio.subprocess import Process
88
import asyncio
9-
from .card_cache_service import CardCache, cleanup_non_running_caches
9+
from .card_cache_service import CardCache, safe_wipe_dir
1010
import contextlib
1111

1212

@@ -26,11 +26,11 @@
2626
"CARD_CACHE_PROCESS_MAX_UPTIME", 3 * 60 # 3 minutes
2727
)
2828
CARD_CACHE_PROCESS_NO_CARD_WAIT_TIME = os.environ.get(
29-
"CARD_CACHE_PROCESS_NO_CARD_WAIT_TIME", 4 # 4 seconds
29+
"CARD_CACHE_PROCESS_NO_CARD_WAIT_TIME", 20 # 20 seconds
3030
)
31-
DEFAULT_CACHE_STORAGE_PATH = "/tmp"
32-
CACHE_STORAGE_PATH = os.environ.get(
33-
"CARD_CACHE_STORAGE_PATH", DEFAULT_CACHE_STORAGE_PATH
31+
DEFAULT_CACHE_STORAGE_PATH_ROOT = "/tmp"
32+
CACHE_STORAGE_PATH_ROOT = os.environ.get(
33+
"CARD_CACHE_STORAGE_PATH_ROOT", DEFAULT_CACHE_STORAGE_PATH_ROOT
3434
)
3535
CACHE_SERVICE_LOG_STORAGE_ROOT = os.environ.get("CACHE_SERVICE_LOG_STORAGE_ROOT", None)
3636

@@ -57,35 +57,106 @@ async def process_is_running(process: Process):
5757
return status == "running"
5858

5959

60-
class AsyncProcessManager:
60+
class AsyncCardCacheProcessManager:
6161

6262
processes = {
63-
# "procid": {
64-
# "proc": asyncio.subprocess.Process,
65-
# "started": time.time()
63+
# "<current-context>":{
64+
# "processes": {
65+
# "<procid>": {
66+
# "proc": asyncio.subprocess.Process,
67+
# "started": time.time()
68+
# }
69+
# },
70+
# "write_directory": "<write_directory>"
6671
# }
6772
}
6873

74+
def get_context_dict(self, context):
75+
_x = self.processes.get(context, None)
76+
if _x is None:
77+
return _x
78+
return _x.copy()
79+
80+
@property
81+
def current_process_dict(self):
82+
return self.processes[self._current_context]["processes"]
83+
84+
@property
85+
def current_write_directory(self):
86+
return self.processes[self._current_context]["write_directory"]
87+
6988
def __init__(self, logger) -> None:
70-
self.lock = asyncio.Lock()
89+
self.context_lock = asyncio.Lock()
7190
self.logger = logger
91+
self._current_context = None
92+
self.update_context()
93+
94+
async def set_new_context(self):
95+
async with self.context_lock:
96+
old_context = self._current_context
97+
self.update_context()
98+
return old_context
99+
100+
async def remove_context(self, context):
101+
async with self.context_lock:
102+
if context in self.processes:
103+
del self.processes[context]
104+
105+
def update_context(self):
106+
_ctx_dict, _ctx = self.create_context_dict()
107+
self.processes.update(_ctx_dict)
108+
self._current_context = _ctx
109+
110+
def create_context_dict(self):
111+
_ctx = uuid.uuid4().hex[:8]
112+
return {
113+
_ctx: {
114+
"processes": {},
115+
"write_directory": os.path.join(CACHE_STORAGE_PATH_ROOT, _ctx),
116+
}
117+
}, _ctx
72118

73-
def _register_process(self, procid, proc):
74-
self.processes[procid] = {
119+
def _register_process(self, procid, proc, stdout, stderr):
120+
self.current_process_dict[procid] = {
75121
"proc": proc,
76122
"started": time.time(),
123+
"stdout": stdout,
124+
"stderr": stderr
77125
}
78126

79-
async def add(self, procid, cmd, logs_file_path=CACHE_SERVICE_LOG_STORAGE_ROOT):
127+
def _make_command(self, pathspec):
128+
return [
129+
str(i)
130+
for i in [
131+
sys.executable,
132+
PATH_TO_CACHE_SERVICE,
133+
"task-updates",
134+
pathspec,
135+
"--uptime-seconds",
136+
CARD_CACHE_PROCESS_MAX_UPTIME,
137+
"--list-frequency",
138+
CARD_LIST_POLLING_FREQUENCY,
139+
"--data-update-frequency",
140+
DATA_UPDATE_POLLING_FREQUENCY,
141+
"--html-update-frequency",
142+
CARD_UPDATE_POLLING_FREQUENCY,
143+
"--cache-path",
144+
self.current_write_directory,
145+
"--max-no-card-wait-time",
146+
CARD_CACHE_PROCESS_NO_CARD_WAIT_TIME,
147+
]
148+
]
149+
150+
async def add(self, procid, pathspec, logs_file_path=CACHE_SERVICE_LOG_STORAGE_ROOT):
80151
running_proc = await self.is_running(procid)
81152
if running_proc:
82153
return procid, "running"
83154
# The lock helps to ensure that the processes only get added one at a time
84155
# This is important because the processes are added to a shared dictionary
85-
async with self.lock:
156+
async with self.context_lock:
86157
proc, started_on = self.get(procid)
87158
if proc is not None:
88-
await self.remove(procid, delete_item=True)
159+
await self.remove_current(procid, delete_item=True)
89160

90161
logs_file = None
91162
if logs_file_path is not None:
@@ -96,12 +167,12 @@ async def add(self, procid, cmd, logs_file_path=CACHE_SERVICE_LOG_STORAGE_ROOT):
96167
),
97168
"w",
98169
)
99-
170+
cmd = self._make_command(pathspec)
100171
await self.spawn(procid, cmd, logs_file, logs_file)
101172
return procid, "started"
102173

103174
def get(self, procid):
104-
proc_dict = self.processes.get(procid, None)
175+
proc_dict = self.current_process_dict.get(procid, None)
105176
if proc_dict is not None:
106177
return proc_dict["proc"], proc_dict["started"]
107178
return None, None
@@ -113,81 +184,95 @@ async def spawn(self, procid, cmd, stdout_file, std_err_file=None):
113184
stderr=std_err_file,
114185
shell=False,
115186
)
116-
self._register_process(procid, proc)
187+
self._register_process(procid, proc, stdout_file, std_err_file)
188+
189+
async def clean_terminate_process(self, process: Process):
190+
try:
191+
self.logger.error("Terminating process %s " % process.pid)
192+
if process.returncode is None:
193+
process.terminate()
194+
await process.wait()
195+
except Exception as e:
196+
self.logger.error(f"Error terminating process: {str(type(e))}")
197+
198+
async def clean_close_process(self, process):
199+
try:
200+
await process.wait()
201+
except Exception as e:
202+
self.logger.error(f"Error closing process: {str(e)}")
203+
204+
def _close_stdout_and_err(self, process_context_dict):
205+
if process_context_dict["stdout"] is not None:
206+
process_context_dict["stdout"].close()
207+
if process_context_dict["stderr"] is not None:
208+
process_context_dict["stderr"].close()
117209

118-
async def remove(self, procid, delete_item=True):
119-
if procid not in self.processes:
210+
async def remove_current(self, procid, delete_item=True):
211+
if procid not in self.current_process_dict:
120212
return
121213
self.logger.info("Removing process: %s" % procid)
122-
await self.processes[procid]["proc"].wait()
123-
self.logger.info("Process removed: %s" % procid)
124-
if self.processes[procid]["proc"].stdout is not None:
125-
self.processes[procid]["proc"].stdout.close()
214+
await self.clean_close_process(self.current_process_dict[procid]["proc"])
215+
self._close_stdout_and_err(self.current_process_dict[procid])
126216
if delete_item:
127-
del self.processes[procid]
217+
del self.current_process_dict[procid]
128218

129219
async def cleanup(self):
130-
# The lock ensures that when the dictionary is being modified,
131-
# no other process can modify it at the same time.
132-
async with self.lock:
133-
removal_keys = []
134-
for procid in self.processes:
135-
running_proc = await self.is_running(procid)
136-
if running_proc:
137-
continue
138-
removal_keys.append(procid)
139-
await self.remove(procid, delete_item=False)
140-
for procid in removal_keys:
141-
del self.processes[procid]
142-
return removal_keys
220+
old_context = await self.set_new_context()
221+
_ctx_dict = self.get_context_dict(old_context)
222+
if _ctx_dict is None:
223+
return []
224+
# Two things to remove (old Keys and old directories)
225+
wait_keys = []
226+
for pid in _ctx_dict["processes"]:
227+
status = await process_status(_ctx_dict["processes"][pid]["proc"])
228+
if status in ["completed", "failed"]:
229+
await self.clean_close_process(_ctx_dict["processes"][pid]["proc"])
230+
self._close_stdout_and_err(_ctx_dict["processes"][pid]["proc"])
231+
else:
232+
wait_keys.append(pid)
233+
if len(wait_keys) > 0:
234+
# sleeping for MAX_PROCESS_TIME so that all the processes writing should have finished
235+
# AND any upstream request accessing the cache-dir should have been attenteded
236+
await asyncio.sleep(CARD_CACHE_PROCESS_MAX_UPTIME)
237+
for pid in wait_keys:
238+
await self.clean_terminate_process(_ctx_dict["processes"][pid]["proc"])
239+
self._close_stdout_and_err(_ctx_dict["processes"][pid]["proc"])
240+
241+
_write_dir = _ctx_dict["write_directory"]
242+
self.logger.info("Removing directory: %s" % _write_dir)
243+
safe_wipe_dir(_write_dir)
244+
await self.remove_context(old_context)
143245

144246
async def is_running(self, procid):
145-
if procid not in self.processes:
247+
if procid not in self.current_process_dict:
146248
return False
147-
return await process_is_running(self.processes[procid]["proc"])
249+
return await process_is_running(self.current_process_dict[procid]["proc"])
148250

149251
async def get_status(self, procid):
150-
if procid not in self.processes:
252+
if procid not in self.current_process_dict:
151253
return None
152-
return await process_status(self.processes[procid]["proc"])
254+
return await process_status(self.current_process_dict[procid]["proc"])
153255

154256
async def running_processes(self):
155-
return [procid for procid in self.processes if await self.is_running(procid)]
257+
return [
258+
procid
259+
for procid in self.current_process_dict
260+
if await self.is_running(procid)
261+
]
156262

157263

158264
class CardCacheManager:
159265
def __init__(self) -> None:
160266
self.logger = logging.getLogger("CardCacheManager")
161-
self._process_manager = AsyncProcessManager(self.logger)
267+
self._process_manager = AsyncCardCacheProcessManager(self.logger)
162268
self._manager_id = uuid.uuid4().hex
163269
self.logger.info("CardCacheManager initialized")
164270

165-
def _make_task_command(self, pathspec):
166-
return [
167-
str(i)
168-
for i in [
169-
sys.executable,
170-
PATH_TO_CACHE_SERVICE,
171-
"task-updates",
172-
pathspec,
173-
"--uptime-seconds",
174-
CARD_CACHE_PROCESS_MAX_UPTIME,
175-
"--list-frequency",
176-
CARD_LIST_POLLING_FREQUENCY,
177-
"--data-update-frequency",
178-
DATA_UPDATE_POLLING_FREQUENCY,
179-
"--html-update-frequency",
180-
CARD_UPDATE_POLLING_FREQUENCY,
181-
"--cache-path",
182-
CACHE_STORAGE_PATH,
183-
"--max-no-card-wait-time",
184-
CARD_CACHE_PROCESS_NO_CARD_WAIT_TIME
185-
]
186-
]
187-
188271
def get_local_cache(self, pathspec, card_hash):
189272
cache = CardCache.load_from_disk(
190-
pathspec, card_hash, CACHE_STORAGE_PATH,
273+
pathspec,
274+
card_hash,
275+
self._process_manager.current_write_directory,
191276
)
192277
return cache
193278

@@ -196,40 +281,22 @@ async def register(self, pathspec):
196281
is_running = await self._process_manager.is_running(proc_id)
197282
if is_running:
198283
return proc_id, "running"
199-
200-
cmd = self._make_task_command(pathspec)
201-
self.logger.info(
202-
"Registering task [%s]" % (pathspec)
284+
self.logger.info("Registering task [%s]" % (pathspec))
285+
_id, status = await self._process_manager.add(
286+
proc_id, pathspec, logs_file_path=CACHE_SERVICE_LOG_STORAGE_ROOT
203287
)
204-
_id, status = await self._process_manager.add(proc_id, cmd, logs_file_path=CACHE_SERVICE_LOG_STORAGE_ROOT)
205288
return _id, status
206289

207290
async def get_status(self, pathspec):
208291
return await self._process_manager.get_status(pathspec)
209292

210-
async def start_process_cleanup_routine(self, interval=60):
211-
try:
212-
while True:
213-
cleanup_keys = await self._process_manager.cleanup() # Perform the cleanup
214-
if len(cleanup_keys) > 0:
215-
self.logger.info(
216-
"Cleaned up processes: %s" % ", ".join(cleanup_keys)
217-
)
218-
await asyncio.sleep(interval) # Wait for a specified interval before the next cleanup
219-
except asyncio.CancelledError:
220-
self.logger.info("Process cleanup routine cancelled")
221-
222-
async def cleanup_disk_routine(self, interval=60 * 60 * 4):
293+
async def regular_cleanup_routine(self, interval=60 * 60 * 24 * 5):
223294
try:
224295
while True:
225296
await asyncio.sleep(interval)
226-
# The lock ensure that new processes are not getting created or
227-
# processes are not getting removed when the disk cleanup is happening.
228-
async with self._process_manager.lock:
229-
running_proc_ids = await self._process_manager.running_processes()
230-
cleanup_non_running_caches(CACHE_STORAGE_PATH, CardCache.CACHE_DIR, running_proc_ids)
297+
await self._process_manager.cleanup()
231298
except asyncio.CancelledError:
232-
self.logger.info("Disk cleanup routine cancelled")
299+
self.logger.info("Process/Directory cleanup routine cancelled")
233300

234301

235302
async def verify_process_has_crashed(cache_manager: CardCacheManager, pathspec):
@@ -254,7 +321,10 @@ def _get_html_or_refresh(local_cache: CardCache):
254321

255322

256323
async def wait_until_card_is_ready(
257-
cache_manager: CardCacheManager, local_cache: CardCache, max_wait_time=3, frequency=0.1
324+
cache_manager: CardCacheManager,
325+
local_cache: CardCache,
326+
max_wait_time=3,
327+
frequency=0.1,
258328
):
259329
html = None
260330
start_time = time.time()
@@ -264,7 +334,9 @@ async def wait_until_card_is_ready(
264334
html = _get_html_or_refresh(local_cache)
265335
if html is not None:
266336
break
267-
process_failed = await verify_process_has_crashed(cache_manager, local_cache.pathspec)
337+
process_failed = await verify_process_has_crashed(
338+
cache_manager, local_cache.pathspec
339+
)
268340
if process_failed:
269341
cache_manager.logger.error(
270342
f"Card {local_cache.card_hash} has crashed for {local_cache.pathspec}"

0 commit comments

Comments
 (0)