Update `wasapi` backend for `StreamEvent` API addition.

Refactors much of the `EventLoop::run` implementation in order to make
error handling a little easier.
This commit is contained in:
mitchmindtree 2019-06-22 02:48:47 +02:00
parent 39cd5d0084
commit 3e3cf26cde
1 changed files with 274 additions and 162 deletions

View File

@ -26,7 +26,10 @@ use Format;
use PauseStreamError; use PauseStreamError;
use PlayStreamError; use PlayStreamError;
use SampleFormat; use SampleFormat;
use StreamCloseCause;
use StreamData; use StreamData;
use StreamError;
use StreamEvent;
use UnknownTypeOutputBuffer; use UnknownTypeOutputBuffer;
use UnknownTypeInputBuffer; use UnknownTypeInputBuffer;
@ -443,12 +446,12 @@ impl EventLoop {
#[inline] #[inline]
pub fn run<F>(&self, mut callback: F) -> ! pub fn run<F>(&self, mut callback: F) -> !
where F: FnMut(StreamId, StreamData) where F: FnMut(StreamId, StreamEvent)
{ {
self.run_inner(&mut callback); self.run_inner(&mut callback);
} }
fn run_inner(&self, callback: &mut FnMut(StreamId, StreamData)) -> ! { fn run_inner(&self, callback: &mut dyn FnMut(StreamId, StreamEvent)) -> ! {
unsafe { unsafe {
// We keep `run_context` locked forever, which guarantees that two invocations of // We keep `run_context` locked forever, which guarantees that two invocations of
// `run()` cannot run simultaneously. // `run()` cannot run simultaneously.
@ -457,190 +460,165 @@ impl EventLoop {
// Shadow the name because we don't use (or drop) it otherwise. // Shadow the name because we don't use (or drop) it otherwise.
let run_context = &mut *run_context; let run_context = &mut *run_context;
loop { // Keep track of the set of streams that should be removed due to some error occurring.
// Process the pending commands. //
for command in run_context.commands.try_iter() { // Checked at the start of each loop.
match command { let mut streams_to_remove: Vec<(StreamId, StreamError)> = vec![];
Command::NewStream(stream_inner) => {
let event = stream_inner.event; 'stream_loop: loop {
run_context.streams.push(stream_inner); // Remove any failed streams.
run_context.handles.push(event); for (stream_id, err) in streams_to_remove.drain(..) {
}, match run_context.streams.iter().position(|s| s.id == stream_id) {
Command::DestroyStream(stream_id) => { None => continue,
match run_context.streams.iter().position(|v| v.id == stream_id) { Some(p) => {
None => continue, run_context.handles.remove(p + 1);
Some(p) => { run_context.streams.remove(p);
run_context.handles.remove(p + 1); let event = StreamEvent::Close(err.into());
run_context.streams.remove(p); callback(stream_id, event);
},
}
},
Command::PlayStream(stream_id) => {
match run_context.streams.iter_mut().find(|v| v.id == stream_id) {
None => continue,
Some(stream) => {
if !stream.playing {
let hresult = (*stream.audio_client).Start();
check_result(hresult).unwrap();
stream.playing = true;
}
}
}
},
Command::PauseStream(stream_id) => {
match run_context.streams.iter_mut().find(|v| v.id == stream_id) {
None => continue,
Some(stream) => {
if stream.playing {
let hresult = (*stream.audio_client).Stop();
check_result(hresult).unwrap();
stream.playing = false;
}
},
}
}, },
} }
} }
// Wait for any of the handles to be signalled, which means that the corresponding // Process queued commands.
// sound needs a buffer. process_commands(run_context, callback);
debug_assert!(run_context.handles.len() <= winnt::MAXIMUM_WAIT_OBJECTS as usize);
let result = synchapi::WaitForMultipleObjectsEx(run_context.handles.len() as u32,
run_context.handles.as_ptr(),
FALSE,
winbase::INFINITE, /* TODO: allow setting a timeout */
FALSE /* irrelevant parameter here */);
// Notifying the corresponding task handler. // Wait for any of the handles to be signalled.
debug_assert!(result >= winbase::WAIT_OBJECT_0); let handle_idx = match wait_for_handle_signal(&run_context.handles) {
let handle_id = (result - winbase::WAIT_OBJECT_0) as usize; Ok(idx) => idx,
Err(err) => {
for stream in &run_context.streams {
let event = StreamEvent::Close(err.clone().into());
callback(stream.id.clone(), event);
}
run_context.streams.clear();
run_context.handles.truncate(1);
break 'stream_loop;
}
};
// If `handle_id` is 0, then it's `pending_scheduled_event` that was signalled in // If `handle_idx` is 0, then it's `pending_scheduled_event` that was signalled in
// order for us to pick up the pending commands. // order for us to pick up the pending commands. Otherwise, a stream needs data.
// Otherwise, a stream needs data. if handle_idx == 0 {
if handle_id >= 1 { continue;
let stream = &mut run_context.streams[handle_id - 1]; }
let stream_id = stream.id.clone();
// Obtaining the number of frames that are available to be written. let stream_idx = handle_idx - 1;
let mut frames_available = { let stream = &mut run_context.streams[stream_idx];
let mut padding = mem::uninitialized();
let hresult = (*stream.audio_client).GetCurrentPadding(&mut padding);
// Happens when a bluetooth headset was turned off, for example.
if hresult == AUDCLNT_E_DEVICE_INVALIDATED {
// The client code should switch to a different device eventually.
// For now let's just skip the invalidated device.
// Would be nice to inform the client code about the invalidation,
// but throwing a panic isn't the most ergonomic way to do so.
continue}
check_result(hresult).unwrap();
stream.max_frames_in_buffer - padding
};
if frames_available == 0 { // The number of frames available for reading/writing.
// TODO: can this happen? let mut frames_available = match get_available_frames(stream) {
Ok(0) => continue, // TODO: Can this happen?
Ok(n) => n,
Err(err) => {
streams_to_remove.push((stream.id.clone(), err));
continue; continue;
} }
};
let sample_size = stream.sample_format.sample_size(); let sample_size = stream.sample_format.sample_size();
// Obtaining a pointer to the buffer. // Obtaining a pointer to the buffer.
match stream.client_flow { match stream.client_flow {
AudioClientFlow::Capture { capture_client } => { AudioClientFlow::Capture { capture_client } => {
// Get the available data in the shared buffer. // Get the available data in the shared buffer.
let mut buffer: *mut BYTE = mem::uninitialized(); let mut buffer: *mut BYTE = mem::uninitialized();
let mut flags = mem::uninitialized(); let mut flags = mem::uninitialized();
let hresult = (*capture_client).GetBuffer( let hresult = (*capture_client).GetBuffer(
&mut buffer, &mut buffer,
&mut frames_available, &mut frames_available,
&mut flags, &mut flags,
ptr::null_mut(), ptr::null_mut(),
ptr::null_mut(), ptr::null_mut(),
); );
check_result(hresult).unwrap();
if hresult == AUDCLNT_S_BUFFER_EMPTY { continue; } // TODO: Can this happen?
if hresult == AUDCLNT_S_BUFFER_EMPTY {
continue;
} else if let Err(err) = stream_error_from_hresult(hresult) {
streams_to_remove.push((stream.id.clone(), err));
continue;
}
debug_assert!(!buffer.is_null()); debug_assert!(!buffer.is_null());
let buffer_len = frames_available as usize
* stream.bytes_per_frame as usize / sample_size;
// Simplify the capture callback sample format branches. let buffer_len = frames_available as usize
macro_rules! capture_callback { * stream.bytes_per_frame as usize / sample_size;
($T:ty, $Variant:ident) => {{
let buffer_data = buffer as *mut _ as *const $T;
let slice = slice::from_raw_parts(buffer_data, buffer_len);
let unknown_buffer = UnknownTypeInputBuffer::$Variant(::InputBuffer {
buffer: slice,
});
let data = StreamData::Input { buffer: unknown_buffer };
callback(stream_id, data);
// Release the buffer. // Simplify the capture callback sample format branches.
let hresult = (*capture_client).ReleaseBuffer(frames_available); macro_rules! capture_callback {
match check_result(hresult) { ($T:ty, $Variant:ident) => {{
// Ignoring unavailable device error. let buffer_data = buffer as *mut _ as *const $T;
Err(ref e) if e.raw_os_error() == Some(AUDCLNT_E_DEVICE_INVALIDATED) => { let slice = slice::from_raw_parts(buffer_data, buffer_len);
}, let unknown_buffer = UnknownTypeInputBuffer::$Variant(::InputBuffer {
e => e.unwrap(), buffer: slice,
}; });
}}; let data = StreamData::Input { buffer: unknown_buffer };
} let event = StreamEvent::Data(data);
callback(stream.id.clone(), event);
// Release the buffer.
let hresult = (*capture_client).ReleaseBuffer(frames_available);
if let Err(err) = stream_error_from_hresult(hresult) {
streams_to_remove.push((stream.id.clone(), err));
continue;
}
}};
}
match stream.sample_format { match stream.sample_format {
SampleFormat::F32 => capture_callback!(f32, F32), SampleFormat::F32 => capture_callback!(f32, F32),
SampleFormat::I16 => capture_callback!(i16, I16), SampleFormat::I16 => capture_callback!(i16, I16),
SampleFormat::U16 => capture_callback!(u16, U16), SampleFormat::U16 => capture_callback!(u16, U16),
} }
}, },
AudioClientFlow::Render { render_client } => { AudioClientFlow::Render { render_client } => {
let mut buffer: *mut BYTE = mem::uninitialized(); let mut buffer: *mut BYTE = mem::uninitialized();
let hresult = (*render_client).GetBuffer( let hresult = (*render_client).GetBuffer(
frames_available, frames_available,
&mut buffer as *mut *mut _, &mut buffer as *mut *mut _,
); );
// FIXME: can return `AUDCLNT_E_DEVICE_INVALIDATED`
check_result(hresult).unwrap();
debug_assert!(!buffer.is_null());
let buffer_len = frames_available as usize
* stream.bytes_per_frame as usize / sample_size;
// Simplify the render callback sample format branches. if let Err(err) = stream_error_from_hresult(hresult) {
macro_rules! render_callback { streams_to_remove.push((stream.id.clone(), err));
($T:ty, $Variant:ident) => {{ continue;
let buffer_data = buffer as *mut $T; }
let slice = slice::from_raw_parts_mut(buffer_data, buffer_len);
let unknown_buffer = UnknownTypeOutputBuffer::$Variant(::OutputBuffer {
buffer: slice
});
let data = StreamData::Output { buffer: unknown_buffer };
callback(stream_id, data);
let hresult = match stream.client_flow {
AudioClientFlow::Render { render_client } => {
(*render_client).ReleaseBuffer(frames_available as u32, 0)
},
_ => unreachable!(),
};
match check_result(hresult) {
// Ignoring the error that is produced if the device has been disconnected.
Err(ref e) if e.raw_os_error() == Some(AUDCLNT_E_DEVICE_INVALIDATED) => (),
e => e.unwrap(),
};
}}
}
match stream.sample_format { debug_assert!(!buffer.is_null());
SampleFormat::F32 => render_callback!(f32, F32), let buffer_len = frames_available as usize
SampleFormat::I16 => render_callback!(i16, I16), * stream.bytes_per_frame as usize / sample_size;
SampleFormat::U16 => render_callback!(u16, U16),
} // Simplify the render callback sample format branches.
}, macro_rules! render_callback {
} ($T:ty, $Variant:ident) => {{
let buffer_data = buffer as *mut $T;
let slice = slice::from_raw_parts_mut(buffer_data, buffer_len);
let unknown_buffer = UnknownTypeOutputBuffer::$Variant(::OutputBuffer {
buffer: slice
});
let data = StreamData::Output { buffer: unknown_buffer };
let event = StreamEvent::Data(data);
callback(stream.id.clone(), event);
let hresult = (*render_client)
.ReleaseBuffer(frames_available as u32, 0);
if let Err(err) = stream_error_from_hresult(hresult) {
streams_to_remove.push((stream.id.clone(), err));
continue;
}
}}
}
match stream.sample_format {
SampleFormat::F32 => render_callback!(f32, F32),
SampleFormat::I16 => render_callback!(i16, I16),
SampleFormat::U16 => render_callback!(u16, U16),
}
},
} }
} }
} }
panic!("`cpal::EventLoop::run` API currently disallows returning");
} }
#[inline] #[inline]
@ -758,3 +736,137 @@ fn format_to_waveformatextensible(format: &Format) -> Option<mmreg::WAVEFORMATEX
Some(waveformatextensible) Some(waveformatextensible)
} }
// Process any pending commands that are queued within the `RunContext`.
fn process_commands(
run_context: &mut RunContext,
callback: &mut dyn FnMut(StreamId, StreamEvent),
) {
// Process the pending commands.
for command in run_context.commands.try_iter() {
match command {
Command::NewStream(stream_inner) => {
let event = stream_inner.event;
run_context.streams.push(stream_inner);
run_context.handles.push(event);
},
Command::DestroyStream(stream_id) => {
match run_context.streams.iter().position(|s| s.id == stream_id) {
None => continue,
Some(p) => {
run_context.handles.remove(p + 1);
run_context.streams.remove(p);
},
}
let event = StreamEvent::Close(StreamCloseCause::UserDestroyed);
callback(stream_id, event);
},
Command::PlayStream(stream_id) => {
match run_context.streams.iter().position(|s| s.id == stream_id) {
None => continue,
Some(p) => {
if !run_context.streams[p].playing {
let hresult = unsafe {
(*run_context.streams[p].audio_client).Start()
};
match stream_error_from_hresult(hresult) {
Ok(()) => {
run_context.streams[p].playing = true;
let event = StreamEvent::Play;
callback(stream_id, event);
}
Err(err) => {
let event = StreamEvent::Close(err.into());
callback(stream_id, event);
run_context.handles.remove(p + 1);
run_context.streams.remove(p);
}
}
}
}
}
},
Command::PauseStream(stream_id) => {
match run_context.streams.iter().position(|s| s.id == stream_id) {
None => continue,
Some(p) => {
if run_context.streams[p].playing {
let hresult = unsafe {
(*run_context.streams[p].audio_client).Stop()
};
match stream_error_from_hresult(hresult) {
Ok(()) => {
run_context.streams[p].playing = false;
let event = StreamEvent::Pause;
callback(stream_id, event);
}
Err(err) => {
let event = StreamEvent::Close(err.into());
callback(stream_id, event);
run_context.handles.remove(p + 1);
run_context.streams.remove(p);
}
}
}
},
}
},
}
}
}
// Wait for any of the given handles to be signalled.
//
// Returns the index of the `handle` that was signalled, or an `Err` if
// `WaitForMultipleObjectsEx` fails.
//
// This is called when the `run` thread is ready to wait for the next event. The
// next event might be some command submitted by the user (the first handle) or
// might indicate that one of the streams is ready to deliver or receive audio.
fn wait_for_handle_signal(handles: &[winnt::HANDLE]) -> Result<usize, BackendSpecificError> {
debug_assert!(handles.len() <= winnt::MAXIMUM_WAIT_OBJECTS as usize);
let result = unsafe {
synchapi::WaitForMultipleObjectsEx(
handles.len() as u32,
handles.as_ptr(),
FALSE, // Don't wait for all, just wait for the first
winbase::INFINITE, // TODO: allow setting a timeout
FALSE, // irrelevant parameter here
)
};
if result == winbase::WAIT_FAILED {
let err = unsafe {
winapi::um::errhandlingapi::GetLastError()
};
let description = format!("`WaitForMultipleObjectsEx failed: {}", err);
let err = BackendSpecificError { description };
return Err(err);
}
// Notifying the corresponding task handler.
debug_assert!(result >= winbase::WAIT_OBJECT_0);
let handle_idx = (result - winbase::WAIT_OBJECT_0) as usize;
Ok(handle_idx)
}
// Get the number of available frames that are available for writing/reading.
fn get_available_frames(stream: &StreamInner) -> Result<u32, StreamError> {
unsafe {
let mut padding = mem::uninitialized();
let hresult = (*stream.audio_client).GetCurrentPadding(&mut padding);
stream_error_from_hresult(hresult)?;
Ok(stream.max_frames_in_buffer - padding)
}
}
// Convert the given `HRESULT` into a `StreamError` if it does indicate an error.
fn stream_error_from_hresult(hresult: winnt::HRESULT) -> Result<(), StreamError> {
if hresult == AUDCLNT_E_DEVICE_INVALIDATED {
return Err(StreamError::DeviceNotAvailable);
}
if let Err(err) = check_result(hresult) {
let description = format!("{}", err);
let err = BackendSpecificError { description };
return Err(err.into());
}
Ok(())
}