Skip to content

Parallel conference bridge #4241

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
20ef6fa
parallel conference bridge
LeonidGoltsblat Jan 5, 2025
e4ff03f
create_pasv_port() fix
LeonidGoltsblat Jan 5, 2025
58d911a
minimized conditional compilation
LeonidGoltsblat Jan 20, 2025
76cc0b7
Merge remote-tracking branch 'github.com/master' into parallel-confer…
LeonidGoltsblat Jan 20, 2025
cc83b8c
signed/unsigned fix
LeonidGoltsblat Jan 20, 2025
5d8d81a
conference bridge with native pjsip multithreading + pj_barrier_r
LeonidGoltsblat Jan 26, 2025
8e38b1a
added && _POSIX_BARRIERS >= 200112L
LeonidGoltsblat Jan 26, 2025
29201a8
pthread_mutex_t -> pj_mutex_t
LeonidGoltsblat Jan 26, 2025
eec77e5
fix compile error
LeonidGoltsblat Jan 26, 2025
60e5b02
fix compile error
LeonidGoltsblat Jan 26, 2025
97cccb0
last minute fixes
LeonidGoltsblat Jan 26, 2025
6170aa7
Merge branch 'pjsip:master' into parallel-conference-bridge
LeonidGoltsblat Jan 29, 2025
a3caf2c
barriers changes on code review
LeonidGoltsblat Feb 7, 2025
a20ed8c
documentation and code improvements related to barriers
LeonidGoltsblat Feb 8, 2025
8d73950
Merge remote-tracking branch 'github.com/master' into parallel-confer…
LeonidGoltsblat Feb 10, 2025
1746b44
pjmedia_conf_param introduced
LeonidGoltsblat Feb 10, 2025
b1dbb76
PJMEDIA_CONF_THREADS macro
LeonidGoltsblat Feb 17, 2025
90ec903
pjsua, pjsua2 runtime setting for the conference bridge threads
LeonidGoltsblat Feb 17, 2025
5f9f2ab
resolve on code review
LeonidGoltsblat Feb 25, 2025
212a027
m-to-n switching optimization
LeonidGoltsblat Mar 3, 2025
e0a0e9e
simplification of the algorithm
LeonidGoltsblat Mar 3, 2025
e2f33be
eliminated residual echo after disconnection
LeonidGoltsblat Mar 4, 2025
faa309b
systest conference test
LeonidGoltsblat Mar 5, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions pjlib/include/pj/os.h
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,89 @@ PJ_DECL(pj_status_t) pj_event_destroy(pj_event_t *event);
*/
#endif /* PJ_HAS_EVENT_OBJ */


/* **************************************************************************/
/**
* @defgroup PJ_BARRIER_SEC Barrier sections.
* @ingroup PJ_OS
* @{
* This module provides abstraction to pj_barrier_t - synchronization barrier.
* It allows threads to block until all participating threads have reached
* the barrier,ensuring synchronization at specific points in execution.
* pj_barrier_t provides a barrier mechanism for synchronizing threads in
* a multithreaded application, similar to
* the POSIX pthread_barrier_wait or Windows EnterSynchronizationBarrier.
*/

/**
* Flags that control the behavior of the barrier
* Supported on Windows platform starting from Windows 8
* Otherwize, the flags are ignored.
*/
enum pj_barrier_flags {
/* Specifies that the thread entering the barrier should block
* immediately until the last thread enters the barrier. */
PJ_BARRIER_FLAGS_BLOCK_ONLY = 1,

/* Specifies that the thread entering the barrier should spin until
* the last thread enters the barrier,
* even if the spinning thread exceeds the barrier's maximum spin count.*/
PJ_BARRIER_FLAGS_SPIN_ONLY = 2,

/* Specifies that the function can skip the work required to ensure
* that it is safe to delete the barrier, which can improve performance.
* All threads that enter this barrier must specify the flag;
* otherwise, the flag is ignored.
* This flag should be used only if the barrier will never be deleted.
* "Never" means "when some thread is waiting on this barrier".
*/
PJ_BARRIER_FLAGS_NO_DELETE = 4
};

