diff --git a/src/plugins/janus_audiobridge.c b/src/plugins/janus_audiobridge.c index d90d069913..628e88ef50 100644 --- a/src/plugins/janus_audiobridge.c +++ b/src/plugins/janus_audiobridge.c @@ -1762,8 +1762,6 @@ typedef struct janus_audiobridge_participant { gchar *display; /* Display name (opaque value, only meaningful to application) */ gboolean admin; /* If the participant is an admin (can't be globally muted) */ volatile gint active; /* Whether this participant can receive media at all */ - volatile gint encoding; /* Whether this participant is currently encoding */ - volatile gint decoding; /* Whether this participant is currently decoding */ gboolean muted; /* Whether this participant is muted */ int volume_gain; /* Gain to apply to the input audio (in percentage) */ int32_t opus_bitrate; /* Bitrate to use for the Opus stream */ @@ -1802,6 +1800,8 @@ typedef struct janus_audiobridge_participant { gboolean plainrtp; /* Whether this is a WebRTC participant, or a plain RTP one */ janus_audiobridge_plainrtp_media plainrtp_media; janus_mutex pmutex; + janus_mutex encoding_mutex; /* Encoding mutex to lock encoder instance */ + janus_mutex decoding_mutex; /* Decoding mutex to lock decoder instance */ /* Opus stuff */ uint32_t sampling_rate; /* Sampling rate to decode at */ OpusEncoder *encoder; /* Opus encoder instance */ @@ -1951,6 +1951,8 @@ static void janus_audiobridge_participant_free(const janus_refcount *participant janus_audiobridge_plainrtp_media_cleanup(&participant->plainrtp_media); janus_mutex_unlock(&participant->pmutex); janus_mutex_destroy(&participant->pmutex); + janus_mutex_destroy(&participant->encoding_mutex); + janus_mutex_destroy(&participant->decoding_mutex); janus_mutex_destroy(&participant->qmutex); janus_mutex_destroy(&participant->rec_mutex); g_free(participant); @@ -4900,7 +4902,7 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s g_snprintf(error_cause, 512, "Can't reset (not in a room)"); goto prepare_response; } - participant->reset = TRUE; + g_atomic_int_set(&participant->reset, 1); response = json_object(); json_object_set_new(response, "audiobridge", json_string("success")); goto prepare_response; @@ -6326,8 +6328,7 @@ void janus_audiobridge_incoming_rtp(janus_plugin_session *handle, janus_plugin_r if(!session || g_atomic_int_get(&session->destroyed) || !session->participant) return; janus_audiobridge_participant *participant = (janus_audiobridge_participant *)session->participant; - if(!g_atomic_int_get(&participant->active) || participant->muted || g_atomic_int_get(&participant->suspended) || - (participant->codec == JANUS_AUDIOCODEC_OPUS && !participant->decoder) || !participant->room) + if(!g_atomic_int_get(&participant->active) || participant->muted || g_atomic_int_get(&participant->suspended) || !participant->room) return; if(participant->room && participant->room->muted && !participant->admin) return; @@ -6335,10 +6336,9 @@ void janus_audiobridge_incoming_rtp(janus_plugin_session *handle, janus_plugin_r uint16_t len = packet->length; /* Save the frame if we're recording this leg */ janus_recorder_save_frame(participant->arc, buf, len); - if(g_atomic_int_get(&participant->active) && (participant->codec != JANUS_AUDIOCODEC_OPUS || - (participant->codec == JANUS_AUDIOCODEC_OPUS && participant->decoder))) { + if(g_atomic_int_get(&participant->active)) { /* First of all, check if a reset on the decoder is due */ - if(participant->reset && participant->codec == JANUS_AUDIOCODEC_OPUS) { + if(g_atomic_int_get(&participant->reset) && participant->codec == JANUS_AUDIOCODEC_OPUS) { /* Create a new decoder and get rid of the old one */ int error = 0; OpusDecoder *decoder = opus_decoder_create(participant->room->sampling_rate, @@ -6346,12 +6346,14 @@ void janus_audiobridge_incoming_rtp(janus_plugin_session *handle, janus_plugin_r if(error != OPUS_OK) { JANUS_LOG(LOG_ERR, "Error resetting Opus decoder...\n"); } else { + janus_mutex_lock(&participant->decoding_mutex); if(participant->decoder) opus_decoder_destroy(participant->decoder); participant->decoder = decoder; + janus_mutex_unlock(&participant->decoding_mutex); JANUS_LOG(LOG_VERB, "Opus decoder reset\n"); } - participant->reset = FALSE; + g_atomic_int_set(&participant->reset, 0); } /* We'll need to decode the frame (Opus/G.711 -> slinear), so check the payload type */ janus_rtp_header *rtp = (janus_rtp_header *)buf; @@ -6507,20 +6509,17 @@ static void janus_audiobridge_hangup_media_internal(janus_plugin_session *handle participant->muted = TRUE; g_free(participant->display); participant->display = NULL; - /* Make sure we're not using the encoder/decoder right now, we're going to destroy them */ - while(!g_atomic_int_compare_and_exchange(&participant->encoding, 0, 1)) - g_usleep(5000); + janus_mutex_lock(&participant->encoding_mutex); if(participant->encoder) opus_encoder_destroy(participant->encoder); participant->encoder = NULL; - g_atomic_int_set(&participant->encoding, 0); - while(!g_atomic_int_compare_and_exchange(&participant->decoding, 0, 1)) - g_usleep(5000); + janus_mutex_unlock(&participant->encoding_mutex); + janus_mutex_lock(&participant->decoding_mutex); if(participant->decoder) opus_decoder_destroy(participant->decoder); participant->decoder = NULL; - g_atomic_int_set(&participant->decoding, 0); - participant->reset = FALSE; + janus_mutex_unlock(&participant->decoding_mutex); + g_atomic_int_set(&participant->reset, 0); participant->audio_active_packets = 0; participant->audio_dBov_sum = 0; participant->talking = FALSE; @@ -6894,7 +6893,7 @@ static void *janus_audiobridge_handler(void *data) { participant->outbuf = NULL; participant->encoder = NULL; participant->decoder = NULL; - participant->reset = FALSE; + g_atomic_int_set(&participant->reset, 0); participant->fec = FALSE; participant->last_timestamp = 0; participant->last_seq = 0; @@ -6902,6 +6901,8 @@ static void *janus_audiobridge_handler(void *data) { participant->arc = NULL; janus_audiobridge_plainrtp_media_cleanup(&participant->plainrtp_media); janus_mutex_init(&participant->pmutex); + janus_mutex_init(&participant->encoding_mutex); + janus_mutex_init(&participant->decoding_mutex); janus_mutex_init(&participant->rec_mutex); } participant->session = session; @@ -7021,7 +7022,7 @@ static void *janus_audiobridge_handler(void *data) { JANUS_LOG(LOG_WARN, "RNNoise unavailable, denoising not supported\n"); } #endif - participant->reset = FALSE; + g_atomic_int_set(&participant->reset, 0); /* If we need to generate an offer ourselves, do that */ if(gen_offer != NULL) generate_offer = json_is_true(gen_offer); @@ -7353,8 +7354,10 @@ static void *janus_audiobridge_handler(void *data) { opus_bitrate = 0; } participant->opus_bitrate = opus_bitrate; + janus_mutex_lock(&participant->encoding_mutex); if(participant->encoder) opus_encoder_ctl(participant->encoder, OPUS_SET_BITRATE(participant->opus_bitrate ? participant->opus_bitrate : OPUS_AUTO)); + janus_mutex_unlock(&participant->encoding_mutex); } if(quality) { int complexity = json_integer_value(quality); @@ -7365,8 +7368,10 @@ static void *janus_audiobridge_handler(void *data) { goto error; } participant->opus_complexity = complexity; + janus_mutex_lock(&participant->encoding_mutex); if(participant->encoder) opus_encoder_ctl(participant->encoder, OPUS_SET_COMPLEXITY(participant->opus_complexity)); + janus_mutex_unlock(&participant->encoding_mutex); } if(exploss) { int expected_loss = json_integer_value(exploss); @@ -7377,8 +7382,10 @@ static void *janus_audiobridge_handler(void *data) { goto error; } participant->expected_loss = expected_loss; + janus_mutex_lock(&participant->encoding_mutex); if(participant->encoder) opus_encoder_ctl(participant->encoder, OPUS_SET_PACKET_LOSS_PERC(participant->expected_loss)); + janus_mutex_unlock(&participant->encoding_mutex); } if(group && participant->room && participant->room->groups != NULL) { const char *group_name = json_string_value(group); @@ -7818,24 +7825,26 @@ static void *janus_audiobridge_handler(void *data) { janus_mutex_unlock(&rooms_mutex); goto error; } - participant->reset = FALSE; + g_atomic_int_set(&participant->reset, 0); /* Destroy the previous encoder/decoder and update the references */ - while(!g_atomic_int_compare_and_exchange(&participant->encoding, 0, 1)) - g_usleep(5000); + janus_mutex_lock(&participant->encoding_mutex); if(participant->encoder) opus_encoder_destroy(participant->encoder); participant->sampling_rate = audiobridge->sampling_rate; participant->encoder = new_encoder; - g_atomic_int_set(&participant->encoding, 0); - while(!g_atomic_int_compare_and_exchange(&participant->decoding, 0, 1)) - g_usleep(5000); + janus_mutex_unlock(&participant->encoding_mutex); + janus_mutex_lock(&participant->decoding_mutex); if(participant->decoder) opus_decoder_destroy(participant->decoder); participant->decoder = new_decoder; - g_atomic_int_set(&participant->decoding, 0); + janus_mutex_unlock(&participant->decoding_mutex); + } + if(quality) { + janus_mutex_lock(&participant->encoding_mutex); + if(participant->encoder) + opus_encoder_ctl(participant->encoder, OPUS_SET_COMPLEXITY(participant->opus_complexity)); + janus_mutex_unlock(&participant->encoding_mutex); } - if(quality) - opus_encoder_ctl(participant->encoder, OPUS_SET_COMPLEXITY(participant->opus_complexity)); /* Everything looks fine, start by telling the folks in the old room this participant is going away */ event = json_object(); json_object_set_new(event, "audiobridge", json_string("event")); @@ -7907,6 +7916,7 @@ static void *janus_audiobridge_handler(void *data) { else if(participant->spatial_position > 100) participant->spatial_position = 100; participant->opus_bitrate = opus_bitrate; + janus_mutex_lock(&participant->encoding_mutex); if(participant->encoder) opus_encoder_ctl(participant->encoder, OPUS_SET_BITRATE(participant->opus_bitrate ? participant->opus_bitrate : OPUS_AUTO)); if(quality) { @@ -7916,8 +7926,10 @@ static void *janus_audiobridge_handler(void *data) { } if(exploss) { participant->expected_loss = expected_loss; - opus_encoder_ctl(participant->encoder, OPUS_SET_PACKET_LOSS_PERC(participant->expected_loss)); + if(participant->encoder) + opus_encoder_ctl(participant->encoder, OPUS_SET_PACKET_LOSS_PERC(participant->expected_loss)); } + janus_mutex_unlock(&participant->encoding_mutex); #ifdef HAVE_RNNOISE /* Check if a denoiser is needed now */ participant->denoise = denoise ? json_is_true(denoise) : audiobridge->denoise; @@ -8168,7 +8180,10 @@ static void *janus_audiobridge_handler(void *data) { if(participant->opus_pt > 0 && strstr(msg_sdp, "useinbandfec=1")){ /* Opus codec, inband FEC (from Janus to user) set */ participant->fec = TRUE; - opus_encoder_ctl(participant->encoder, OPUS_SET_INBAND_FEC(participant->fec)); + janus_mutex_lock(&participant->encoding_mutex); + if(participant->encoder) + opus_encoder_ctl(participant->encoder, OPUS_SET_INBAND_FEC(participant->fec)); + janus_mutex_unlock(&participant->encoding_mutex); } JANUS_LOG(LOG_VERB, "Opus payload type is %d, outgoing FEC %s\n", participant->opus_pt, participant->fec ? "enabled" : "disabled"); } @@ -9152,8 +9167,10 @@ static void *janus_audiobridge_participant_thread(void *data) { /* We didn't get a packet: check if PLC can help */ if(!first && participant->codec == JANUS_AUDIOCODEC_OPUS && lost_packets_gap <= JITTER_BUFFER_MAX_GAP_SIZE && !participant->muted) { lost_packets_gap++; - if(!g_atomic_int_compare_and_exchange(&participant->decoding, 0, 1)) { + janus_mutex_lock(&participant->decoding_mutex); + if(participant->decoder == NULL) { /* This means we're cleaning up, so don't try to decode */ + janus_mutex_unlock(&participant->decoding_mutex); janus_audiobridge_buffer_packet_destroy(bpkt); break; } @@ -9167,8 +9184,9 @@ static void *janus_audiobridge_participant_thread(void *data) { pkt->seq_number = participant->last_seq + 1; /* This is a redundant packet, so we can't parse any extension info */ pkt->silence = FALSE; - janus_audiobridge_participant_istalking(session, participant, NULL, NULL); pkt->length = opus_decode(participant->decoder, NULL, 0, (opus_int16 *)pkt->data, output_samples, 0); + janus_mutex_unlock(&participant->decoding_mutex); + janus_audiobridge_participant_istalking(session, participant, NULL, NULL); #ifdef HAVE_RNNOISE /* Check if we need to denoise this packet */ if(participant->denoise) @@ -9177,7 +9195,6 @@ static void *janus_audiobridge_participant_thread(void *data) { /* Update the details */ participant->last_seq = pkt->seq_number; participant->last_timestamp = pkt->timestamp; - g_atomic_int_set(&participant->decoding, 0); if(pkt->length < 0) { JANUS_LOG(LOG_ERR, "[Opus] Ops! got an error decoding the Opus frame: %d (%s)\n", pkt->length, opus_strerror(pkt->length)); g_free(pkt->data); @@ -9201,11 +9218,6 @@ static void *janus_audiobridge_participant_thread(void *data) { } else { /* Decode the audio packet */ bpkt = (janus_audiobridge_buffer_packet *)jbp.data; - if(!g_atomic_int_compare_and_exchange(&participant->decoding, 0, 1)) { - /* This means we're cleaning up, so don't try to decode */ - janus_audiobridge_buffer_packet_destroy(bpkt); - break; - } /* Access the payload */ char *buffer = bpkt->rtp ? bpkt->rtp->buffer : NULL; uint16_t len = bpkt->rtp ? bpkt->rtp->length : 0; @@ -9214,7 +9226,6 @@ static void *janus_audiobridge_participant_thread(void *data) { if(!payload) { JANUS_LOG(LOG_ERR, "[%s] Ops! got an error accessing the RTP payload\n", participant->codec == JANUS_AUDIOCODEC_OPUS ? "Opus" : "G.711"); - g_atomic_int_set(&participant->decoding, 0); janus_audiobridge_buffer_packet_destroy(bpkt); continue; } @@ -9233,12 +9244,19 @@ static void *janus_audiobridge_participant_thread(void *data) { pkt->length = 0; if(participant->codec == JANUS_AUDIOCODEC_OPUS) { /* Opus */ + janus_mutex_lock(&participant->decoding_mutex); + if(participant->decoder == NULL) { + /* This means we're cleaning up, so don't try to decode */ + janus_mutex_unlock(&participant->decoding_mutex); + janus_audiobridge_buffer_packet_destroy(bpkt); + break; + } pkt->length = opus_decode(participant->decoder, payload, plen, (opus_int16 *)pkt->data, BUFFER_SAMPLES, 0); + janus_mutex_unlock(&participant->decoding_mutex); } else if(participant->codec == JANUS_AUDIOCODEC_PCMA || participant->codec == JANUS_AUDIOCODEC_PCMU) { /* G.711 */ if(plen != 160) { JANUS_LOG(LOG_WARN, "[G.711] Wrong packet size (expected 160, got %d), skipping audio packet\n", plen); - g_atomic_int_set(&participant->decoding, 0); janus_audiobridge_buffer_packet_destroy(bpkt); g_free(pkt->data); g_free(pkt); @@ -9267,7 +9285,6 @@ static void *janus_audiobridge_participant_thread(void *data) { /* Update the details */ participant->last_seq = pkt->seq_number; participant->last_timestamp = pkt->timestamp; - g_atomic_int_set(&participant->decoding, 0); if(pkt->length < 0) { if(participant->codec == JANUS_AUDIOCODEC_OPUS) { JANUS_LOG(LOG_ERR, "[Opus] Ops! got an error decoding the Opus frame: %d (%s)\n", pkt->length, opus_strerror(pkt->length)); @@ -9295,7 +9312,7 @@ static void *janus_audiobridge_participant_thread(void *data) { mixedpkt = g_async_queue_try_pop(participant->outbuf); if(mixedpkt != NULL && g_atomic_int_get(&session->destroyed) == 0 && g_atomic_int_get(&session->started)) { if(g_atomic_int_get(&participant->active) && (participant->codec == JANUS_AUDIOCODEC_PCMA || - participant->codec == JANUS_AUDIOCODEC_PCMU) && g_atomic_int_compare_and_exchange(&participant->encoding, 0, 1)) { + participant->codec == JANUS_AUDIOCODEC_PCMU)) { /* Encode using G.711 */ if(mixedpkt->length != 320) { /* TODO Resample */ @@ -9311,7 +9328,6 @@ static void *janus_audiobridge_participant_thread(void *data) { for(i=0; i<160; i++) *(payload+12+i) = janus_audiobridge_g711_ulaw_encode(outBuffer[i]); } - g_atomic_int_set(&participant->encoding, 0); outpkt->length = 172; /* Take the RTP header into consideration */ /* Update RTP header */ outpkt->data->version = 2; @@ -9324,16 +9340,20 @@ static void *janus_audiobridge_participant_thread(void *data) { outpkt->timestamp = mixedpkt->timestamp/6; outpkt->seq_number = mixedpkt->seq_number; janus_audiobridge_relay_rtp_packet(participant->session, outpkt); - } else if(g_atomic_int_get(&participant->active) && participant->encoder && - g_atomic_int_compare_and_exchange(&participant->encoding, 0, 1)) { - /* Encode raw frame to Opus */ + } else if(g_atomic_int_get(&participant->active)) { opus_int16 *outBuffer = (opus_int16 *)mixedpkt->data; - outpkt->length = opus_encode(participant->encoder, outBuffer, - participant->stereo ? mixedpkt->length/2 : mixedpkt->length, payload+12, 1500-12); - g_atomic_int_set(&participant->encoding, 0); + janus_mutex_lock(&participant->encoding_mutex); + /* Encode raw frame to Opus */ + if(participant->encoder) { + outpkt->length = opus_encode(participant->encoder, outBuffer, + participant->stereo ? mixedpkt->length/2 : mixedpkt->length, payload+12, 1500-12); + } else { + outpkt->length = 0; + } + janus_mutex_unlock(&participant->encoding_mutex); if(outpkt->length < 0) { JANUS_LOG(LOG_ERR, "[Opus] Ops! got an error encoding the Opus frame: %d (%s)\n", outpkt->length, opus_strerror(outpkt->length)); - } else { + } else if(outpkt->length > 0) { outpkt->length += 12; /* Take the RTP header into consideration */ /* Update RTP header */ outpkt->data->version = 2;