Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delay reinitialization when the stream is paused while the change happens #234

Merged
merged 5 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions run_device_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ echo "RUST_BACKTRACE is set to ${RUST_BACKTRACE}\n"
cargo test test_aggregate -- --ignored --nocapture

cargo test test_switch_device -- --ignored --nocapture
cargo test test_switch_while_paused -- --ignored --nocapture

cargo test test_plug_and_unplug_device -- --ignored --nocapture

Expand Down
53 changes: 45 additions & 8 deletions src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1019,11 +1019,14 @@ extern "C" fn audiounit_property_listener_callback(
{
let callback = stm.device_changed_callback.lock().unwrap();
if let Some(device_changed_callback) = *callback {
cubeb_log!("Calling device changed callback");
unsafe {
device_changed_callback(stm.user_ptr);
}
}
}

cubeb_log!("Reinitializing stream with new device because of device change, async");
stm.reinit_async();

NO_ERR
Expand Down Expand Up @@ -4576,6 +4579,7 @@ struct AudioUnitStream<'ctx> {
stopped: AtomicBool,
draining: AtomicBool,
reinit_pending: AtomicBool,
delayed_reinit: bool,
destroy_pending: AtomicBool,
// Latency requested by the user.
latency_frames: u32,
Expand Down Expand Up @@ -4623,6 +4627,7 @@ impl<'ctx> AudioUnitStream<'ctx> {
stopped: AtomicBool::new(true),
draining: AtomicBool::new(false),
reinit_pending: AtomicBool::new(false),
delayed_reinit: false,
destroy_pending: AtomicBool::new(false),
latency_frames,
output_device_latency_frames: AtomicU32::new(0),
Expand Down Expand Up @@ -4682,7 +4687,8 @@ impl<'ctx> AudioUnitStream<'ctx> {
}

if self.stopped.load(Ordering::SeqCst) {
// Something stopped the stream, we must not reinit.
// Something stopped the stream, reinit on next start
self.delayed_reinit = true;
return Ok(());
}

Expand All @@ -4706,6 +4712,7 @@ impl<'ctx> AudioUnitStream<'ctx> {
.flags
.contains(device_flags::DEV_SELECTED_DEFAULT)
{
cubeb_log!("Using new default output device");
self.core_stream_data.output_device =
match create_device_info(kAudioObjectUnknown, DeviceType::OUTPUT) {
None => {
Expand All @@ -4724,6 +4731,7 @@ impl<'ctx> AudioUnitStream<'ctx> {
.flags
.contains(device_flags::DEV_SELECTED_DEFAULT)
{
cubeb_log!("Using new default input device");
self.core_stream_data.input_device =
match create_device_info(kAudioObjectUnknown, DeviceType::INPUT) {
None => {
Expand All @@ -4734,6 +4742,7 @@ impl<'ctx> AudioUnitStream<'ctx> {
}
}

cubeb_log!("Reinit: setup");
self.core_stream_data
.setup(&mut self.context.shared_voice_processing_unit)
.inspect_err(|_| {
Expand Down Expand Up @@ -4771,6 +4780,7 @@ impl<'ctx> AudioUnitStream<'ctx> {
// Use a new thread, through the queue, to avoid deadlock when calling
// Get/SetProperties method from inside notify callback
queue.run_async(move || {
cubeb_log!("Reinitialization of stream");
let stm_ptr = self as *const AudioUnitStream;
if self.destroy_pending.load(Ordering::SeqCst) {
cubeb_log!(
Expand Down Expand Up @@ -4867,17 +4877,44 @@ impl<'ctx> Drop for AudioUnitStream<'ctx> {

impl<'ctx> StreamOps for AudioUnitStream<'ctx> {
fn start(&mut self) -> Result<()> {
let was_stopped = self.stopped.load(Ordering::SeqCst);
let was_draining = self.draining.load(Ordering::SeqCst);
self.stopped.store(false, Ordering::SeqCst);
self.draining.store(false, Ordering::SeqCst);

// Execute start in serial queue to avoid racing with destroy or reinit.
let result = self
.queue
self.queue
.clone()
.run_sync(|| self.core_stream_data.start_audiounits())
.unwrap();

result?;
.run_sync(|| -> Result<()> {
// Need reinitialization: device was changed when paused. It will be started after
// reinit because self.stopped is false.
if self.delayed_reinit {
let rv = self.reinit().inspect_err(|_| {
cubeb_log!(
"({:p}) delayed reinit during start failed.",
self.core_stream_data.stm_ptr
);
});
// In case of failure, restore the state
if rv.is_err() {
self.stopped.store(was_stopped, Ordering::SeqCst);
self.draining.store(was_draining, Ordering::SeqCst);
return rv;
}
self.delayed_reinit = false;
Ok(())
} else {
// Execute start in serial queue to avoid racing with destroy or reinit.
let rv = self.core_stream_data.start_audiounits();
if rv.is_err() {
cubeb_log!("({:p}) start failed.", self.core_stream_data.stm_ptr);
self.stopped.store(was_stopped, Ordering::SeqCst);
self.draining.store(was_draining, Ordering::SeqCst);
return rv;
}
Ok(())
}
})
.unwrap()?;

self.notify_state_changed(State::Started);

Expand Down
121 changes: 121 additions & 0 deletions src/backend/tests/device_change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use super::utils::{
};
use super::*;
use std::sync::{LockResult, WaitTimeoutResult};
use std::thread;
use std::time;

// Switch default devices used by the active streams, to test stream reinitialization
// ================================================================================================
Expand Down Expand Up @@ -73,6 +75,125 @@ fn test_switch_device_in_scope(scope: Scope) {
});
}

#[ignore]
#[test]
fn test_switch_while_paused() {
test_switch_device_in_scope_while_paused(Scope::Input);
test_switch_device_in_scope_while_paused(Scope::Output);
}

fn test_switch_device_in_scope_while_paused(scope: Scope) {
println!(
"Switch default device for {:?} while the stream is paused.",
scope
);

// Do nothing if there is no 2 available devices at least.
let devices = test_get_devices_in_scope(scope.clone());
if devices.len() < 2 {
println!("Need 2 devices for {:?} at least. Skip.", scope);
return;
}

let mut device_switcher = TestDeviceSwitcher::new(scope.clone());

let notifier = Arc::new(Notifier::new(0));
let also_notifier = notifier.clone();
let listener = run_serially(|| {
test_create_device_change_listener(scope.clone(), move |_addresses| {
let mut cnt = notifier.lock().unwrap();
*cnt += 1;
notifier.notify(cnt);
NO_ERR
})
});
run_serially(|| listener.start());

let changed_watcher = Watcher::new(&also_notifier);
test_get_started_stream_in_scope(scope.clone(), move |stream| loop {
// pause the stream, change device, start the stream
// peek under the hood to the find the device in use currently
let stm = unsafe { &mut *(stream as *mut AudioUnitStream) };
let before = if scope == Scope::Output {
stm.core_stream_data.output_unit
} else {
stm.core_stream_data.input_unit
};

let check_devices = |current| {
stm.queue.run_sync(|| {
let (bus, unit, id) = if scope == Scope::Output {
(
AU_OUT_BUS,
stm.core_stream_data.output_unit,
stm.core_stream_data.output_device.id,
)
} else {
(
AU_IN_BUS,
stm.core_stream_data.input_unit,
stm.core_stream_data.input_device.id,
)
};
let mut device_id: AudioDeviceID = 0;
let mut size = std::mem::size_of::<AudioDeviceID>();
let status = audio_unit_get_property(
unit,
kAudioOutputUnitProperty_CurrentDevice,
kAudioUnitScope_Global,
bus,
&mut device_id,
&mut size,
);
if status != NO_ERR {
panic!("Could not get device ID from audiounit");
}
assert_eq!(id, current);
assert_eq!(device_id, current);
});
};

check_devices(device_switcher.current());

// Pause the stream, and change the default device
assert_eq!(unsafe { OPS.stream_stop.unwrap()(stream) }, ffi::CUBEB_OK);

let start_cnt = changed_watcher.lock().unwrap().clone();
device_switcher.next();
let mut guard = changed_watcher.lock().unwrap();
guard = changed_watcher
.wait_while(guard, |cnt| *cnt == start_cnt)
.unwrap();
if *guard >= devices.len() {
break;
}

// Wait until the switch is effective and the stream has been marked for delayed
// reinitialization. delayed_reinit can only be access from the queue thread. Depending on
// the setup this can take some time so we sleep a bit to not hammer the main thread.
let mut switched = false;
while !switched {
switched = stm.queue.run_sync(|| stm.delayed_reinit).unwrap();
if !switched {
let ten_millis = time::Duration::from_millis(10);
thread::sleep(ten_millis);
}
}

// Start the stream, and check that the device in use isn't the same as before pausing
assert_eq!(unsafe { OPS.stream_start.unwrap()(stream) }, ffi::CUBEB_OK);

check_devices(device_switcher.current());

let after = if scope == Scope::Output {
stm.core_stream_data.output_unit
} else {
stm.core_stream_data.input_unit
};
assert_ne!(before, after);
});
}

fn test_get_started_stream_in_scope<F>(scope: Scope, operation: F)
where
F: FnOnce(*mut ffi::cubeb_stream),
Expand Down
3 changes: 3 additions & 0 deletions src/backend/tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,9 @@ impl TestDeviceSwitcher {
}
}
}
pub fn current(&self) -> AudioObjectID {
self.devices[self.current_device_index]
}

fn set_device(&self, device: AudioObjectID) -> std::result::Result<AudioObjectID, OSStatus> {
test_set_default_device(device, self.scope.clone())
Expand Down
Loading