/**
* Create a barrier object.
* pj_barrier_create() creates a barrier object that can be used to synchronize threads.
* The barrier object is initialized with a trip count that specifies the number of threads
* that must call pj_barrier_wait() before any are allowed to proceed.
*
* @param pool The pool to allocate the barrier object.
* @param trip_count The number of threads that must call pj_barrier_wait() before any are allowed to proceed.
* @param p_barrier Pointer to hold the barrier object upon return.
*
* @return PJ_SUCCESS on success, or the error code.
*/
pj_status_t pj_barrier_create(pj_pool_t *pool, unsigned trip_count, pj_barrier_t **p_barrier);

/**
* Destroy a barrier object.
* pj_barrier_destroy() destroys a barrier object and releases any resources associated with the barrier.
*
* @param barrier The barrier to destroy.
*
* @return PJ_SUCCESS on success, or the error code.
*/
pj_status_t pj_barrier_destroy(pj_barrier_t *barrier);

/**
* Wait for all threads to reach the barrier
* pj_barrier_wait() allows threads to block until all participating threads have reached the barrier,
* ensuring synchronization at specific points in execution.
* It provides a barrier mechanism for synchronizing threads in a multithreaded application,
* similar to the POSIX pthread_barrier_wait or Windows EnterSynchronizationBarrier.
*
* @param barrier The barrier to wait on
* @param flags Flags that control the behavior of the barrier (combination of pj_barrier_flags)
*
* @return Returns PJ_TRUE for a single (arbitrary) thread synchronized
* at the barrier and PJ_FALSE for each of the other threads.
* Otherwise, an error number shall be returned to indicate the error.
*/
pj_status_t pj_barrier_wait(pj_barrier_t *barrier, pj_uint32_t flags);

/**
* @}
*/

/* **************************************************************************/
/**
* @addtogroup PJ_TIME Time Data Type and Manipulation.
Expand Down
3 changes: 3 additions & 0 deletions pjlib/include/pj/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,9 @@ typedef struct pj_sem_t pj_sem_t;
/** Event object. */
typedef struct pj_event_t pj_event_t;

/** Barrier object. */
typedef struct pj_barrier_t pj_barrier_t;

/** Unidirectional stream pipe object. */
typedef struct pj_pipe_t pj_pipe_t;

Expand Down
122 changes: 122 additions & 0 deletions pjlib/src/pj/os_core_unix.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,18 @@ struct pj_event_t
};
#endif /* PJ_HAS_EVENT_OBJ */

struct pj_barrier_t {
#if defined(_POSIX_BARRIERS) && _POSIX_BARRIERS >= 200112L
/* pthread_barrier is supported. */
pthread_barrier_t barrier;
#else
/* pthread_barrier is not supported. */
pj_mutex_t mutex;
pthread_cond_t cond;
unsigned count;
unsigned trip_count;
#endif
};

/*
* Flag and reference counter for PJLIB instance.
Expand Down Expand Up @@ -2080,6 +2092,116 @@ PJ_DEF(pj_status_t) pj_event_destroy(pj_event_t *event)

#endif /* PJ_HAS_EVENT_OBJ */

///////////////////////////////////////////////////////////////////////////////
#if defined(_POSIX_BARRIERS) && _POSIX_BARRIERS >= 200112L
/* pthread_barrier is supported. */

/**
* Barrier object.
*/
pj_status_t pj_barrier_create(pj_pool_t *pool, unsigned trip_count, pj_barrier_t **p_barrier) {
pj_barrier_t *barrier;
int rc;
PJ_ASSERT_RETURN(pool && p_barrier, PJ_EINVAL);
barrier = (pj_barrier_t *)pj_pool_zalloc(pool, sizeof(pj_barrier_t));
if (barrier == NULL)
return PJ_ENOMEM;
rc = pthread_barrier_init(&barrier->barrier, NULL, trip_count);
if (rc != 0)
return PJ_RETURN_OS_ERROR(rc);
*p_barrier = barrier;
return PJ_SUCCESS;
}

/**
* Wait on the barrier.
*/
pj_status_t pj_barrier_wait(pj_barrier_t *barrier, pj_uint32_t flags) {
PJ_UNUSED_ARG(flags);
int rc = pthread_barrier_wait(&barrier->barrier);
switch (rc)
{
case 0:
return PJ_FALSE;
case PTHREAD_BARRIER_SERIAL_THREAD:
return PJ_TRUE;
default:
return PJ_RETURN_OS_ERROR(rc);
}
}

