diff --git a/src/wasapi/stream.rs b/src/wasapi/stream.rs index b8070a4..9539ac9 100644 --- a/src/wasapi/stream.rs +++ b/src/wasapi/stream.rs @@ -26,7 +26,10 @@ use Format; use PauseStreamError; use PlayStreamError; use SampleFormat; +use StreamCloseCause; use StreamData; +use StreamError; +use StreamEvent; use UnknownTypeOutputBuffer; use UnknownTypeInputBuffer; @@ -443,12 +446,12 @@ impl EventLoop { #[inline] pub fn run(&self, mut callback: F) -> ! - where F: FnMut(StreamId, StreamData) + where F: FnMut(StreamId, StreamEvent) { self.run_inner(&mut callback); } - fn run_inner(&self, callback: &mut FnMut(StreamId, StreamData)) -> ! { + fn run_inner(&self, callback: &mut dyn FnMut(StreamId, StreamEvent)) -> ! { unsafe { // We keep `run_context` locked forever, which guarantees that two invocations of // `run()` cannot run simultaneously. @@ -457,190 +460,165 @@ impl EventLoop { // Shadow the name because we don't use (or drop) it otherwise. let run_context = &mut *run_context; - loop { - // Process the pending commands. - for command in run_context.commands.try_iter() { - match command { - Command::NewStream(stream_inner) => { - let event = stream_inner.event; - run_context.streams.push(stream_inner); - run_context.handles.push(event); - }, - Command::DestroyStream(stream_id) => { - match run_context.streams.iter().position(|v| v.id == stream_id) { - None => continue, - Some(p) => { - run_context.handles.remove(p + 1); - run_context.streams.remove(p); - }, - } - }, - Command::PlayStream(stream_id) => { - match run_context.streams.iter_mut().find(|v| v.id == stream_id) { - None => continue, - Some(stream) => { - if !stream.playing { - let hresult = (*stream.audio_client).Start(); - check_result(hresult).unwrap(); - stream.playing = true; - } - } - } - }, - Command::PauseStream(stream_id) => { - match run_context.streams.iter_mut().find(|v| v.id == stream_id) { - None => continue, - Some(stream) => { - if stream.playing { - let hresult = (*stream.audio_client).Stop(); - check_result(hresult).unwrap(); - stream.playing = false; - } - }, - } + // Keep track of the set of streams that should be removed due to some error occurring. + // + // Checked at the start of each loop. + let mut streams_to_remove: Vec<(StreamId, StreamError)> = vec![]; + + 'stream_loop: loop { + // Remove any failed streams. + for (stream_id, err) in streams_to_remove.drain(..) { + match run_context.streams.iter().position(|s| s.id == stream_id) { + None => continue, + Some(p) => { + run_context.handles.remove(p + 1); + run_context.streams.remove(p); + let event = StreamEvent::Close(err.into()); + callback(stream_id, event); }, } } - // Wait for any of the handles to be signalled, which means that the corresponding - // sound needs a buffer. - debug_assert!(run_context.handles.len() <= winnt::MAXIMUM_WAIT_OBJECTS as usize); - let result = synchapi::WaitForMultipleObjectsEx(run_context.handles.len() as u32, - run_context.handles.as_ptr(), - FALSE, - winbase::INFINITE, /* TODO: allow setting a timeout */ - FALSE /* irrelevant parameter here */); + // Process queued commands. + process_commands(run_context, callback); - // Notifying the corresponding task handler. - debug_assert!(result >= winbase::WAIT_OBJECT_0); - let handle_id = (result - winbase::WAIT_OBJECT_0) as usize; + // Wait for any of the handles to be signalled. + let handle_idx = match wait_for_handle_signal(&run_context.handles) { + Ok(idx) => idx, + Err(err) => { + for stream in &run_context.streams { + let event = StreamEvent::Close(err.clone().into()); + callback(stream.id.clone(), event); + } + run_context.streams.clear(); + run_context.handles.truncate(1); + break 'stream_loop; + } + }; - // If `handle_id` is 0, then it's `pending_scheduled_event` that was signalled in - // order for us to pick up the pending commands. - // Otherwise, a stream needs data. - if handle_id >= 1 { - let stream = &mut run_context.streams[handle_id - 1]; - let stream_id = stream.id.clone(); + // If `handle_idx` is 0, then it's `pending_scheduled_event` that was signalled in + // order for us to pick up the pending commands. Otherwise, a stream needs data. + if handle_idx == 0 { + continue; + } - // Obtaining the number of frames that are available to be written. - let mut frames_available = { - let mut padding = mem::uninitialized(); - let hresult = (*stream.audio_client).GetCurrentPadding(&mut padding); - // Happens when a bluetooth headset was turned off, for example. - if hresult == AUDCLNT_E_DEVICE_INVALIDATED { - // The client code should switch to a different device eventually. - // For now let's just skip the invalidated device. - // Would be nice to inform the client code about the invalidation, - // but throwing a panic isn't the most ergonomic way to do so. - continue} - check_result(hresult).unwrap(); - stream.max_frames_in_buffer - padding - }; + let stream_idx = handle_idx - 1; + let stream = &mut run_context.streams[stream_idx]; - if frames_available == 0 { - // TODO: can this happen? + // The number of frames available for reading/writing. + let mut frames_available = match get_available_frames(stream) { + Ok(0) => continue, // TODO: Can this happen? + Ok(n) => n, + Err(err) => { + streams_to_remove.push((stream.id.clone(), err)); continue; } + }; - let sample_size = stream.sample_format.sample_size(); + let sample_size = stream.sample_format.sample_size(); - // Obtaining a pointer to the buffer. - match stream.client_flow { + // Obtaining a pointer to the buffer. + match stream.client_flow { - AudioClientFlow::Capture { capture_client } => { - // Get the available data in the shared buffer. - let mut buffer: *mut BYTE = mem::uninitialized(); - let mut flags = mem::uninitialized(); - let hresult = (*capture_client).GetBuffer( - &mut buffer, - &mut frames_available, - &mut flags, - ptr::null_mut(), - ptr::null_mut(), - ); - check_result(hresult).unwrap(); + AudioClientFlow::Capture { capture_client } => { + // Get the available data in the shared buffer. + let mut buffer: *mut BYTE = mem::uninitialized(); + let mut flags = mem::uninitialized(); + let hresult = (*capture_client).GetBuffer( + &mut buffer, + &mut frames_available, + &mut flags, + ptr::null_mut(), + ptr::null_mut(), + ); - if hresult == AUDCLNT_S_BUFFER_EMPTY { continue; } + // TODO: Can this happen? + if hresult == AUDCLNT_S_BUFFER_EMPTY { + continue; + } else if let Err(err) = stream_error_from_hresult(hresult) { + streams_to_remove.push((stream.id.clone(), err)); + continue; + } - debug_assert!(!buffer.is_null()); - let buffer_len = frames_available as usize - * stream.bytes_per_frame as usize / sample_size; + debug_assert!(!buffer.is_null()); - // Simplify the capture callback sample format branches. - macro_rules! capture_callback { - ($T:ty, $Variant:ident) => {{ - let buffer_data = buffer as *mut _ as *const $T; - let slice = slice::from_raw_parts(buffer_data, buffer_len); - let unknown_buffer = UnknownTypeInputBuffer::$Variant(::InputBuffer { - buffer: slice, - }); - let data = StreamData::Input { buffer: unknown_buffer }; - callback(stream_id, data); + let buffer_len = frames_available as usize + * stream.bytes_per_frame as usize / sample_size; - // Release the buffer. - let hresult = (*capture_client).ReleaseBuffer(frames_available); - match check_result(hresult) { - // Ignoring unavailable device error. - Err(ref e) if e.raw_os_error() == Some(AUDCLNT_E_DEVICE_INVALIDATED) => { - }, - e => e.unwrap(), - }; - }}; - } + // Simplify the capture callback sample format branches. + macro_rules! capture_callback { + ($T:ty, $Variant:ident) => {{ + let buffer_data = buffer as *mut _ as *const $T; + let slice = slice::from_raw_parts(buffer_data, buffer_len); + let unknown_buffer = UnknownTypeInputBuffer::$Variant(::InputBuffer { + buffer: slice, + }); + let data = StreamData::Input { buffer: unknown_buffer }; + let event = StreamEvent::Data(data); + callback(stream.id.clone(), event); + // Release the buffer. + let hresult = (*capture_client).ReleaseBuffer(frames_available); + if let Err(err) = stream_error_from_hresult(hresult) { + streams_to_remove.push((stream.id.clone(), err)); + continue; + } + }}; + } - match stream.sample_format { - SampleFormat::F32 => capture_callback!(f32, F32), - SampleFormat::I16 => capture_callback!(i16, I16), - SampleFormat::U16 => capture_callback!(u16, U16), - } - }, + match stream.sample_format { + SampleFormat::F32 => capture_callback!(f32, F32), + SampleFormat::I16 => capture_callback!(i16, I16), + SampleFormat::U16 => capture_callback!(u16, U16), + } + }, - AudioClientFlow::Render { render_client } => { - let mut buffer: *mut BYTE = mem::uninitialized(); - let hresult = (*render_client).GetBuffer( - frames_available, - &mut buffer as *mut *mut _, - ); - // FIXME: can return `AUDCLNT_E_DEVICE_INVALIDATED` - check_result(hresult).unwrap(); - debug_assert!(!buffer.is_null()); - let buffer_len = frames_available as usize - * stream.bytes_per_frame as usize / sample_size; + AudioClientFlow::Render { render_client } => { + let mut buffer: *mut BYTE = mem::uninitialized(); + let hresult = (*render_client).GetBuffer( + frames_available, + &mut buffer as *mut *mut _, + ); - // Simplify the render callback sample format branches. - macro_rules! render_callback { - ($T:ty, $Variant:ident) => {{ - let buffer_data = buffer as *mut $T; - let slice = slice::from_raw_parts_mut(buffer_data, buffer_len); - let unknown_buffer = UnknownTypeOutputBuffer::$Variant(::OutputBuffer { - buffer: slice - }); - let data = StreamData::Output { buffer: unknown_buffer }; - callback(stream_id, data); - let hresult = match stream.client_flow { - AudioClientFlow::Render { render_client } => { - (*render_client).ReleaseBuffer(frames_available as u32, 0) - }, - _ => unreachable!(), - }; - match check_result(hresult) { - // Ignoring the error that is produced if the device has been disconnected. - Err(ref e) if e.raw_os_error() == Some(AUDCLNT_E_DEVICE_INVALIDATED) => (), - e => e.unwrap(), - }; - }} - } + if let Err(err) = stream_error_from_hresult(hresult) { + streams_to_remove.push((stream.id.clone(), err)); + continue; + } - match stream.sample_format { - SampleFormat::F32 => render_callback!(f32, F32), - SampleFormat::I16 => render_callback!(i16, I16), - SampleFormat::U16 => render_callback!(u16, U16), - } - }, - } + debug_assert!(!buffer.is_null()); + let buffer_len = frames_available as usize + * stream.bytes_per_frame as usize / sample_size; + + // Simplify the render callback sample format branches. + macro_rules! render_callback { + ($T:ty, $Variant:ident) => {{ + let buffer_data = buffer as *mut $T; + let slice = slice::from_raw_parts_mut(buffer_data, buffer_len); + let unknown_buffer = UnknownTypeOutputBuffer::$Variant(::OutputBuffer { + buffer: slice + }); + let data = StreamData::Output { buffer: unknown_buffer }; + let event = StreamEvent::Data(data); + callback(stream.id.clone(), event); + let hresult = (*render_client) + .ReleaseBuffer(frames_available as u32, 0); + if let Err(err) = stream_error_from_hresult(hresult) { + streams_to_remove.push((stream.id.clone(), err)); + continue; + } + }} + } + + match stream.sample_format { + SampleFormat::F32 => render_callback!(f32, F32), + SampleFormat::I16 => render_callback!(i16, I16), + SampleFormat::U16 => render_callback!(u16, U16), + } + }, } } } + + panic!("`cpal::EventLoop::run` API currently disallows returning"); } #[inline] @@ -758,3 +736,137 @@ fn format_to_waveformatextensible(format: &Format) -> Option { + let event = stream_inner.event; + run_context.streams.push(stream_inner); + run_context.handles.push(event); + }, + Command::DestroyStream(stream_id) => { + match run_context.streams.iter().position(|s| s.id == stream_id) { + None => continue, + Some(p) => { + run_context.handles.remove(p + 1); + run_context.streams.remove(p); + }, + } + let event = StreamEvent::Close(StreamCloseCause::UserDestroyed); + callback(stream_id, event); + }, + Command::PlayStream(stream_id) => { + match run_context.streams.iter().position(|s| s.id == stream_id) { + None => continue, + Some(p) => { + if !run_context.streams[p].playing { + let hresult = unsafe { + (*run_context.streams[p].audio_client).Start() + }; + match stream_error_from_hresult(hresult) { + Ok(()) => { + run_context.streams[p].playing = true; + let event = StreamEvent::Play; + callback(stream_id, event); + } + Err(err) => { + let event = StreamEvent::Close(err.into()); + callback(stream_id, event); + run_context.handles.remove(p + 1); + run_context.streams.remove(p); + } + } + } + } + } + }, + Command::PauseStream(stream_id) => { + match run_context.streams.iter().position(|s| s.id == stream_id) { + None => continue, + Some(p) => { + if run_context.streams[p].playing { + let hresult = unsafe { + (*run_context.streams[p].audio_client).Stop() + }; + match stream_error_from_hresult(hresult) { + Ok(()) => { + run_context.streams[p].playing = false; + let event = StreamEvent::Pause; + callback(stream_id, event); + } + Err(err) => { + let event = StreamEvent::Close(err.into()); + callback(stream_id, event); + run_context.handles.remove(p + 1); + run_context.streams.remove(p); + } + } + } + }, + } + }, + } + } +} + +// Wait for any of the given handles to be signalled. +// +// Returns the index of the `handle` that was signalled, or an `Err` if +// `WaitForMultipleObjectsEx` fails. +// +// This is called when the `run` thread is ready to wait for the next event. The +// next event might be some command submitted by the user (the first handle) or +// might indicate that one of the streams is ready to deliver or receive audio. +fn wait_for_handle_signal(handles: &[winnt::HANDLE]) -> Result { + debug_assert!(handles.len() <= winnt::MAXIMUM_WAIT_OBJECTS as usize); + let result = unsafe { + synchapi::WaitForMultipleObjectsEx( + handles.len() as u32, + handles.as_ptr(), + FALSE, // Don't wait for all, just wait for the first + winbase::INFINITE, // TODO: allow setting a timeout + FALSE, // irrelevant parameter here + ) + }; + if result == winbase::WAIT_FAILED { + let err = unsafe { + winapi::um::errhandlingapi::GetLastError() + }; + let description = format!("`WaitForMultipleObjectsEx failed: {}", err); + let err = BackendSpecificError { description }; + return Err(err); + } + // Notifying the corresponding task handler. + debug_assert!(result >= winbase::WAIT_OBJECT_0); + let handle_idx = (result - winbase::WAIT_OBJECT_0) as usize; + Ok(handle_idx) +} + +// Get the number of available frames that are available for writing/reading. +fn get_available_frames(stream: &StreamInner) -> Result { + unsafe { + let mut padding = mem::uninitialized(); + let hresult = (*stream.audio_client).GetCurrentPadding(&mut padding); + stream_error_from_hresult(hresult)?; + Ok(stream.max_frames_in_buffer - padding) + } +} + +// Convert the given `HRESULT` into a `StreamError` if it does indicate an error. +fn stream_error_from_hresult(hresult: winnt::HRESULT) -> Result<(), StreamError> { + if hresult == AUDCLNT_E_DEVICE_INVALIDATED { + return Err(StreamError::DeviceNotAvailable); + } + if let Err(err) = check_result(hresult) { + let description = format!("{}", err); + let err = BackendSpecificError { description }; + return Err(err.into()); + } + Ok(()) +}