-
Notifications
You must be signed in to change notification settings - Fork 2.6k
Make participant encoder/decoder access thread-safe #3606
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 1 commit
4e81fef
56618cc
51d4da2
df12104
90efa05
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1762,8 +1762,6 @@ typedef struct janus_audiobridge_participant { | |
| gchar *display; /* Display name (opaque value, only meaningful to application) */ | ||
| gboolean admin; /* If the participant is an admin (can't be globally muted) */ | ||
| volatile gint active; /* Whether this participant can receive media at all */ | ||
| volatile gint encoding; /* Whether this participant is currently encoding */ | ||
| volatile gint decoding; /* Whether this participant is currently decoding */ | ||
| gboolean muted; /* Whether this participant is muted */ | ||
| int volume_gain; /* Gain to apply to the input audio (in percentage) */ | ||
| int32_t opus_bitrate; /* Bitrate to use for the Opus stream */ | ||
|
|
@@ -1802,6 +1800,8 @@ typedef struct janus_audiobridge_participant { | |
| gboolean plainrtp; /* Whether this is a WebRTC participant, or a plain RTP one */ | ||
| janus_audiobridge_plainrtp_media plainrtp_media; | ||
| janus_mutex pmutex; | ||
| janus_mutex encoding_mutex; /* Encoding mutex to lock encoder instance */ | ||
| janus_mutex decoding_mutex; /* Decoding mutex to lock decoder instance */ | ||
| /* Opus stuff */ | ||
| uint32_t sampling_rate; /* Sampling rate to decode at */ | ||
| OpusEncoder *encoder; /* Opus encoder instance */ | ||
|
|
@@ -1951,6 +1951,8 @@ static void janus_audiobridge_participant_free(const janus_refcount *participant | |
| janus_audiobridge_plainrtp_media_cleanup(&participant->plainrtp_media); | ||
| janus_mutex_unlock(&participant->pmutex); | ||
| janus_mutex_destroy(&participant->pmutex); | ||
| janus_mutex_destroy(&participant->encoding_mutex); | ||
| janus_mutex_destroy(&participant->decoding_mutex); | ||
| janus_mutex_destroy(&participant->qmutex); | ||
| janus_mutex_destroy(&participant->rec_mutex); | ||
| g_free(participant); | ||
|
|
@@ -4900,7 +4902,7 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s | |
| g_snprintf(error_cause, 512, "Can't reset (not in a room)"); | ||
| goto prepare_response; | ||
| } | ||
| participant->reset = TRUE; | ||
| g_atomic_int_set(&participant->reset, TRUE); | ||
woutd marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| response = json_object(); | ||
| json_object_set_new(response, "audiobridge", json_string("success")); | ||
| goto prepare_response; | ||
|
|
@@ -6326,32 +6328,32 @@ void janus_audiobridge_incoming_rtp(janus_plugin_session *handle, janus_plugin_r | |
| if(!session || g_atomic_int_get(&session->destroyed) || !session->participant) | ||
| return; | ||
| janus_audiobridge_participant *participant = (janus_audiobridge_participant *)session->participant; | ||
| if(!g_atomic_int_get(&participant->active) || participant->muted || g_atomic_int_get(&participant->suspended) || | ||
| (participant->codec == JANUS_AUDIOCODEC_OPUS && !participant->decoder) || !participant->room) | ||
| if(!g_atomic_int_get(&participant->active) || participant->muted || g_atomic_int_get(&participant->suspended) || !participant->room) | ||
woutd marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return; | ||
| if(participant->room && participant->room->muted && !participant->admin) | ||
| return; | ||
| char *buf = packet->buffer; | ||
| uint16_t len = packet->length; | ||
| /* Save the frame if we're recording this leg */ | ||
| janus_recorder_save_frame(participant->arc, buf, len); | ||
| if(g_atomic_int_get(&participant->active) && (participant->codec != JANUS_AUDIOCODEC_OPUS || | ||
| (participant->codec == JANUS_AUDIOCODEC_OPUS && participant->decoder))) { | ||
| if(g_atomic_int_get(&participant->active)) { | ||
woutd marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| /* First of all, check if a reset on the decoder is due */ | ||
| if(participant->reset && participant->codec == JANUS_AUDIOCODEC_OPUS) { | ||
| if(g_atomic_int_get(&participant->reset) && participant->codec == JANUS_AUDIOCODEC_OPUS) { | ||
| /* Create a new decoder and get rid of the old one */ | ||
| int error = 0; | ||
| OpusDecoder *decoder = opus_decoder_create(participant->room->sampling_rate, | ||
| participant->stereo ? 2 : 1, &error); | ||
| if(error != OPUS_OK) { | ||
| JANUS_LOG(LOG_ERR, "Error resetting Opus decoder...\n"); | ||
| } else { | ||
| janus_mutex_lock(&participant->decoding_mutex); | ||
| if(participant->decoder) | ||
| opus_decoder_destroy(participant->decoder); | ||
| participant->decoder = decoder; | ||
| janus_mutex_unlock(&participant->decoding_mutex); | ||
| JANUS_LOG(LOG_VERB, "Opus decoder reset\n"); | ||
| } | ||
| participant->reset = FALSE; | ||
| g_atomic_int_set(&participant->reset, FALSE); | ||
woutd marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| /* We'll need to decode the frame (Opus/G.711 -> slinear), so check the payload type */ | ||
| janus_rtp_header *rtp = (janus_rtp_header *)buf; | ||
|
|
@@ -6507,20 +6509,17 @@ static void janus_audiobridge_hangup_media_internal(janus_plugin_session *handle | |
| participant->muted = TRUE; | ||
| g_free(participant->display); | ||
| participant->display = NULL; | ||
| /* Make sure we're not using the encoder/decoder right now, we're going to destroy them */ | ||
| while(!g_atomic_int_compare_and_exchange(&participant->encoding, 0, 1)) | ||
| g_usleep(5000); | ||
| janus_mutex_lock(&participant->encoding_mutex); | ||
| if(participant->encoder) | ||
| opus_encoder_destroy(participant->encoder); | ||
| participant->encoder = NULL; | ||
| g_atomic_int_set(&participant->encoding, 0); | ||
| while(!g_atomic_int_compare_and_exchange(&participant->decoding, 0, 1)) | ||
| g_usleep(5000); | ||
| janus_mutex_unlock(&participant->encoding_mutex); | ||
| janus_mutex_lock(&participant->decoding_mutex); | ||
| if(participant->decoder) | ||
| opus_decoder_destroy(participant->decoder); | ||
| participant->decoder = NULL; | ||
| g_atomic_int_set(&participant->decoding, 0); | ||
| participant->reset = FALSE; | ||
| janus_mutex_unlock(&participant->decoding_mutex); | ||
| g_atomic_int_set(&participant->reset, FALSE); | ||
| participant->audio_active_packets = 0; | ||
| participant->audio_dBov_sum = 0; | ||
| participant->talking = FALSE; | ||
|
|
@@ -6894,14 +6893,16 @@ static void *janus_audiobridge_handler(void *data) { | |
| participant->outbuf = NULL; | ||
| participant->encoder = NULL; | ||
| participant->decoder = NULL; | ||
| participant->reset = FALSE; | ||
| g_atomic_int_set(&participant->reset, FALSE); | ||
| participant->fec = FALSE; | ||
| participant->last_timestamp = 0; | ||
| participant->last_seq = 0; | ||
| janus_mutex_init(&participant->qmutex); | ||
| participant->arc = NULL; | ||
| janus_audiobridge_plainrtp_media_cleanup(&participant->plainrtp_media); | ||
| janus_mutex_init(&participant->pmutex); | ||
| janus_mutex_init(&participant->encoding_mutex); | ||
| janus_mutex_init(&participant->decoding_mutex); | ||
| janus_mutex_init(&participant->rec_mutex); | ||
| } | ||
| participant->session = session; | ||
|
|
@@ -7021,7 +7022,7 @@ static void *janus_audiobridge_handler(void *data) { | |
| JANUS_LOG(LOG_WARN, "RNNoise unavailable, denoising not supported\n"); | ||
| } | ||
| #endif | ||
| participant->reset = FALSE; | ||
| g_atomic_int_set(&participant->reset, FALSE); | ||
| /* If we need to generate an offer ourselves, do that */ | ||
| if(gen_offer != NULL) | ||
| generate_offer = json_is_true(gen_offer); | ||
|
|
@@ -7353,8 +7354,10 @@ static void *janus_audiobridge_handler(void *data) { | |
| opus_bitrate = 0; | ||
| } | ||
| participant->opus_bitrate = opus_bitrate; | ||
| janus_mutex_lock(&participant->encoding_mutex); | ||
| if(participant->encoder) | ||
| opus_encoder_ctl(participant->encoder, OPUS_SET_BITRATE(participant->opus_bitrate ? participant->opus_bitrate : OPUS_AUTO)); | ||
| janus_mutex_unlock(&participant->encoding_mutex); | ||
| } | ||
| if(quality) { | ||
| int complexity = json_integer_value(quality); | ||
|
|
@@ -7365,8 +7368,10 @@ static void *janus_audiobridge_handler(void *data) { | |
| goto error; | ||
| } | ||
| participant->opus_complexity = complexity; | ||
| janus_mutex_lock(&participant->encoding_mutex); | ||
| if(participant->encoder) | ||
| opus_encoder_ctl(participant->encoder, OPUS_SET_COMPLEXITY(participant->opus_complexity)); | ||
| janus_mutex_unlock(&participant->encoding_mutex); | ||
| } | ||
| if(exploss) { | ||
| int expected_loss = json_integer_value(exploss); | ||
|
|
@@ -7377,8 +7382,10 @@ static void *janus_audiobridge_handler(void *data) { | |
| goto error; | ||
| } | ||
| participant->expected_loss = expected_loss; | ||
| janus_mutex_lock(&participant->encoding_mutex); | ||
| if(participant->encoder) | ||
| opus_encoder_ctl(participant->encoder, OPUS_SET_PACKET_LOSS_PERC(participant->expected_loss)); | ||
| janus_mutex_unlock(&participant->encoding_mutex); | ||
| } | ||
| if(group && participant->room && participant->room->groups != NULL) { | ||
| const char *group_name = json_string_value(group); | ||
|
|
@@ -7818,24 +7825,26 @@ static void *janus_audiobridge_handler(void *data) { | |
| janus_mutex_unlock(&rooms_mutex); | ||
| goto error; | ||
| } | ||
| participant->reset = FALSE; | ||
| g_atomic_int_set(&participant->reset, FALSE); | ||
| /* Destroy the previous encoder/decoder and update the references */ | ||
| while(!g_atomic_int_compare_and_exchange(&participant->encoding, 0, 1)) | ||
| g_usleep(5000); | ||
| janus_mutex_lock(&participant->encoding_mutex); | ||
| if(participant->encoder) | ||
| opus_encoder_destroy(participant->encoder); | ||
| participant->sampling_rate = audiobridge->sampling_rate; | ||
| participant->encoder = new_encoder; | ||
| g_atomic_int_set(&participant->encoding, 0); | ||
| while(!g_atomic_int_compare_and_exchange(&participant->decoding, 0, 1)) | ||
| g_usleep(5000); | ||
| janus_mutex_unlock(&participant->encoding_mutex); | ||
| janus_mutex_lock(&participant->decoding_mutex); | ||
| if(participant->decoder) | ||
| opus_decoder_destroy(participant->decoder); | ||
| participant->decoder = new_decoder; | ||
| g_atomic_int_set(&participant->decoding, 0); | ||
| janus_mutex_unlock(&participant->decoding_mutex); | ||
| } | ||
| if(quality) { | ||
| janus_mutex_lock(&participant->encoding_mutex); | ||
| if(participant->encoder) | ||
| opus_encoder_ctl(participant->encoder, OPUS_SET_COMPLEXITY(participant->opus_complexity)); | ||
| janus_mutex_unlock(&participant->encoding_mutex); | ||
| } | ||
| if(quality) | ||
| opus_encoder_ctl(participant->encoder, OPUS_SET_COMPLEXITY(participant->opus_complexity)); | ||
| /* Everything looks fine, start by telling the folks in the old room this participant is going away */ | ||
| event = json_object(); | ||
| json_object_set_new(event, "audiobridge", json_string("event")); | ||
|
|
@@ -7907,6 +7916,7 @@ static void *janus_audiobridge_handler(void *data) { | |
| else if(participant->spatial_position > 100) | ||
| participant->spatial_position = 100; | ||
| participant->opus_bitrate = opus_bitrate; | ||
| janus_mutex_lock(&participant->encoding_mutex); | ||
| if(participant->encoder) | ||
| opus_encoder_ctl(participant->encoder, OPUS_SET_BITRATE(participant->opus_bitrate ? participant->opus_bitrate : OPUS_AUTO)); | ||
| if(quality) { | ||
|
|
@@ -7916,8 +7926,10 @@ static void *janus_audiobridge_handler(void *data) { | |
| } | ||
| if(exploss) { | ||
| participant->expected_loss = expected_loss; | ||
| opus_encoder_ctl(participant->encoder, OPUS_SET_PACKET_LOSS_PERC(participant->expected_loss)); | ||
| if(participant->encoder) | ||
| opus_encoder_ctl(participant->encoder, OPUS_SET_PACKET_LOSS_PERC(participant->expected_loss)); | ||
| } | ||
| janus_mutex_unlock(&participant->encoding_mutex); | ||
| #ifdef HAVE_RNNOISE | ||
| /* Check if a denoiser is needed now */ | ||
| participant->denoise = denoise ? json_is_true(denoise) : audiobridge->denoise; | ||
|
|
@@ -8168,7 +8180,10 @@ static void *janus_audiobridge_handler(void *data) { | |
| if(participant->opus_pt > 0 && strstr(msg_sdp, "useinbandfec=1")){ | ||
| /* Opus codec, inband FEC (from Janus to user) set */ | ||
| participant->fec = TRUE; | ||
| opus_encoder_ctl(participant->encoder, OPUS_SET_INBAND_FEC(participant->fec)); | ||
| janus_mutex_lock(&participant->encoding_mutex); | ||
| if(participant->encoder != NULL) | ||
woutd marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| opus_encoder_ctl(participant->encoder, OPUS_SET_INBAND_FEC(participant->fec)); | ||
| janus_mutex_unlock(&participant->encoding_mutex); | ||
| } | ||
| JANUS_LOG(LOG_VERB, "Opus payload type is %d, outgoing FEC %s\n", participant->opus_pt, participant->fec ? "enabled" : "disabled"); | ||
| } | ||
|
|
@@ -9152,8 +9167,10 @@ static void *janus_audiobridge_participant_thread(void *data) { | |
| /* We didn't get a packet: check if PLC can help */ | ||
| if(!first && participant->codec == JANUS_AUDIOCODEC_OPUS && lost_packets_gap <= JITTER_BUFFER_MAX_GAP_SIZE && !participant->muted) { | ||
| lost_packets_gap++; | ||
| if(!g_atomic_int_compare_and_exchange(&participant->decoding, 0, 1)) { | ||
| janus_mutex_lock(&participant->decoding_mutex); | ||
woutd marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if(!participant->decoder) { | ||
woutd marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| /* This means we're cleaning up, so don't try to decode */ | ||
| janus_mutex_unlock(&participant->decoding_mutex); | ||
| janus_audiobridge_buffer_packet_destroy(bpkt); | ||
| break; | ||
| } | ||
|
|
@@ -9177,7 +9194,7 @@ static void *janus_audiobridge_participant_thread(void *data) { | |
| /* Update the details */ | ||
| participant->last_seq = pkt->seq_number; | ||
| participant->last_timestamp = pkt->timestamp; | ||
| g_atomic_int_set(&participant->decoding, 0); | ||
| janus_mutex_unlock(&participant->decoding_mutex); | ||
| if(pkt->length < 0) { | ||
| JANUS_LOG(LOG_ERR, "[Opus] Ops! got an error decoding the Opus frame: %d (%s)\n", pkt->length, opus_strerror(pkt->length)); | ||
| g_free(pkt->data); | ||
|
|
@@ -9201,8 +9218,10 @@ static void *janus_audiobridge_participant_thread(void *data) { | |
| } else { | ||
| /* Decode the audio packet */ | ||
| bpkt = (janus_audiobridge_buffer_packet *)jbp.data; | ||
| if(!g_atomic_int_compare_and_exchange(&participant->decoding, 0, 1)) { | ||
| janus_mutex_lock(&participant->decoding_mutex); | ||
| if(!participant->decoder) { | ||
|
||
| /* This means we're cleaning up, so don't try to decode */ | ||
| janus_mutex_unlock(&participant->decoding_mutex); | ||
| janus_audiobridge_buffer_packet_destroy(bpkt); | ||
| break; | ||
| } | ||
|
|
@@ -9214,7 +9233,7 @@ static void *janus_audiobridge_participant_thread(void *data) { | |
| if(!payload) { | ||
| JANUS_LOG(LOG_ERR, "[%s] Ops! got an error accessing the RTP payload\n", | ||
| participant->codec == JANUS_AUDIOCODEC_OPUS ? "Opus" : "G.711"); | ||
| g_atomic_int_set(&participant->decoding, 0); | ||
| janus_mutex_unlock(&participant->decoding_mutex); | ||
| janus_audiobridge_buffer_packet_destroy(bpkt); | ||
| continue; | ||
| } | ||
|
|
@@ -9238,7 +9257,7 @@ static void *janus_audiobridge_participant_thread(void *data) { | |
| /* G.711 */ | ||
| if(plen != 160) { | ||
| JANUS_LOG(LOG_WARN, "[G.711] Wrong packet size (expected 160, got %d), skipping audio packet\n", plen); | ||
| g_atomic_int_set(&participant->decoding, 0); | ||
| janus_mutex_unlock(&participant->decoding_mutex); | ||
| janus_audiobridge_buffer_packet_destroy(bpkt); | ||
| g_free(pkt->data); | ||
| g_free(pkt); | ||
|
|
@@ -9267,7 +9286,7 @@ static void *janus_audiobridge_participant_thread(void *data) { | |
| /* Update the details */ | ||
| participant->last_seq = pkt->seq_number; | ||
| participant->last_timestamp = pkt->timestamp; | ||
| g_atomic_int_set(&participant->decoding, 0); | ||
| janus_mutex_unlock(&participant->decoding_mutex); | ||
| if(pkt->length < 0) { | ||
| if(participant->codec == JANUS_AUDIOCODEC_OPUS) { | ||
| JANUS_LOG(LOG_ERR, "[Opus] Ops! got an error decoding the Opus frame: %d (%s)\n", pkt->length, opus_strerror(pkt->length)); | ||
|
|
@@ -9295,7 +9314,8 @@ static void *janus_audiobridge_participant_thread(void *data) { | |
| mixedpkt = g_async_queue_try_pop(participant->outbuf); | ||
| if(mixedpkt != NULL && g_atomic_int_get(&session->destroyed) == 0 && g_atomic_int_get(&session->started)) { | ||
| if(g_atomic_int_get(&participant->active) && (participant->codec == JANUS_AUDIOCODEC_PCMA || | ||
| participant->codec == JANUS_AUDIOCODEC_PCMU) && g_atomic_int_compare_and_exchange(&participant->encoding, 0, 1)) { | ||
| participant->codec == JANUS_AUDIOCODEC_PCMU)) { | ||
| janus_mutex_lock(&participant->encoding_mutex); | ||
|
||
| /* Encode using G.711 */ | ||
| if(mixedpkt->length != 320) { | ||
| /* TODO Resample */ | ||
|
|
@@ -9311,7 +9331,7 @@ static void *janus_audiobridge_participant_thread(void *data) { | |
| for(i=0; i<160; i++) | ||
| *(payload+12+i) = janus_audiobridge_g711_ulaw_encode(outBuffer[i]); | ||
| } | ||
| g_atomic_int_set(&participant->encoding, 0); | ||
| janus_mutex_unlock(&participant->encoding_mutex); | ||
| outpkt->length = 172; /* Take the RTP header into consideration */ | ||
| /* Update RTP header */ | ||
| outpkt->data->version = 2; | ||
|
|
@@ -9324,16 +9344,20 @@ static void *janus_audiobridge_participant_thread(void *data) { | |
| outpkt->timestamp = mixedpkt->timestamp/6; | ||
| outpkt->seq_number = mixedpkt->seq_number; | ||
| janus_audiobridge_relay_rtp_packet(participant->session, outpkt); | ||
| } else if(g_atomic_int_get(&participant->active) && participant->encoder && | ||
| g_atomic_int_compare_and_exchange(&participant->encoding, 0, 1)) { | ||
| /* Encode raw frame to Opus */ | ||
| } else if(g_atomic_int_get(&participant->active)) { | ||
| opus_int16 *outBuffer = (opus_int16 *)mixedpkt->data; | ||
| outpkt->length = opus_encode(participant->encoder, outBuffer, | ||
| participant->stereo ? mixedpkt->length/2 : mixedpkt->length, payload+12, 1500-12); | ||
| g_atomic_int_set(&participant->encoding, 0); | ||
| janus_mutex_lock(&participant->encoding_mutex); | ||
| /* Encode raw frame to Opus */ | ||
| if(participant->encoder) { | ||
| outpkt->length = opus_encode(participant->encoder, outBuffer, | ||
| participant->stereo ? mixedpkt->length/2 : mixedpkt->length, payload+12, 1500-12); | ||
| } else { | ||
| outpkt->length = 0; | ||
| } | ||
| janus_mutex_unlock(&participant->encoding_mutex); | ||
| if(outpkt->length < 0) { | ||
| JANUS_LOG(LOG_ERR, "[Opus] Ops! got an error encoding the Opus frame: %d (%s)\n", outpkt->length, opus_strerror(outpkt->length)); | ||
| } else { | ||
| } else if(outpkt->length > 0) { | ||
| outpkt->length += 12; /* Take the RTP header into consideration */ | ||
| /* Update RTP header */ | ||
| outpkt->data->version = 2; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if we could use a single mutex for both, considering that the same participant thread takes care of both encoding and decoding, and the only thing we need to ensure is that the instances exist, but that's something we can discuss at a later stage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was wondering the same. I also thought to maybe use the participant
pmutex.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it makes more sense to use a different mutex than
pmutex, as we don't want other operations to interfere with transcoding, but we can revisit this later.