From b5bfb8d4223a261fac342d9fc2677b9b07d68c02 Mon Sep 17 00:00:00 2001 From: mitchmindtree Date: Thu, 16 Jan 2020 01:17:01 +1100 Subject: [PATCH] Update WASAPI backend for removal of `UnknownTypeBuffer` --- src/host/wasapi/device.rs | 52 ++--- src/host/wasapi/stream.rs | 396 ++++++++++++++++++++++---------------- 2 files changed, 257 insertions(+), 191 deletions(-) diff --git a/src/host/wasapi/device.rs b/src/host/wasapi/device.rs index 50dd239..db5c154 100644 --- a/src/host/wasapi/device.rs +++ b/src/host/wasapi/device.rs @@ -1,3 +1,18 @@ +use crate::{ + BackendSpecificError, + DefaultFormatError, + DeviceNameError, + DevicesError, + Format, + InputData, + OutputData, + Sample, + SampleFormat, + SampleRate, + SupportedFormat, + SupportedFormatsError, + COMMON_SAMPLE_RATES, +}; use std; use std::ffi::OsString; use std::fmt; @@ -9,17 +24,6 @@ use std::ptr; use std::slice; use std::sync::{Arc, Mutex, MutexGuard, atomic::Ordering}; -use BackendSpecificError; -use DefaultFormatError; -use DeviceNameError; -use DevicesError; -use Format; -use SampleFormat; -use SampleRate; -use SupportedFormat; -use SupportedFormatsError; -use COMMON_SAMPLE_RATES; - use super::check_result; use super::check_result_backend_specific; use super::com; @@ -54,7 +58,7 @@ use super::{ stream::{AudioClientFlow, Stream, StreamInner}, winapi::um::synchapi, }; -use crate::{traits::DeviceTrait, BuildStreamError, StreamData, StreamError}; +use crate::{traits::DeviceTrait, BuildStreamError, StreamError}; pub type SupportedInputFormats = std::vec::IntoIter; pub type SupportedOutputFormats = std::vec::IntoIter; @@ -102,38 +106,34 @@ impl DeviceTrait for Device { Device::default_output_format(self) } - fn build_input_stream( + fn build_input_stream( &self, format: &Format, data_callback: D, error_callback: E, ) -> Result where - D: FnMut(StreamData) + Send + 'static, + T: Sample, + D: FnMut(InputData) + Send + 'static, E: FnMut(StreamError) + Send + 'static, { - Ok(Stream::new( - self.build_input_stream_inner(format)?, - data_callback, - error_callback, - )) + let stream_inner = self.build_input_stream_inner(format)?; + Ok(Stream::new_input(stream_inner, data_callback, error_callback)) } - fn build_output_stream( + fn build_output_stream( &self, format: &Format, data_callback: D, error_callback: E, ) -> Result where - D: FnMut(StreamData) + Send + 'static, + T: Sample, + D: FnMut(OutputData) + Send + 'static, E: FnMut(StreamError) + Send + 'static, { - Ok(Stream::new( - self.build_output_stream_inner(format)?, - data_callback, - error_callback, - )) + let stream_inner = self.build_output_stream_inner(format)?; + Ok(Stream::new_output(stream_inner, data_callback, error_callback)) } } diff --git a/src/host/wasapi/stream.rs b/src/host/wasapi/stream.rs index e020846..1c49a13 100644 --- a/src/host/wasapi/stream.rs +++ b/src/host/wasapi/stream.rs @@ -1,3 +1,19 @@ +use crate::{ + BackendSpecificError, + InputData, + OutputData, + PauseStreamError, + PlayStreamError, + Sample, + SampleFormat, + StreamError, +}; +use crate::traits::StreamTrait; +use std::mem; +use std::ptr; +use std::slice; +use std::sync::mpsc::{channel, Receiver, Sender}; +use std::thread::{self, JoinHandle}; use super::check_result; use super::winapi::shared::basetsd::UINT32; use super::winapi::shared::minwindef::{BYTE, FALSE, WORD}; @@ -7,23 +23,6 @@ use super::winapi::um::synchapi; use super::winapi::um::winbase; use super::winapi::um::winnt; -use std::mem; -use std::ptr; -use std::slice; -use std::sync::mpsc::{channel, Receiver, Sender}; - -use crate::traits::StreamTrait; -use std::thread::{self, JoinHandle}; - -use BackendSpecificError; -use PauseStreamError; -use PlayStreamError; -use SampleFormat; -use StreamData; -use StreamError; -use UnknownTypeInputBuffer; -use UnknownTypeOutputBuffer; - pub struct Stream { /// The high-priority audio processing thread calling callbacks. /// Option used for moving out in destructor. @@ -86,13 +85,14 @@ pub struct StreamInner { } impl Stream { - pub(crate) fn new( + pub(crate) fn new_input( stream_inner: StreamInner, mut data_callback: D, mut error_callback: E, ) -> Stream where - D: FnMut(StreamData) + Send + 'static, + T: Sample, + D: FnMut(InputData) + Send + 'static, E: FnMut(StreamError) + Send + 'static, { let pending_scheduled_event = @@ -106,7 +106,37 @@ impl Stream { }; let thread = - thread::spawn(move || run_inner(run_context, &mut data_callback, &mut error_callback)); + thread::spawn(move || run_input(run_context, &mut data_callback, &mut error_callback)); + + Stream { + thread: Some(thread), + commands: tx, + pending_scheduled_event, + } + } + + pub(crate) fn new_output( + stream_inner: StreamInner, + mut data_callback: D, + mut error_callback: E, + ) -> Stream + where + T: Sample, + D: FnMut(OutputData) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + let pending_scheduled_event = + unsafe { synchapi::CreateEventA(ptr::null_mut(), 0, 0, ptr::null()) }; + let (tx, rx) = channel(); + + let run_context = RunContext { + handles: vec![pending_scheduled_event, stream_inner.event], + stream: stream_inner, + commands: rx, + }; + + let thread = + thread::spawn(move || run_output(run_context, &mut data_callback, &mut error_callback)); Stream { thread: Some(thread), @@ -255,160 +285,196 @@ fn stream_error_from_hresult(hresult: winnt::HRESULT) -> Result<(), StreamError> Ok(()) } -fn run_inner( - mut run_context: RunContext, - data_callback: &mut dyn FnMut(StreamData), +fn run_input( + mut run_ctxt: RunContext, + data_callback: &mut dyn FnMut(InputData), error_callback: &mut dyn FnMut(StreamError), -) { +) where + T: Sample, +{ + loop { + match process_commands_and_await_signal(&mut run_ctxt, error_callback) { + Some(ControlFlow::Break) => break, + Some(ControlFlow::Continue) => continue, + None => (), + } + let capture_client = match run_ctxt.stream.client_flow { + AudioClientFlow::Capture { capture_client } => capture_client, + _ => unreachable!(), + }; + match process_input(&mut run_ctxt.stream, capture_client, data_callback, error_callback) { + ControlFlow::Break => break, + ControlFlow::Continue => continue, + } + } +} + +fn run_output( + mut run_ctxt: RunContext, + data_callback: &mut dyn FnMut(OutputData), + error_callback: &mut dyn FnMut(StreamError), +) where + T: Sample, +{ + loop { + match process_commands_and_await_signal(&mut run_ctxt, error_callback) { + Some(ControlFlow::Break) => break, + Some(ControlFlow::Continue) => continue, + None => (), + } + let render_client = match run_ctxt.stream.client_flow { + AudioClientFlow::Render { render_client } => render_client, + _ => unreachable!(), + }; + match process_output(&mut run_ctxt.stream, render_client, data_callback, error_callback) { + ControlFlow::Break => break, + ControlFlow::Continue => continue, + } + } +} + +enum ControlFlow { + Break, + Continue, +} + +fn process_commands_and_await_signal( + run_context: &mut RunContext, + error_callback: &mut dyn FnMut(StreamError), +) -> Option { + // Process queued commands. + match process_commands(run_context) { + Ok(true) => (), + Ok(false) => return Some(ControlFlow::Break), + Err(err) => { + error_callback(err); + return Some(ControlFlow::Break); + } + }; + + // Wait for any of the handles to be signalled. + let handle_idx = match wait_for_handle_signal(&run_context.handles) { + Ok(idx) => idx, + Err(err) => { + error_callback(err.into()); + return Some(ControlFlow::Break); + } + }; + + // If `handle_idx` is 0, then it's `pending_scheduled_event` that was signalled in + // order for us to pick up the pending commands. Otherwise, a stream needs data. + if handle_idx == 0 { + return Some(ControlFlow::Continue); + } + + None +} + +// The loop for processing pending input data. +fn process_input( + stream: &StreamInner, + capture_client: *mut audioclient::IAudioCaptureClient, + data_callback: &mut dyn FnMut(InputData), + error_callback: &mut dyn FnMut(StreamError), +) -> ControlFlow +where + T: Sample, +{ + let mut frames_available = 0; unsafe { - 'stream_loop: loop { - // Process queued commands. - match process_commands(&mut run_context) { - Ok(true) => (), - Ok(false) => break, - Err(err) => { - error_callback(err); - break 'stream_loop; - } - }; + // Get the available data in the shared buffer. + let mut buffer: *mut BYTE = mem::uninitialized(); + let mut flags = mem::uninitialized(); + loop { + let hresult = (*capture_client).GetNextPacketSize(&mut frames_available); + if let Err(err) = stream_error_from_hresult(hresult) { + error_callback(err); + return ControlFlow::Break; + } + if frames_available == 0 { + return ControlFlow::Continue; + } + let hresult = (*capture_client).GetBuffer( + &mut buffer, + &mut frames_available, + &mut flags, + ptr::null_mut(), + ptr::null_mut(), + ); - // Wait for any of the handles to be signalled. - let handle_idx = match wait_for_handle_signal(&run_context.handles) { - Ok(idx) => idx, - Err(err) => { - error_callback(err.into()); - break 'stream_loop; - } - }; - - // If `handle_idx` is 0, then it's `pending_scheduled_event` that was signalled in - // order for us to pick up the pending commands. Otherwise, a stream needs data. - if handle_idx == 0 { + // TODO: Can this happen? + if hresult == AUDCLNT_S_BUFFER_EMPTY { continue; + } else if let Err(err) = stream_error_from_hresult(hresult) { + error_callback(err); + return ControlFlow::Break; } - let stream = &mut run_context.stream; - let sample_size = stream.sample_format.sample_size(); + debug_assert!(!buffer.is_null()); - // Obtaining a pointer to the buffer. - match stream.client_flow { - AudioClientFlow::Capture { capture_client } => { - let mut frames_available = 0; - // Get the available data in the shared buffer. - let mut buffer: *mut BYTE = mem::uninitialized(); - let mut flags = mem::uninitialized(); - loop { - let hresult = (*capture_client).GetNextPacketSize(&mut frames_available); - if let Err(err) = stream_error_from_hresult(hresult) { - error_callback(err); - break 'stream_loop; - } - if frames_available == 0 { - break; - } - let hresult = (*capture_client).GetBuffer( - &mut buffer, - &mut frames_available, - &mut flags, - ptr::null_mut(), - ptr::null_mut(), - ); + let buffer_len = frames_available as usize + * stream.bytes_per_frame as usize + / mem::size_of::(); - // TODO: Can this happen? - if hresult == AUDCLNT_S_BUFFER_EMPTY { - continue; - } else if let Err(err) = stream_error_from_hresult(hresult) { - error_callback(err); - break 'stream_loop; - } - - debug_assert!(!buffer.is_null()); - - let buffer_len = frames_available as usize - * stream.bytes_per_frame as usize - / sample_size; - - // Simplify the capture callback sample format branches. - macro_rules! capture_callback { - ($T:ty, $Variant:ident) => {{ - let buffer_data = buffer as *mut _ as *const $T; - let slice = slice::from_raw_parts(buffer_data, buffer_len); - let unknown_buffer = - UnknownTypeInputBuffer::$Variant(::InputBuffer { - buffer: slice, - }); - let data = StreamData::Input { - buffer: unknown_buffer, - }; - data_callback(data); - // Release the buffer. - let hresult = (*capture_client).ReleaseBuffer(frames_available); - if let Err(err) = stream_error_from_hresult(hresult) { - error_callback(err); - break 'stream_loop; - } - }}; - } - - match stream.sample_format { - SampleFormat::F32 => capture_callback!(f32, F32), - SampleFormat::I16 => capture_callback!(i16, I16), - SampleFormat::U16 => capture_callback!(u16, U16), - } - } - } - - AudioClientFlow::Render { render_client } => { - // The number of frames available for writing. - let frames_available = match get_available_frames(&stream) { - Ok(0) => continue, // TODO: Can this happen? - Ok(n) => n, - Err(err) => { - error_callback(err); - break 'stream_loop; - } - }; - - let mut buffer: *mut BYTE = mem::uninitialized(); - let hresult = - (*render_client).GetBuffer(frames_available, &mut buffer as *mut *mut _); - - if let Err(err) = stream_error_from_hresult(hresult) { - error_callback(err); - break 'stream_loop; - } - - debug_assert!(!buffer.is_null()); - let buffer_len = - frames_available as usize * stream.bytes_per_frame as usize / sample_size; - - // Simplify the render callback sample format branches. - macro_rules! render_callback { - ($T:ty, $Variant:ident) => {{ - let buffer_data = buffer as *mut $T; - let slice = slice::from_raw_parts_mut(buffer_data, buffer_len); - let unknown_buffer = - UnknownTypeOutputBuffer::$Variant(::OutputBuffer { buffer: slice }); - let data = StreamData::Output { - buffer: unknown_buffer, - }; - data_callback(data); - let hresult = - (*render_client).ReleaseBuffer(frames_available as u32, 0); - if let Err(err) = stream_error_from_hresult(hresult) { - error_callback(err); - break 'stream_loop; - } - }}; - } - - match stream.sample_format { - SampleFormat::F32 => render_callback!(f32, F32), - SampleFormat::I16 => render_callback!(i16, I16), - SampleFormat::U16 => render_callback!(u16, U16), - } - } + // Simplify the capture callback sample format branches. + let buffer_data = buffer as *mut _ as *const T; + let slice = slice::from_raw_parts(buffer_data, buffer_len); + let input_data = InputData { buffer: slice }; + data_callback(input_data); + // Release the buffer. + let hresult = (*capture_client).ReleaseBuffer(frames_available); + if let Err(err) = stream_error_from_hresult(hresult) { + error_callback(err); + return ControlFlow::Break; } } } } + +// The loop for writing output data. +fn process_output( + stream: &StreamInner, + render_client: *mut audioclient::IAudioRenderClient, + data_callback: &mut dyn FnMut(OutputData), + error_callback: &mut dyn FnMut(StreamError), +) -> ControlFlow +where + T: Sample, +{ + // The number of frames available for writing. + let frames_available = match get_available_frames(&stream) { + Ok(0) => return ControlFlow::Continue, // TODO: Can this happen? + Ok(n) => n, + Err(err) => { + error_callback(err); + return ControlFlow::Break; + } + }; + + unsafe { + let mut buffer: *mut BYTE = mem::uninitialized(); + let hresult = + (*render_client).GetBuffer(frames_available, &mut buffer as *mut *mut _); + + if let Err(err) = stream_error_from_hresult(hresult) { + error_callback(err); + return ControlFlow::Break; + } + + debug_assert!(!buffer.is_null()); + let buffer_len = + frames_available as usize * stream.bytes_per_frame as usize / mem::size_of::(); + + let buffer_data = buffer as *mut T; + let slice = slice::from_raw_parts_mut(buffer_data, buffer_len); + let output_data = OutputData { buffer: slice }; + data_callback(output_data); + let hresult = (*render_client).ReleaseBuffer(frames_available as u32, 0); + if let Err(err) = stream_error_from_hresult(hresult) { + error_callback(err); + return ControlFlow::Break; + } + } + + ControlFlow::Continue +}