/**
* Destroy the barrier.
*/
pj_status_t pj_barrier_destroy(pj_barrier_t *barrier) {
int status = pthread_barrier_destroy(&barrier->barrier);
if (status == 0)
return PJ_SUCCESS;
else
return PJ_RETURN_OS_ERROR(status);
}

#else // _POSIX_BARRIERS
/* pthread_barrier is not supported. */

/**
* Barrier object.
*/
pj_status_t pj_barrier_create(pj_pool_t *pool, unsigned trip_count, pj_barrier_t **p_barrier) {
pj_barrier_t *barrier;
pj_status_t status;

PJ_ASSERT_RETURN(pool && p_barrier, PJ_EINVAL);
barrier = (pj_barrier_t *)pj_pool_zalloc(pool, sizeof(pj_barrier_t));
if (barrier == NULL)
return PJ_ENOMEM;
status = init_mutex(&barrier->mutex, "barrier%p", PJ_MUTEX_SIMPLE);
if (status != PJ_SUCCESS)
return status;

pthread_cond_init(&barrier->cond, NULL);
barrier->count = 0;
barrier->trip_count = trip_count;
*p_barrier = barrier;
return PJ_SUCCESS;
}

/**
* Wait on the barrier.
*/
pj_status_t pj_barrier_wait(pj_barrier_t *barrier, pj_uint32_t flags) {
PJ_UNUSED_ARG(flags);

pj_bool_t is_last = PJ_FALSE;

pthread_mutex_lock(&barrier->mutex.mutex);
barrier->count++;
if (barrier->count >= barrier->trip_count)
{
barrier->count = 0;
pthread_cond_broadcast(&barrier->cond);
is_last = PJ_TRUE;
}
else
{
pthread_cond_wait(&barrier->cond, &barrier->mutex.mutex);
}
pthread_mutex_unlock(&barrier->mutex.mutex);

return is_last;
}

/**
* Destroy the barrier.
*/
pj_status_t pj_barrier_destroy(pj_barrier_t *barrier) {
pthread_cond_destroy(&barrier->cond);
return pj_mutex_destroy(&barrier->mutex);
}

#endif // _POSIX_BARRIERS


///////////////////////////////////////////////////////////////////////////////
#if defined(PJ_TERM_HAS_COLOR) && PJ_TERM_HAS_COLOR != 0
/*
Expand Down
135 changes: 135 additions & 0 deletions pjlib/src/pj/os_core_win32.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,28 @@ struct pj_atomic_t
long value;
};

/*
* Implementation of pj_barrier_t.
*/
#if PJ_WIN32_WINNT >= _WIN32_WINNT_WIN8
struct pj_barrier_t {
SYNCHRONIZATION_BARRIER sync_barrier;
};
#elif PJ_WIN32_WINNT >= _WIN32_WINNT_VISTA
struct pj_barrier_t {
CRITICAL_SECTION mutex;
CONDITION_VARIABLE cond;
unsigned count;
unsigned waiting;
};
#else
struct pj_barrier_t {
HANDLE cond; /* Semaphore */
LONG count; /* Number of threads required to pass the barrier */
LONG waiting;/* Number of threads waiting at the barrier */
};
#endif

/*
* Flag and reference counter for PJLIB instance.
*/
Expand Down Expand Up @@ -1546,6 +1568,119 @@ PJ_DEF(pj_status_t) pj_event_destroy(pj_event_t *event)

#endif /* PJ_HAS_EVENT_OBJ */

///////////////////////////////////////////////////////////////////////////////

/*
* pj_barrier_create()
*/
pj_status_t pj_barrier_create(pj_pool_t *pool, unsigned trip_count, pj_barrier_t **p_barrier) {
pj_barrier_t *barrier;
PJ_ASSERT_RETURN(pool && p_barrier, PJ_EINVAL);
barrier = (pj_barrier_t *)pj_pool_zalloc(pool, sizeof(pj_barrier_t));
if (barrier == NULL)
return PJ_ENOMEM;
#if PJ_WIN32_WINNT >= _WIN32_WINNT_WIN8
if (InitializeSynchronizationBarrier(&barrier->sync_barrier, trip_count, -1))
{
*p_barrier = barrier;
return PJ_SUCCESS;
}
else
return pj_get_os_error();
#elif PJ_WIN32_WINNT >= _WIN32_WINNT_VISTA
InitializeCriticalSection(&barrier->mutex);
InitializeConditionVariable(&barrier->cond);
barrier->count = trip_count;
barrier->waiting = 0;
*p_barrier = barrier;
return PJ_SUCCESS;
#else
barrier->cond = CreateSemaphore(NULL,
0, /* initial count */
trip_count, /* max count */
NULL);
if (!barrier->cond)
return pj_get_os_error();
barrier->count = trip_count;
barrier->waiting = 0;
*p_barrier = barrier;
return PJ_SUCCESS;
#endif
}

