Skip to content

Commit fac8506

Browse files
optimize: faster initialization; atomic mechanism adapted for high-concurrency scenarios
1 parent 4a7e11e commit fac8506

File tree

3 files changed

+196
-97
lines changed

3 files changed

+196
-97
lines changed

src/multiprocess/multiprocess_memory_limit.c

Lines changed: 169 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ int pidfound;
3838

3939
int ctx_activate[32];
4040

41-
static shared_region_info_t region_info = {0, -1, PTHREAD_ONCE_INIT, NULL, 0};
41+
static shared_region_info_t region_info = {0, -1, PTHREAD_ONCE_INIT, NULL, 0, NULL};
4242
//size_t initial_offset=117440512;
4343
int env_utilization_switch;
4444
int enable_active_oom_killer;
@@ -54,12 +54,23 @@ void do_init_device_memory_limits(uint64_t*, int);
5454
void exit_withlock(int exitcode);
5555

5656
void set_current_gpu_status(int status){
57+
// Fast path: use cached slot if available
58+
if (region_info.my_slot != NULL) {
59+
atomic_store_explicit(&region_info.my_slot->status, status, memory_order_release);
60+
return;
61+
}
62+
63+
// Slow path: search for our slot
64+
int proc_num = atomic_load_explicit(&region_info.shared_region->proc_num, memory_order_acquire);
5765
int i;
58-
for (i=0;i<region_info.shared_region->proc_num;i++)
59-
if (getpid()==region_info.shared_region->procs[i].pid){
60-
region_info.shared_region->procs[i].status = status;
66+
int32_t my_pid = getpid();
67+
for (i=0;i<proc_num;i++) {
68+
int32_t slot_pid = atomic_load_explicit(&region_info.shared_region->procs[i].pid, memory_order_acquire);
69+
if (my_pid == slot_pid){
70+
atomic_store_explicit(&region_info.shared_region->procs[i].status, status, memory_order_release);
6171
return;
6272
}
73+
}
6374
}
6475

6576
void sig_restore_stub(int signo){
@@ -240,18 +251,26 @@ size_t get_gpu_memory_monitor(const int dev) {
240251
return total;
241252
}
242253

254+
// Lock-free memory usage aggregation
243255
size_t get_gpu_memory_usage(const int dev) {
244-
LOG_INFO("get_gpu_memory_usage dev=%d",dev);
256+
LOG_INFO("get_gpu_memory_usage_lockfree dev=%d",dev);
245257
ensure_initialized();
246258
int i=0;
247259
size_t total=0;
248-
lock_shrreg();
249-
for (i=0;i<region_info.shared_region->proc_num;i++){
250-
LOG_INFO("dev=%d pid=%d host pid=%d i=%lu",dev,region_info.shared_region->procs[i].pid,region_info.shared_region->procs[i].hostpid,region_info.shared_region->procs[i].used[dev].total)
251-
total+=region_info.shared_region->procs[i].used[dev].total;
260+
261+
// Lock-free read with acquire semantics for proc_num
262+
int proc_num = atomic_load_explicit(&region_info.shared_region->proc_num, memory_order_acquire);
263+
264+
for (i=0;i<proc_num;i++){
265+
// Atomic loads with relaxed ordering (aggregation doesn't need strict ordering)
266+
int32_t pid = atomic_load_explicit(&region_info.shared_region->procs[i].pid, memory_order_relaxed);
267+
int32_t hostpid = atomic_load_explicit(&region_info.shared_region->procs[i].hostpid, memory_order_relaxed);
268+
uint64_t proc_usage = atomic_load_explicit(&region_info.shared_region->procs[i].used[dev].total, memory_order_relaxed);
269+
270+
LOG_INFO("dev=%d pid=%d host pid=%d i=%lu",dev,pid,hostpid,proc_usage);
271+
total+=proc_usage;
252272
}
253273
total+=initial_offset;
254-
unlock_shrreg();
255274
return total;
256275
}
257276

@@ -271,19 +290,22 @@ int set_gpu_device_memory_monitor(int32_t pid,int dev,size_t monitor){
271290
return 1;
272291
}
273292

274-
int set_gpu_device_sm_utilization(int32_t pid,int dev, unsigned int smUtil){ // new function
293+
// Lock-free SM utilization update
294+
int set_gpu_device_sm_utilization(int32_t pid,int dev, unsigned int smUtil){
275295
int i;
276296
ensure_initialized();
277-
lock_shrreg();
278-
for (i=0;i<region_info.shared_region->proc_num;i++){
279-
if (region_info.shared_region->procs[i].hostpid == pid){
280-
LOG_INFO("set_gpu_device_sm_utilization:%d %d %lu->%u", pid, dev, region_info.shared_region->procs[i].device_util[dev].sm_util, smUtil);
281-
region_info.shared_region->procs[i].device_util[dev].sm_util = smUtil;
282-
break;
297+
298+
int proc_num = atomic_load_explicit(&region_info.shared_region->proc_num, memory_order_acquire);
299+
for (i=0;i<proc_num;i++){
300+
int32_t hostpid = atomic_load_explicit(&region_info.shared_region->procs[i].hostpid, memory_order_acquire);
301+
if (hostpid == pid){
302+
uint64_t old_util = atomic_load_explicit(&region_info.shared_region->procs[i].device_util[dev].sm_util, memory_order_relaxed);
303+
LOG_INFO("set_gpu_device_sm_utilization_lockfree:%d %d %lu->%u", pid, dev, old_util, smUtil);
304+
atomic_store_explicit(&region_info.shared_region->procs[i].device_util[dev].sm_util, smUtil, memory_order_relaxed);
305+
return 1;
283306
}
284307
}
285-
unlock_shrreg();
286-
return 1;
308+
return 0;
287309
}
288310

289311
int init_gpu_device_utilization(){
@@ -333,62 +355,108 @@ uint64_t nvml_get_device_memory_usage(const int dev) {
333355
return usage;
334356
}
335357

358+
// Lock-free memory add using atomics
336359
int add_gpu_device_memory_usage(int32_t pid,int cudadev,size_t usage,int type){
337-
LOG_INFO("add_gpu_device_memory:%d %d->%d %lu",pid,cudadev,cuda_to_nvml_map(cudadev),usage);
360+
LOG_INFO("add_gpu_device_memory_lockfree:%d %d->%d %lu",pid,cudadev,cuda_to_nvml_map(cudadev),usage);
338361
int dev = cuda_to_nvml_map(cudadev);
339362
ensure_initialized();
340-
lock_shrreg();
363+
364+
// Fast path: use cached slot pointer for our own process
365+
if (pid == getpid() && region_info.my_slot != NULL) {
366+
atomic_fetch_add_explicit(&region_info.my_slot->used[dev].total, usage, memory_order_relaxed);
367+
switch (type) {
368+
case 0:
369+
atomic_fetch_add_explicit(&region_info.my_slot->used[dev].context_size, usage, memory_order_relaxed);
370+
break;
371+
case 1:
372+
atomic_fetch_add_explicit(&region_info.my_slot->used[dev].module_size, usage, memory_order_relaxed);
373+
break;
374+
case 2:
375+
atomic_fetch_add_explicit(&region_info.my_slot->used[dev].data_size, usage, memory_order_relaxed);
376+
break;
377+
}
378+
LOG_INFO("gpu_device_memory_added_lockfree:%d %d %lu",pid,dev,usage);
379+
return 0;
380+
}
381+
382+
// Slow path: find slot for other process (still lock-free)
383+
int proc_num = atomic_load_explicit(&region_info.shared_region->proc_num, memory_order_acquire);
341384
int i;
342-
for (i=0;i<region_info.shared_region->proc_num;i++){
343-
if (region_info.shared_region->procs[i].pid == pid){
344-
region_info.shared_region->procs[i].used[dev].total+=usage;
385+
for (i=0;i<proc_num;i++){
386+
int32_t slot_pid = atomic_load_explicit(&region_info.shared_region->procs[i].pid, memory_order_acquire);
387+
if (slot_pid == pid){
388+
atomic_fetch_add_explicit(&region_info.shared_region->procs[i].used[dev].total, usage, memory_order_relaxed);
345389
switch (type) {
346-
case 0:{
347-
region_info.shared_region->procs[i].used[dev].context_size += usage;
390+
case 0:
391+
atomic_fetch_add_explicit(&region_info.shared_region->procs[i].used[dev].context_size, usage, memory_order_relaxed);
348392
break;
349-
}
350-
case 1:{
351-
region_info.shared_region->procs[i].used[dev].module_size += usage;
393+
case 1:
394+
atomic_fetch_add_explicit(&region_info.shared_region->procs[i].used[dev].module_size, usage, memory_order_relaxed);
395+
break;
396+
case 2:
397+
atomic_fetch_add_explicit(&region_info.shared_region->procs[i].used[dev].data_size, usage, memory_order_relaxed);
352398
break;
353-
}
354-
case 2:{
355-
region_info.shared_region->procs[i].used[dev].data_size += usage;
356-
}
357399
}
400+
LOG_INFO("gpu_device_memory_added_lockfree:%d %d %lu",pid,dev,usage);
401+
return 0;
358402
}
359403
}
360-
unlock_shrreg();
361-
LOG_INFO("gpu_device_memory_added:%d %d %lu -> %lu",pid,dev,usage,get_gpu_memory_usage(dev));
362-
return 0;
404+
405+
LOG_WARN("Process slot not found for pid %d", pid);
406+
return -1;
363407
}
364408

409+
// Lock-free memory remove using atomics
365410
int rm_gpu_device_memory_usage(int32_t pid,int cudadev,size_t usage,int type){
366-
LOG_INFO("rm_gpu_device_memory:%d %d->%d %d:%lu",pid,cudadev,cuda_to_nvml_map(cudadev),type,usage);
411+
LOG_INFO("rm_gpu_device_memory_lockfree:%d %d->%d %d:%lu",pid,cudadev,cuda_to_nvml_map(cudadev),type,usage);
367412
int dev = cuda_to_nvml_map(cudadev);
368413
ensure_initialized();
369-
lock_shrreg();
414+
415+
// Fast path: use cached slot pointer for our own process
416+
if (pid == getpid() && region_info.my_slot != NULL) {
417+
atomic_fetch_sub_explicit(&region_info.my_slot->used[dev].total, usage, memory_order_relaxed);
418+
switch (type) {
419+
case 0:
420+
atomic_fetch_sub_explicit(&region_info.my_slot->used[dev].context_size, usage, memory_order_relaxed);
421+
break;
422+
case 1:
423+
atomic_fetch_sub_explicit(&region_info.my_slot->used[dev].module_size, usage, memory_order_relaxed);
424+
break;
425+
case 2:
426+
atomic_fetch_sub_explicit(&region_info.my_slot->used[dev].data_size, usage, memory_order_relaxed);
427+
break;
428+
}
429+
uint64_t new_total = atomic_load_explicit(&region_info.my_slot->used[dev].total, memory_order_relaxed);
430+
LOG_INFO("after delete_lockfree:%lu",new_total);
431+
return 0;
432+
}
433+
434+
// Slow path: find slot for other process (still lock-free)
435+
int proc_num = atomic_load_explicit(&region_info.shared_region->proc_num, memory_order_acquire);
370436
int i;
371-
for (i=0;i<region_info.shared_region->proc_num;i++){
372-
if (region_info.shared_region->procs[i].pid == pid){
373-
region_info.shared_region->procs[i].used[dev].total-=usage;
437+
for (i=0;i<proc_num;i++){
438+
int32_t slot_pid = atomic_load_explicit(&region_info.shared_region->procs[i].pid, memory_order_acquire);
439+
if (slot_pid == pid){
440+
atomic_fetch_sub_explicit(&region_info.shared_region->procs[i].used[dev].total, usage, memory_order_relaxed);
374441
switch (type) {
375-
case 0:{
376-
region_info.shared_region->procs[i].used[dev].context_size -= usage;
442+
case 0:
443+
atomic_fetch_sub_explicit(&region_info.shared_region->procs[i].used[dev].context_size, usage, memory_order_relaxed);
377444
break;
378-
}
379-
case 1:{
380-
region_info.shared_region->procs[i].used[dev].module_size -= usage;
445+
case 1:
446+
atomic_fetch_sub_explicit(&region_info.shared_region->procs[i].used[dev].module_size, usage, memory_order_relaxed);
447+
break;
448+
case 2:
449+
atomic_fetch_sub_explicit(&region_info.shared_region->procs[i].used[dev].data_size, usage, memory_order_relaxed);
381450
break;
382-
}
383-
case 2:{
384-
region_info.shared_region->procs[i].used[dev].data_size -= usage;
385-
}
386451
}
387-
LOG_INFO("after delete:%lu",region_info.shared_region->procs[i].used[dev].total);
452+
uint64_t new_total = atomic_load_explicit(&region_info.shared_region->procs[i].used[dev].total, memory_order_relaxed);
453+
LOG_INFO("after delete_lockfree:%lu",new_total);
454+
return 0;
388455
}
389456
}
390-
unlock_shrreg();
391-
return 0;
457+
458+
LOG_WARN("Process slot not found for pid %d", pid);
459+
return -1;
392460
}
393461

394462
void get_timespec(int seconds, struct timespec* spec) {
@@ -561,31 +629,57 @@ int clear_proc_slot_nolock(int do_clear) {
561629

562630
void init_proc_slot_withlock() {
563631
int32_t current_pid = getpid();
564-
lock_shrreg();
632+
lock_shrreg(); // Still need lock for modifying process slots
565633
shared_region_t* region = region_info.shared_region;
566-
if (region->proc_num >= SHARED_REGION_MAX_PROCESS_NUM) {
634+
635+
int proc_num = atomic_load_explicit(&region->proc_num, memory_order_acquire);
636+
if (proc_num >= SHARED_REGION_MAX_PROCESS_NUM) {
567637
exit_withlock(-1);
568638
}
569639
signal(SIGUSR2,sig_swap_stub);
570640
signal(SIGUSR1,sig_restore_stub);
641+
571642
// If, by any means a pid of itself is found in region->process, then it is probably caused by crashloop
572643
// we need to reset it.
573644
int i,found=0;
574-
for (i=0; i<region->proc_num; i++) {
575-
if (region->procs[i].pid == current_pid) {
576-
region->procs[i].status = 1;
577-
memset(region->procs[i].used,0,sizeof(device_memory_t)*CUDA_DEVICE_MAX_COUNT);
578-
memset(region->procs[i].device_util,0,sizeof(device_util_t)*CUDA_DEVICE_MAX_COUNT);
645+
for (i=0; i<proc_num; i++) {
646+
int32_t slot_pid = atomic_load_explicit(&region->procs[i].pid, memory_order_acquire);
647+
if (slot_pid == current_pid) {
648+
atomic_store_explicit(&region->procs[i].status, 1, memory_order_release);
649+
650+
// Zero out atomics
651+
for (int dev=0; dev<CUDA_DEVICE_MAX_COUNT; dev++) {
652+
atomic_store_explicit(&region->procs[i].used[dev].total, 0, memory_order_relaxed);
653+
atomic_store_explicit(&region->procs[i].used[dev].context_size, 0, memory_order_relaxed);
654+
atomic_store_explicit(&region->procs[i].used[dev].module_size, 0, memory_order_relaxed);
655+
atomic_store_explicit(&region->procs[i].used[dev].data_size, 0, memory_order_relaxed);
656+
atomic_store_explicit(&region->procs[i].device_util[dev].sm_util, 0, memory_order_relaxed);
657+
atomic_store_explicit(&region->procs[i].monitorused[dev], 0, memory_order_relaxed);
658+
}
659+
660+
region_info.my_slot = &region->procs[i]; // Cache our slot pointer
579661
found = 1;
580662
break;
581663
}
582664
}
665+
583666
if (!found) {
584-
region->procs[region->proc_num].pid = current_pid;
585-
region->procs[region->proc_num].status = 1;
586-
memset(region->procs[region->proc_num].used,0,sizeof(device_memory_t)*CUDA_DEVICE_MAX_COUNT);
587-
memset(region->procs[region->proc_num].device_util,0,sizeof(device_util_t)*CUDA_DEVICE_MAX_COUNT);
588-
region->proc_num++;
667+
// Initialize new slot with atomics
668+
atomic_store_explicit(&region->procs[proc_num].pid, current_pid, memory_order_release);
669+
atomic_store_explicit(&region->procs[proc_num].hostpid, 0, memory_order_relaxed);
670+
atomic_store_explicit(&region->procs[proc_num].status, 1, memory_order_release);
671+
672+
for (int dev=0; dev<CUDA_DEVICE_MAX_COUNT; dev++) {
673+
atomic_store_explicit(&region->procs[proc_num].used[dev].total, 0, memory_order_relaxed);
674+
atomic_store_explicit(&region->procs[proc_num].used[dev].context_size, 0, memory_order_relaxed);
675+
atomic_store_explicit(&region->procs[proc_num].used[dev].module_size, 0, memory_order_relaxed);
676+
atomic_store_explicit(&region->procs[proc_num].used[dev].data_size, 0, memory_order_relaxed);
677+
atomic_store_explicit(&region->procs[proc_num].device_util[dev].sm_util, 0, memory_order_relaxed);
678+
atomic_store_explicit(&region->procs[proc_num].monitorused[dev], 0, memory_order_relaxed);
679+
}
680+
681+
region_info.my_slot = &region->procs[proc_num]; // Cache our slot pointer
682+
atomic_fetch_add_explicit(&region->proc_num, 1, memory_order_release);
589683
}
590684

591685
clear_proc_slot_nolock(1);
@@ -697,8 +791,8 @@ void try_create_shrreg() {
697791
LOG_ERROR("Fail to lock shrreg %s: errno=%d", shr_reg_file, errno);
698792
}
699793
//put_device_info();
700-
if (region->initialized_flag !=
701-
MULTIPROCESS_SHARED_REGION_MAGIC_FLAG) {
794+
int32_t init_flag = atomic_load_explicit(&region->initialized_flag, memory_order_acquire);
795+
if (init_flag != MULTIPROCESS_SHARED_REGION_MAGIC_FLAG) {
702796
region->major_version = MAJOR_VERSION;
703797
region->minor_version = MINOR_VERSION;
704798
do_init_device_memory_limits(
@@ -708,14 +802,17 @@ void try_create_shrreg() {
708802
if (sem_init(&region->sem, 1, 1) != 0) {
709803
LOG_ERROR("Fail to init sem %s: errno=%d", shr_reg_file, errno);
710804
}
711-
__sync_synchronize();
712-
region->sm_init_flag = 0;
713-
region->utilization_switch = 1;
714-
region->recent_kernel = 2;
805+
atomic_store_explicit(&region->sm_init_flag, 0, memory_order_relaxed);
806+
atomic_store_explicit(&region->utilization_switch, 1, memory_order_relaxed);
807+
atomic_store_explicit(&region->recent_kernel, 2, memory_order_relaxed);
808+
atomic_store_explicit(&region->proc_num, 0, memory_order_relaxed);
715809
region->priority = 1;
716810
if (getenv(CUDA_TASK_PRIORITY_ENV)!=NULL)
717811
region->priority = atoi(getenv(CUDA_TASK_PRIORITY_ENV));
718-
region->initialized_flag = MULTIPROCESS_SHARED_REGION_MAGIC_FLAG;
812+
813+
// Release barrier ensures all initialization is visible before flag is set
814+
atomic_thread_fence(memory_order_release);
815+
atomic_store_explicit(&region->initialized_flag, MULTIPROCESS_SHARED_REGION_MAGIC_FLAG, memory_order_release);
719816
} else {
720817
if (region->major_version != MAJOR_VERSION ||
721818
region->minor_version != MINOR_VERSION) {

0 commit comments

Comments (0)