@@ -35,7 +35,7 @@ use self::utils::*;
35
35
use backend:: ringbuf:: RingBuffer ;
36
36
use cubeb_backend:: {
37
37
ffi, ChannelLayout , Context , ContextOps , DeviceCollectionRef , DeviceId , DeviceRef , DeviceType ,
38
- Error , InputProcessingParams , Ops , Result , SampleFormat , State , Stream , StreamOps ,
38
+ Error , ErrorCode , InputProcessingParams , Ops , Result , SampleFormat , State , Stream , StreamOps ,
39
39
StreamParams , StreamParamsRef , StreamPrefs ,
40
40
} ;
41
41
use mach:: mach_time:: { mach_absolute_time, mach_timebase_info} ;
@@ -1162,6 +1162,7 @@ fn create_voiceprocessing_audiounit(
1162
1162
in_device : & device_info ,
1163
1163
out_device : & device_info ,
1164
1164
) -> Result < OwningHandle < AudioUnit > > {
1165
+ debug_assert_running_serially ( ) ;
1165
1166
assert ! ( in_device. flags. contains( device_flags:: DEV_INPUT ) ) ;
1166
1167
assert ! ( !in_device. flags. contains( device_flags:: DEV_OUTPUT ) ) ;
1167
1168
assert ! ( !out_device. flags. contains( device_flags:: DEV_INPUT ) ) ;
@@ -2134,44 +2135,29 @@ struct SharedVoiceProcessingUnitStorage {
2134
2135
#[ derive( Debug ) ]
2135
2136
struct SharedVoiceProcessingUnit {
2136
2137
sync_storage : Mutex < SharedVoiceProcessingUnitStorage > ,
2137
- condvar : Condvar ,
2138
2138
queue : Queue ,
2139
2139
}
2140
2140
2141
2141
impl SharedVoiceProcessingUnit {
2142
2142
fn new ( queue : Queue ) -> Self {
2143
2143
Self {
2144
2144
sync_storage : Mutex :: new ( SharedVoiceProcessingUnitStorage :: default ( ) ) ,
2145
- condvar : Condvar :: new ( ) ,
2146
2145
queue,
2147
2146
}
2148
2147
}
2149
2148
2150
- fn prime ( & mut self ) {
2149
+ fn ensure_unit ( & mut self ) -> Result < ( ) > {
2151
2150
debug_assert_running_serially ( ) ;
2152
- {
2153
- let mut guard = self . sync_storage . lock ( ) . unwrap ( ) ;
2154
- if guard. priming {
2155
- return ;
2156
- }
2157
- if guard. storage . is_some ( ) {
2158
- return ;
2159
- }
2160
- cubeb_log ! ( "Priming the shared voiceprocessing unit" ) ;
2161
- guard. priming = true ;
2162
- }
2163
- let unit = create_typed_audiounit ( kAudioUnitSubType_VoiceProcessingIO) ;
2164
2151
let mut guard = self . sync_storage . lock ( ) . unwrap ( ) ;
2152
+ if guard. storage . is_some ( ) {
2153
+ // No need to create vpio unit as one already exists.
2154
+ return Err ( Error :: not_supported ( ) ) ;
2155
+ }
2165
2156
guard. priming = false ;
2166
- self . condvar . notify_all ( ) ;
2167
- if let Err ( e) = unit {
2168
- cubeb_log ! (
2169
- "Failed creating voiceprocessing unit while priming async. r={}" ,
2170
- e
2171
- ) ;
2172
- return ;
2157
+ let unit = create_typed_audiounit ( kAudioUnitSubType_VoiceProcessingIO) ;
2158
+ if unit. is_err ( ) {
2159
+ return Err ( Error :: error ( ) ) ;
2173
2160
}
2174
- cubeb_log ! ( "Shared voiceprocessing unit primed and ready" ) ;
2175
2161
2176
2162
match get_default_device ( DeviceType :: OUTPUT ) {
2177
2163
None => {
@@ -2191,31 +2177,54 @@ impl SharedVoiceProcessingUnit {
2191
2177
2192
2178
let old_storage = guard. storage . replace ( Arc :: from ( Mutex :: new ( unit. ok ( ) ) ) ) ;
2193
2179
assert ! ( old_storage. is_none( ) ) ;
2180
+ Ok ( ( ) )
2181
+ }
2182
+
2183
+ fn prime ( & mut self ) {
2184
+ {
2185
+ let mut guard = self . sync_storage . lock ( ) . unwrap ( ) ;
2186
+ if guard. storage . is_some ( ) {
2187
+ // No need to prime as unit already exists.
2188
+ return ;
2189
+ }
2190
+ if guard. priming {
2191
+ // No need to prime async as another call is already doing it.
2192
+ return ;
2193
+ }
2194
+ cubeb_log ! ( "Priming the shared voiceprocessing unit" ) ;
2195
+ guard. priming = true ;
2196
+ }
2197
+ self . queue
2198
+ . clone ( )
2199
+ . run_async ( move || match self . ensure_unit ( ) {
2200
+ Ok ( ( ) ) => {
2201
+ cubeb_log ! ( "Shared voiceprocessing unit primed and ready" ) ;
2202
+ }
2203
+ Err ( e) => match e. code ( ) {
2204
+ ErrorCode :: Error => {
2205
+ cubeb_log ! ( "Error creating shared voiceprocessing unit for priming" ) ;
2206
+ }
2207
+ ErrorCode :: NotSupported => {
2208
+ cubeb_log ! ( "Shared voiceprocessing unit was created before async priming" ) ;
2209
+ }
2210
+ _ => unreachable ! ( "Unexpected error" ) ,
2211
+ } ,
2212
+ } ) ;
2194
2213
}
2195
2214
2196
2215
fn take ( & mut self ) -> Result < OwningHandle < AudioUnit > > {
2197
2216
debug_assert_running_serially ( ) ;
2198
- let guard = self . sync_storage . lock ( ) . unwrap ( ) ;
2199
- let mut guard = self
2200
- . condvar
2201
- . wait_while ( guard, |storage| storage. priming )
2202
- . unwrap ( ) ;
2203
- if guard. storage . is_none ( ) {
2204
- cubeb_log ! ( "Creating synchronously the shared voiceprocessing unit" ) ;
2205
- let unit = create_typed_audiounit ( kAudioUnitSubType_VoiceProcessingIO) ?;
2206
- guard. storage . replace ( Arc :: from ( Mutex :: new ( Some ( unit) ) ) ) ;
2217
+ if let Err ( e) = self . ensure_unit ( ) {
2218
+ if e. code ( ) == ErrorCode :: Error {
2219
+ return Err ( e) ;
2220
+ }
2207
2221
}
2222
+ let mut guard = self . sync_storage . lock ( ) . unwrap ( ) ;
2208
2223
let storage: Arc < SharedStorage < AudioUnit > > = guard. storage . as_mut ( ) . unwrap ( ) . clone ( ) ;
2209
2224
let mut guard = ( * storage) . lock ( ) . unwrap ( ) ;
2210
2225
match guard. take ( ) {
2211
- Some ( unit) => {
2212
- cubeb_log ! ( "voiceprocessing audiounit now taken" ) ;
2213
- Ok ( OwningHandle :: new ( Arc :: downgrade ( & storage) , unit) )
2214
- }
2215
- None => {
2216
- cubeb_log ! ( "voiceprocessing audiounit was requested but already taken" ) ;
2217
- Err ( Error :: device_unavailable ( ) )
2218
- }
2226
+ Some ( unit) => Ok ( OwningHandle :: new ( Arc :: downgrade ( & storage) , unit) ) ,
2227
+ None => Err ( Error :: device_unavailable ( ) ) ,
2219
2228
}
2220
2229
}
2221
2230
}
@@ -2225,6 +2234,7 @@ unsafe impl Sync for SharedVoiceProcessingUnit {}
2225
2234
2226
2235
impl Drop for SharedVoiceProcessingUnit {
2227
2236
fn drop ( & mut self ) {
2237
+ debug_assert_not_running_serially ( ) ;
2228
2238
self . queue . run_final ( || {
2229
2239
let mut guard = self . sync_storage . lock ( ) . unwrap ( ) ;
2230
2240
assert ! ( !guard. priming) ;
@@ -2418,6 +2428,11 @@ impl ContextOps for AudioUnitContext {
2418
2428
let queue_label = format ! ( "{}.context.{:p}" , DISPATCH_QUEUE_LABEL , ctx. as_ref( ) ) ;
2419
2429
ctx. serial_queue =
2420
2430
Queue :: new_with_target ( queue_label. as_str ( ) , get_serial_queue_singleton ( ) ) ;
2431
+ let shared_vp_queue = Queue :: new_with_target (
2432
+ format ! ( "{}.shared_vpio" , queue_label) . as_str ( ) ,
2433
+ & ctx. serial_queue ,
2434
+ ) ;
2435
+ ctx. shared_voice_processing_unit = SharedVoiceProcessingUnit :: new ( shared_vp_queue) ;
2421
2436
Ok ( unsafe { Context :: from_ptr ( Box :: into_raw ( ctx) as * mut _ ) } )
2422
2437
}
2423
2438
@@ -3071,14 +3086,17 @@ impl<'ctx> CoreStreamData<'ctx> {
3071
3086
& self . input_device ,
3072
3087
& self . output_device ,
3073
3088
) {
3074
- cubeb_log ! ( "({:p}) Using VoiceProcessingIO AudioUnit" , self . stm_ptr) ;
3089
+ cubeb_log ! (
3090
+ "({:p}) Took shared VoiceProcessingIO AudioUnit" ,
3091
+ self . stm_ptr
3092
+ ) ;
3075
3093
self . input_unit = * au_handle. as_mut ( ) ;
3076
3094
self . output_unit = * au_handle. as_mut ( ) ;
3077
3095
self . voiceprocessing_unit_handle = Some ( au_handle) ;
3078
3096
return Ok ( ( self . input_device . clone ( ) , self . output_device . clone ( ) ) ) ;
3079
3097
}
3080
3098
cubeb_log ! (
3081
- "({:p}) Failed to create VoiceProcessingIO AudioUnit. Trying a regular one." ,
3099
+ "({:p}) Failed to take VoiceProcessingIO AudioUnit. Trying a regular one." ,
3082
3100
self . stm_ptr
3083
3101
) ;
3084
3102
}
0 commit comments