Skip to content
Open
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 69 additions & 49 deletions src/plugins/janus_audiobridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -1762,8 +1762,6 @@ typedef struct janus_audiobridge_participant {
gchar *display; /* Display name (opaque value, only meaningful to application) */
gboolean admin; /* If the participant is an admin (can't be globally muted) */
volatile gint active; /* Whether this participant can receive media at all */
volatile gint encoding; /* Whether this participant is currently encoding */
volatile gint decoding; /* Whether this participant is currently decoding */
gboolean muted; /* Whether this participant is muted */
int volume_gain; /* Gain to apply to the input audio (in percentage) */
int32_t opus_bitrate; /* Bitrate to use for the Opus stream */
Expand Down Expand Up @@ -1802,6 +1800,8 @@ typedef struct janus_audiobridge_participant {
gboolean plainrtp; /* Whether this is a WebRTC participant, or a plain RTP one */
janus_audiobridge_plainrtp_media plainrtp_media;
janus_mutex pmutex;
janus_mutex encoding_mutex; /* Encoding mutex to lock encoder instance */
janus_mutex decoding_mutex; /* Decoding mutex to lock decoder instance */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we could use a single mutex for both, considering that the same participant thread takes care of both encoding and decoding, and the only thing we need to ensure is that the instances exist, but that's something we can discuss at a later stage.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering the same. I also thought to maybe use the participant pmutex.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes more sense to use a different mutex than pmutex, as we don't want other operations to interfere with transcoding, but we can revisit this later.

/* Opus stuff */
uint32_t sampling_rate; /* Sampling rate to decode at */
OpusEncoder *encoder; /* Opus encoder instance */
Expand Down Expand Up @@ -1951,6 +1951,8 @@ static void janus_audiobridge_participant_free(const janus_refcount *participant
janus_audiobridge_plainrtp_media_cleanup(&participant->plainrtp_media);
janus_mutex_unlock(&participant->pmutex);
janus_mutex_destroy(&participant->pmutex);
janus_mutex_destroy(&participant->encoding_mutex);
janus_mutex_destroy(&participant->decoding_mutex);
janus_mutex_destroy(&participant->qmutex);
janus_mutex_destroy(&participant->rec_mutex);
g_free(participant);
Expand Down Expand Up @@ -4900,7 +4902,7 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s
g_snprintf(error_cause, 512, "Can't reset (not in a room)");
goto prepare_response;
}
participant->reset = TRUE;
g_atomic_int_set(&participant->reset, 1);
response = json_object();
json_object_set_new(response, "audiobridge", json_string("success"));
goto prepare_response;
Expand Down Expand Up @@ -6326,32 +6328,32 @@ void janus_audiobridge_incoming_rtp(janus_plugin_session *handle, janus_plugin_r
if(!session || g_atomic_int_get(&session->destroyed) || !session->participant)
return;
janus_audiobridge_participant *participant = (janus_audiobridge_participant *)session->participant;
if(!g_atomic_int_get(&participant->active) || participant->muted || g_atomic_int_get(&participant->suspended) ||
(participant->codec == JANUS_AUDIOCODEC_OPUS && !participant->decoder) || !participant->room)
if(!g_atomic_int_get(&participant->active) || participant->muted || g_atomic_int_get(&participant->suspended) || !participant->room)
return;
if(participant->room && participant->room->muted && !participant->admin)
return;
char *buf = packet->buffer;
uint16_t len = packet->length;
/* Save the frame if we're recording this leg */
janus_recorder_save_frame(participant->arc, buf, len);
if(g_atomic_int_get(&participant->active) && (participant->codec != JANUS_AUDIOCODEC_OPUS ||
(participant->codec == JANUS_AUDIOCODEC_OPUS && participant->decoder))) {
if(g_atomic_int_get(&participant->active)) {
/* First of all, check if a reset on the decoder is due */
if(participant->reset && participant->codec == JANUS_AUDIOCODEC_OPUS) {
if(g_atomic_int_get(&participant->reset) && participant->codec == JANUS_AUDIOCODEC_OPUS) {
/* Create a new decoder and get rid of the old one */
int error = 0;
OpusDecoder *decoder = opus_decoder_create(participant->room->sampling_rate,
participant->stereo ? 2 : 1, &error);
if(error != OPUS_OK) {
JANUS_LOG(LOG_ERR, "Error resetting Opus decoder...\n");
} else {
janus_mutex_lock(&participant->decoding_mutex);
if(participant->decoder)
opus_decoder_destroy(participant->decoder);
participant->decoder = decoder;
janus_mutex_unlock(&participant->decoding_mutex);
JANUS_LOG(LOG_VERB, "Opus decoder reset\n");
}
participant->reset = FALSE;
g_atomic_int_set(&participant->reset, 0);
}
/* We'll need to decode the frame (Opus/G.711 -> slinear), so check the payload type */
janus_rtp_header *rtp = (janus_rtp_header *)buf;
Expand Down Expand Up @@ -6507,20 +6509,17 @@ static void janus_audiobridge_hangup_media_internal(janus_plugin_session *handle
participant->muted = TRUE;
g_free(participant->display);
participant->display = NULL;
/* Make sure we're not using the encoder/decoder right now, we're going to destroy them */
while(!g_atomic_int_compare_and_exchange(&participant->encoding, 0, 1))
g_usleep(5000);
janus_mutex_lock(&participant->encoding_mutex);
if(participant->encoder)
opus_encoder_destroy(participant->encoder);
participant->encoder = NULL;
g_atomic_int_set(&participant->encoding, 0);
while(!g_atomic_int_compare_and_exchange(&participant->decoding, 0, 1))
g_usleep(5000);
janus_mutex_unlock(&participant->encoding_mutex);
janus_mutex_lock(&participant->decoding_mutex);
if(participant->decoder)
opus_decoder_destroy(participant->decoder);
participant->decoder = NULL;
g_atomic_int_set(&participant->decoding, 0);
participant->reset = FALSE;
janus_mutex_unlock(&participant->decoding_mutex);
g_atomic_int_set(&participant->reset, 0);
participant->audio_active_packets = 0;
participant->audio_dBov_sum = 0;
participant->talking = FALSE;
Expand Down Expand Up @@ -6894,14 +6893,16 @@ static void *janus_audiobridge_handler(void *data) {
participant->outbuf = NULL;
participant->encoder = NULL;
participant->decoder = NULL;
participant->reset = FALSE;
g_atomic_int_set(&participant->reset, 0);
participant->fec = FALSE;
participant->last_timestamp = 0;
participant->last_seq = 0;
janus_mutex_init(&participant->qmutex);
participant->arc = NULL;
janus_audiobridge_plainrtp_media_cleanup(&participant->plainrtp_media);
janus_mutex_init(&participant->pmutex);
janus_mutex_init(&participant->encoding_mutex);
janus_mutex_init(&participant->decoding_mutex);
janus_mutex_init(&participant->rec_mutex);
}
participant->session = session;
Expand Down Expand Up @@ -7021,7 +7022,7 @@ static void *janus_audiobridge_handler(void *data) {
JANUS_LOG(LOG_WARN, "RNNoise unavailable, denoising not supported\n");
}
#endif
participant->reset = FALSE;
g_atomic_int_set(&participant->reset, 0);
/* If we need to generate an offer ourselves, do that */
if(gen_offer != NULL)
generate_offer = json_is_true(gen_offer);
Expand Down Expand Up @@ -7353,8 +7354,10 @@ static void *janus_audiobridge_handler(void *data) {
opus_bitrate = 0;
}
participant->opus_bitrate = opus_bitrate;
janus_mutex_lock(&participant->encoding_mutex);
if(participant->encoder)
opus_encoder_ctl(participant->encoder, OPUS_SET_BITRATE(participant->opus_bitrate ? participant->opus_bitrate : OPUS_AUTO));
janus_mutex_unlock(&participant->encoding_mutex);
}
if(quality) {
int complexity = json_integer_value(quality);
Expand All @@ -7365,8 +7368,10 @@ static void *janus_audiobridge_handler(void *data) {
goto error;
}
participant->opus_complexity = complexity;
janus_mutex_lock(&participant->encoding_mutex);
if(participant->encoder)
opus_encoder_ctl(participant->encoder, OPUS_SET_COMPLEXITY(participant->opus_complexity));
janus_mutex_unlock(&participant->encoding_mutex);
}
if(exploss) {
int expected_loss = json_integer_value(exploss);
Expand All @@ -7377,8 +7382,10 @@ static void *janus_audiobridge_handler(void *data) {
goto error;
}
participant->expected_loss = expected_loss;
janus_mutex_lock(&participant->encoding_mutex);
if(participant->encoder)
opus_encoder_ctl(participant->encoder, OPUS_SET_PACKET_LOSS_PERC(participant->expected_loss));
janus_mutex_unlock(&participant->encoding_mutex);
}
if(group && participant->room && participant->room->groups != NULL) {
const char *group_name = json_string_value(group);
Expand Down Expand Up @@ -7818,24 +7825,26 @@ static void *janus_audiobridge_handler(void *data) {
janus_mutex_unlock(&rooms_mutex);
goto error;
}
participant->reset = FALSE;
g_atomic_int_set(&participant->reset, 0);
/* Destroy the previous encoder/decoder and update the references */
while(!g_atomic_int_compare_and_exchange(&participant->encoding, 0, 1))
g_usleep(5000);
janus_mutex_lock(&participant->encoding_mutex);
if(participant->encoder)
opus_encoder_destroy(participant->encoder);
participant->sampling_rate = audiobridge->sampling_rate;
participant->encoder = new_encoder;
g_atomic_int_set(&participant->encoding, 0);
while(!g_atomic_int_compare_and_exchange(&participant->decoding, 0, 1))
g_usleep(5000);
janus_mutex_unlock(&participant->encoding_mutex);
janus_mutex_lock(&participant->decoding_mutex);
if(participant->decoder)
opus_decoder_destroy(participant->decoder);
participant->decoder = new_decoder;
g_atomic_int_set(&participant->decoding, 0);
janus_mutex_unlock(&participant->decoding_mutex);
}
if(quality) {
janus_mutex_lock(&participant->encoding_mutex);
if(participant->encoder)
opus_encoder_ctl(participant->encoder, OPUS_SET_COMPLEXITY(participant->opus_complexity));
janus_mutex_unlock(&participant->encoding_mutex);
}
if(quality)
opus_encoder_ctl(participant->encoder, OPUS_SET_COMPLEXITY(participant->opus_complexity));
/* Everything looks fine, start by telling the folks in the old room this participant is going away */
event = json_object();
json_object_set_new(event, "audiobridge", json_string("event"));
Expand Down Expand Up @@ -7907,6 +7916,7 @@ static void *janus_audiobridge_handler(void *data) {
else if(participant->spatial_position > 100)
participant->spatial_position = 100;
participant->opus_bitrate = opus_bitrate;
janus_mutex_lock(&participant->encoding_mutex);
if(participant->encoder)
opus_encoder_ctl(participant->encoder, OPUS_SET_BITRATE(participant->opus_bitrate ? participant->opus_bitrate : OPUS_AUTO));
if(quality) {
Expand All @@ -7916,8 +7926,10 @@ static void *janus_audiobridge_handler(void *data) {
}
if(exploss) {
participant->expected_loss = expected_loss;
opus_encoder_ctl(participant->encoder, OPUS_SET_PACKET_LOSS_PERC(participant->expected_loss));
if(participant->encoder)
opus_encoder_ctl(participant->encoder, OPUS_SET_PACKET_LOSS_PERC(participant->expected_loss));
}
janus_mutex_unlock(&participant->encoding_mutex);
#ifdef HAVE_RNNOISE
/* Check if a denoiser is needed now */
participant->denoise = denoise ? json_is_true(denoise) : audiobridge->denoise;
Expand Down Expand Up @@ -8168,7 +8180,10 @@ static void *janus_audiobridge_handler(void *data) {
if(participant->opus_pt > 0 && strstr(msg_sdp, "useinbandfec=1")){
/* Opus codec, inband FEC (from Janus to user) set */
participant->fec = TRUE;
opus_encoder_ctl(participant->encoder, OPUS_SET_INBAND_FEC(participant->fec));
janus_mutex_lock(&participant->encoding_mutex);
if(participant->encoder)
opus_encoder_ctl(participant->encoder, OPUS_SET_INBAND_FEC(participant->fec));
janus_mutex_unlock(&participant->encoding_mutex);
}
JANUS_LOG(LOG_VERB, "Opus payload type is %d, outgoing FEC %s\n", participant->opus_pt, participant->fec ? "enabled" : "disabled");
}
Expand Down Expand Up @@ -9152,8 +9167,10 @@ static void *janus_audiobridge_participant_thread(void *data) {
/* We didn't get a packet: check if PLC can help */
if(!first && participant->codec == JANUS_AUDIOCODEC_OPUS && lost_packets_gap <= JITTER_BUFFER_MAX_GAP_SIZE && !participant->muted) {
lost_packets_gap++;
if(!g_atomic_int_compare_and_exchange(&participant->decoding, 0, 1)) {
janus_mutex_lock(&participant->decoding_mutex);
if(participant->decoder == NULL) {
/* This means we're cleaning up, so don't try to decode */
janus_mutex_unlock(&participant->decoding_mutex);
janus_audiobridge_buffer_packet_destroy(bpkt);
break;
}
Expand All @@ -9177,7 +9194,7 @@ static void *janus_audiobridge_participant_thread(void *data) {
/* Update the details */
participant->last_seq = pkt->seq_number;
participant->last_timestamp = pkt->timestamp;
g_atomic_int_set(&participant->decoding, 0);
janus_mutex_unlock(&participant->decoding_mutex);
if(pkt->length < 0) {
JANUS_LOG(LOG_ERR, "[Opus] Ops! got an error decoding the Opus frame: %d (%s)\n", pkt->length, opus_strerror(pkt->length));
g_free(pkt->data);
Expand All @@ -9201,11 +9218,6 @@ static void *janus_audiobridge_participant_thread(void *data) {
} else {
/* Decode the audio packet */
bpkt = (janus_audiobridge_buffer_packet *)jbp.data;
if(!g_atomic_int_compare_and_exchange(&participant->decoding, 0, 1)) {
/* This means we're cleaning up, so don't try to decode */
janus_audiobridge_buffer_packet_destroy(bpkt);
break;
}
/* Access the payload */
char *buffer = bpkt->rtp ? bpkt->rtp->buffer : NULL;
uint16_t len = bpkt->rtp ? bpkt->rtp->length : 0;
Expand All @@ -9214,7 +9226,6 @@ static void *janus_audiobridge_participant_thread(void *data) {
if(!payload) {
JANUS_LOG(LOG_ERR, "[%s] Ops! got an error accessing the RTP payload\n",
participant->codec == JANUS_AUDIOCODEC_OPUS ? "Opus" : "G.711");
g_atomic_int_set(&participant->decoding, 0);
janus_audiobridge_buffer_packet_destroy(bpkt);
continue;
}
Expand All @@ -9233,12 +9244,19 @@ static void *janus_audiobridge_participant_thread(void *data) {
pkt->length = 0;
if(participant->codec == JANUS_AUDIOCODEC_OPUS) {
/* Opus */
janus_mutex_lock(&participant->decoding_mutex);
if(participant->decoder == NULL) {
/* This means we're cleaning up, so don't try to decode */
janus_mutex_unlock(&participant->decoding_mutex);
janus_audiobridge_buffer_packet_destroy(bpkt);
break;
}
pkt->length = opus_decode(participant->decoder, payload, plen, (opus_int16 *)pkt->data, BUFFER_SAMPLES, 0);
janus_mutex_unlock(&participant->decoding_mutex);
} else if(participant->codec == JANUS_AUDIOCODEC_PCMA || participant->codec == JANUS_AUDIOCODEC_PCMU) {
/* G.711 */
if(plen != 160) {
JANUS_LOG(LOG_WARN, "[G.711] Wrong packet size (expected 160, got %d), skipping audio packet\n", plen);
g_atomic_int_set(&participant->decoding, 0);
janus_audiobridge_buffer_packet_destroy(bpkt);
g_free(pkt->data);
g_free(pkt);
Expand Down Expand Up @@ -9267,7 +9285,6 @@ static void *janus_audiobridge_participant_thread(void *data) {
/* Update the details */
participant->last_seq = pkt->seq_number;
participant->last_timestamp = pkt->timestamp;
g_atomic_int_set(&participant->decoding, 0);
if(pkt->length < 0) {
if(participant->codec == JANUS_AUDIOCODEC_OPUS) {
JANUS_LOG(LOG_ERR, "[Opus] Ops! got an error decoding the Opus frame: %d (%s)\n", pkt->length, opus_strerror(pkt->length));
Expand Down Expand Up @@ -9295,7 +9312,7 @@ static void *janus_audiobridge_participant_thread(void *data) {
mixedpkt = g_async_queue_try_pop(participant->outbuf);
if(mixedpkt != NULL && g_atomic_int_get(&session->destroyed) == 0 && g_atomic_int_get(&session->started)) {
if(g_atomic_int_get(&participant->active) && (participant->codec == JANUS_AUDIOCODEC_PCMA ||
participant->codec == JANUS_AUDIOCODEC_PCMU) && g_atomic_int_compare_and_exchange(&participant->encoding, 0, 1)) {
participant->codec == JANUS_AUDIOCODEC_PCMU)) {
/* Encode using G.711 */
if(mixedpkt->length != 320) {
/* TODO Resample */
Expand All @@ -9311,7 +9328,6 @@ static void *janus_audiobridge_participant_thread(void *data) {
for(i=0; i<160; i++)
*(payload+12+i) = janus_audiobridge_g711_ulaw_encode(outBuffer[i]);
}
g_atomic_int_set(&participant->encoding, 0);
outpkt->length = 172; /* Take the RTP header into consideration */
/* Update RTP header */
outpkt->data->version = 2;
Expand All @@ -9324,16 +9340,20 @@ static void *janus_audiobridge_participant_thread(void *data) {
outpkt->timestamp = mixedpkt->timestamp/6;
outpkt->seq_number = mixedpkt->seq_number;
janus_audiobridge_relay_rtp_packet(participant->session, outpkt);
} else if(g_atomic_int_get(&participant->active) && participant->encoder &&
g_atomic_int_compare_and_exchange(&participant->encoding, 0, 1)) {
/* Encode raw frame to Opus */
} else if(g_atomic_int_get(&participant->active)) {
opus_int16 *outBuffer = (opus_int16 *)mixedpkt->data;
outpkt->length = opus_encode(participant->encoder, outBuffer,
participant->stereo ? mixedpkt->length/2 : mixedpkt->length, payload+12, 1500-12);
g_atomic_int_set(&participant->encoding, 0);
janus_mutex_lock(&participant->encoding_mutex);
/* Encode raw frame to Opus */
if(participant->encoder) {
outpkt->length = opus_encode(participant->encoder, outBuffer,
participant->stereo ? mixedpkt->length/2 : mixedpkt->length, payload+12, 1500-12);
} else {
outpkt->length = 0;
}
janus_mutex_unlock(&participant->encoding_mutex);
if(outpkt->length < 0) {
JANUS_LOG(LOG_ERR, "[Opus] Ops! got an error encoding the Opus frame: %d (%s)\n", outpkt->length, opus_strerror(outpkt->length));
} else {
} else if(outpkt->length > 0) {
outpkt->length += 12; /* Take the RTP header into consideration */
/* Update RTP header */
outpkt->data->version = 2;
Expand Down