/*
* pj_barrier_destroy()
*/
pj_status_t pj_barrier_destroy(pj_barrier_t *barrier) {
PJ_ASSERT_RETURN(barrier, PJ_EINVAL);
#if PJ_WIN32_WINNT >= _WIN32_WINNT_WIN8
DeleteSynchronizationBarrier(&barrier->sync_barrier);
return PJ_SUCCESS;
#elif PJ_WIN32_WINNT >= _WIN32_WINNT_VISTA
DeleteCriticalSection(&barrier->mutex);
return PJ_SUCCESS;
#else
if (CloseHandle(barrier->cond))
return PJ_SUCCESS;
else
return pj_get_os_error();
#endif
}

/*
* pj_barrier_wait()
*/
pj_status_t pj_barrier_wait(pj_barrier_t *barrier, pj_uint32_t flags) {
PJ_ASSERT_RETURN(barrier, PJ_EINVAL);
#if PJ_WIN32_WINNT >= _WIN32_WINNT_WIN8
DWORD dwFlags = ((flags & PJ_BARRIER_FLAGS_BLOCK_ONLY) ? SYNCHRONIZATION_BARRIER_FLAGS_BLOCK_ONLY : 0) |
((flags & PJ_BARRIER_FLAGS_SPIN_ONLY) ? SYNCHRONIZATION_BARRIER_FLAGS_SPIN_ONLY : 0) |
((flags & PJ_BARRIER_FLAGS_NO_DELETE) ? SYNCHRONIZATION_BARRIER_FLAGS_NO_DELETE : 0);
return EnterSynchronizationBarrier(&barrier->sync_barrier, dwFlags);
#elif PJ_WIN32_WINNT >= _WIN32_WINNT_VISTA
PJ_UNUSED_ARG(flags);
EnterCriticalSection(&barrier->mutex);
if (++barrier->waiting == barrier->count)
{
barrier->waiting = 0;
LeaveCriticalSection(&barrier->mutex);
WakeAllConditionVariable(&barrier->cond);
return PJ_TRUE;
}
else
{
BOOL rc = SleepConditionVariableCS(&barrier->cond, &barrier->mutex, INFINITE);
LeaveCriticalSection(&barrier->mutex);
if (rc)
return PJ_FALSE;
else
return pj_get_os_error();
}
#else
PJ_UNUSED_ARG(flags);

if (InterlockedIncrement(&barrier->waiting) == barrier->count)
{
LONG previousCount = 0;
barrier->waiting = 0;
/* Release all threads waiting on the semaphore */
if (barrier->count == 1 || ReleaseSemaphore(barrier->cond, barrier->count - 1, &previousCount))
{
PJ_ASSERT_RETURN(previousCount == 0, PJ_EBUG);
return PJ_TRUE;
}
else
return pj_get_os_error();
}

DWORD rc = WaitForSingleObject(barrier->cond, INFINITE);
if (rc == WAIT_OBJECT_0)
return PJ_FALSE;
else
return pj_get_os_error();
#endif
}

///////////////////////////////////////////////////////////////////////////////
#if defined(PJ_TERM_HAS_COLOR) && PJ_TERM_HAS_COLOR != 0
/*
Expand Down
1 change: 1 addition & 0 deletions pjmedia/build/pjmedia.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,7 @@
<ClCompile Include="..\src\pjmedia\clock_thread.c" />
<ClCompile Include="..\src\pjmedia\codec.c" />
<ClCompile Include="..\src\pjmedia\conference.c" />
<ClCompile Include="..\src\pjmedia\conf_openmp.c" />
<ClCompile Include="..\src\pjmedia\conf_switch.c" />
<ClCompile Include="..\src\pjmedia\converter.c" />
<ClCompile Include="..\src\pjmedia\converter_libswscale.c" />
Expand Down
Loading
Loading