Update WASAPI backend for removal of `UnknownTypeBuffer`

This commit is contained in:
mitchmindtree 2020-01-16 01:17:01 +11:00
parent 5a619877f9
commit b5bfb8d422
2 changed files with 257 additions and 191 deletions

View File

@ -1,3 +1,18 @@
use crate::{
BackendSpecificError,
DefaultFormatError,
DeviceNameError,
DevicesError,
Format,
InputData,
OutputData,
Sample,
SampleFormat,
SampleRate,
SupportedFormat,
SupportedFormatsError,
COMMON_SAMPLE_RATES,
};
use std;
use std::ffi::OsString;
use std::fmt;
@ -9,17 +24,6 @@ use std::ptr;
use std::slice;
use std::sync::{Arc, Mutex, MutexGuard, atomic::Ordering};
use BackendSpecificError;
use DefaultFormatError;
use DeviceNameError;
use DevicesError;
use Format;
use SampleFormat;
use SampleRate;
use SupportedFormat;
use SupportedFormatsError;
use COMMON_SAMPLE_RATES;
use super::check_result;
use super::check_result_backend_specific;
use super::com;
@ -54,7 +58,7 @@ use super::{
stream::{AudioClientFlow, Stream, StreamInner},
winapi::um::synchapi,
};
use crate::{traits::DeviceTrait, BuildStreamError, StreamData, StreamError};
use crate::{traits::DeviceTrait, BuildStreamError, StreamError};
pub type SupportedInputFormats = std::vec::IntoIter<SupportedFormat>;
pub type SupportedOutputFormats = std::vec::IntoIter<SupportedFormat>;
@ -102,38 +106,34 @@ impl DeviceTrait for Device {
Device::default_output_format(self)
}
fn build_input_stream<D, E>(
fn build_input_stream<T, D, E>(
&self,
format: &Format,
data_callback: D,
error_callback: E,
) -> Result<Self::Stream, BuildStreamError>
where
D: FnMut(StreamData) + Send + 'static,
T: Sample,
D: FnMut(InputData<T>) + Send + 'static,
E: FnMut(StreamError) + Send + 'static,
{
Ok(Stream::new(
self.build_input_stream_inner(format)?,
data_callback,
error_callback,
))
let stream_inner = self.build_input_stream_inner(format)?;
Ok(Stream::new_input(stream_inner, data_callback, error_callback))
}
fn build_output_stream<D, E>(
fn build_output_stream<T, D, E>(
&self,
format: &Format,
data_callback: D,
error_callback: E,
) -> Result<Self::Stream, BuildStreamError>
where
D: FnMut(StreamData) + Send + 'static,
T: Sample,
D: FnMut(OutputData<T>) + Send + 'static,
E: FnMut(StreamError) + Send + 'static,
{
Ok(Stream::new(
self.build_output_stream_inner(format)?,
data_callback,
error_callback,
))
let stream_inner = self.build_output_stream_inner(format)?;
Ok(Stream::new_output(stream_inner, data_callback, error_callback))
}
}

View File

@ -1,3 +1,19 @@
use crate::{
BackendSpecificError,
InputData,
OutputData,
PauseStreamError,
PlayStreamError,
Sample,
SampleFormat,
StreamError,
};
use crate::traits::StreamTrait;
use std::mem;
use std::ptr;
use std::slice;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};
use super::check_result;
use super::winapi::shared::basetsd::UINT32;
use super::winapi::shared::minwindef::{BYTE, FALSE, WORD};
@ -7,23 +23,6 @@ use super::winapi::um::synchapi;
use super::winapi::um::winbase;
use super::winapi::um::winnt;
use std::mem;
use std::ptr;
use std::slice;
use std::sync::mpsc::{channel, Receiver, Sender};
use crate::traits::StreamTrait;
use std::thread::{self, JoinHandle};
use BackendSpecificError;
use PauseStreamError;
use PlayStreamError;
use SampleFormat;
use StreamData;
use StreamError;
use UnknownTypeInputBuffer;
use UnknownTypeOutputBuffer;
pub struct Stream {
/// The high-priority audio processing thread calling callbacks.
/// Option used for moving out in destructor.
@ -86,13 +85,14 @@ pub struct StreamInner {
}
impl Stream {
pub(crate) fn new<D, E>(
pub(crate) fn new_input<T, D, E>(
stream_inner: StreamInner,
mut data_callback: D,
mut error_callback: E,
) -> Stream
where
D: FnMut(StreamData) + Send + 'static,
T: Sample,
D: FnMut(InputData<T>) + Send + 'static,
E: FnMut(StreamError) + Send + 'static,
{
let pending_scheduled_event =
@ -106,7 +106,37 @@ impl Stream {
};
let thread =
thread::spawn(move || run_inner(run_context, &mut data_callback, &mut error_callback));
thread::spawn(move || run_input(run_context, &mut data_callback, &mut error_callback));
Stream {
thread: Some(thread),
commands: tx,
pending_scheduled_event,
}
}
pub(crate) fn new_output<T, D, E>(
stream_inner: StreamInner,
mut data_callback: D,
mut error_callback: E,
) -> Stream
where
T: Sample,
D: FnMut(OutputData<T>) + Send + 'static,
E: FnMut(StreamError) + Send + 'static,
{
let pending_scheduled_event =
unsafe { synchapi::CreateEventA(ptr::null_mut(), 0, 0, ptr::null()) };
let (tx, rx) = channel();
let run_context = RunContext {
handles: vec![pending_scheduled_event, stream_inner.event],
stream: stream_inner,
commands: rx,
};
let thread =
thread::spawn(move || run_output(run_context, &mut data_callback, &mut error_callback));
Stream {
thread: Some(thread),
@ -255,160 +285,196 @@ fn stream_error_from_hresult(hresult: winnt::HRESULT) -> Result<(), StreamError>
Ok(())
}
fn run_inner(
mut run_context: RunContext,
data_callback: &mut dyn FnMut(StreamData),
fn run_input<T>(
mut run_ctxt: RunContext,
data_callback: &mut dyn FnMut(InputData<T>),
error_callback: &mut dyn FnMut(StreamError),
) {
) where
T: Sample,
{
loop {
match process_commands_and_await_signal(&mut run_ctxt, error_callback) {
Some(ControlFlow::Break) => break,
Some(ControlFlow::Continue) => continue,
None => (),
}
let capture_client = match run_ctxt.stream.client_flow {
AudioClientFlow::Capture { capture_client } => capture_client,
_ => unreachable!(),
};
match process_input(&mut run_ctxt.stream, capture_client, data_callback, error_callback) {
ControlFlow::Break => break,
ControlFlow::Continue => continue,
}
}
}
fn run_output<T>(
mut run_ctxt: RunContext,
data_callback: &mut dyn FnMut(OutputData<T>),
error_callback: &mut dyn FnMut(StreamError),
) where
T: Sample,
{
loop {
match process_commands_and_await_signal(&mut run_ctxt, error_callback) {
Some(ControlFlow::Break) => break,
Some(ControlFlow::Continue) => continue,
None => (),
}
let render_client = match run_ctxt.stream.client_flow {
AudioClientFlow::Render { render_client } => render_client,
_ => unreachable!(),
};
match process_output(&mut run_ctxt.stream, render_client, data_callback, error_callback) {
ControlFlow::Break => break,
ControlFlow::Continue => continue,
}
}
}
enum ControlFlow {
Break,
Continue,
}
fn process_commands_and_await_signal(
run_context: &mut RunContext,
error_callback: &mut dyn FnMut(StreamError),
) -> Option<ControlFlow> {
// Process queued commands.
match process_commands(run_context) {
Ok(true) => (),
Ok(false) => return Some(ControlFlow::Break),
Err(err) => {
error_callback(err);
return Some(ControlFlow::Break);
}
};
// Wait for any of the handles to be signalled.
let handle_idx = match wait_for_handle_signal(&run_context.handles) {
Ok(idx) => idx,
Err(err) => {
error_callback(err.into());
return Some(ControlFlow::Break);
}
};
// If `handle_idx` is 0, then it's `pending_scheduled_event` that was signalled in
// order for us to pick up the pending commands. Otherwise, a stream needs data.
if handle_idx == 0 {
return Some(ControlFlow::Continue);
}
None
}
// The loop for processing pending input data.
fn process_input<T>(
stream: &StreamInner,
capture_client: *mut audioclient::IAudioCaptureClient,
data_callback: &mut dyn FnMut(InputData<T>),
error_callback: &mut dyn FnMut(StreamError),
) -> ControlFlow
where
T: Sample,
{
let mut frames_available = 0;
unsafe {
'stream_loop: loop {
// Process queued commands.
match process_commands(&mut run_context) {
Ok(true) => (),
Ok(false) => break,
Err(err) => {
error_callback(err);
break 'stream_loop;
}
};
// Get the available data in the shared buffer.
let mut buffer: *mut BYTE = mem::uninitialized();
let mut flags = mem::uninitialized();
loop {
let hresult = (*capture_client).GetNextPacketSize(&mut frames_available);
if let Err(err) = stream_error_from_hresult(hresult) {
error_callback(err);
return ControlFlow::Break;
}
if frames_available == 0 {
return ControlFlow::Continue;
}
let hresult = (*capture_client).GetBuffer(
&mut buffer,
&mut frames_available,
&mut flags,
ptr::null_mut(),
ptr::null_mut(),
);
// Wait for any of the handles to be signalled.
let handle_idx = match wait_for_handle_signal(&run_context.handles) {
Ok(idx) => idx,
Err(err) => {
error_callback(err.into());
break 'stream_loop;
}
};
// If `handle_idx` is 0, then it's `pending_scheduled_event` that was signalled in
// order for us to pick up the pending commands. Otherwise, a stream needs data.
if handle_idx == 0 {
// TODO: Can this happen?
if hresult == AUDCLNT_S_BUFFER_EMPTY {
continue;
} else if let Err(err) = stream_error_from_hresult(hresult) {
error_callback(err);
return ControlFlow::Break;
}
let stream = &mut run_context.stream;
let sample_size = stream.sample_format.sample_size();
debug_assert!(!buffer.is_null());
// Obtaining a pointer to the buffer.
match stream.client_flow {
AudioClientFlow::Capture { capture_client } => {
let mut frames_available = 0;
// Get the available data in the shared buffer.
let mut buffer: *mut BYTE = mem::uninitialized();
let mut flags = mem::uninitialized();
loop {
let hresult = (*capture_client).GetNextPacketSize(&mut frames_available);
if let Err(err) = stream_error_from_hresult(hresult) {
error_callback(err);
break 'stream_loop;
}
if frames_available == 0 {
break;
}
let hresult = (*capture_client).GetBuffer(
&mut buffer,
&mut frames_available,
&mut flags,
ptr::null_mut(),
ptr::null_mut(),
);
let buffer_len = frames_available as usize
* stream.bytes_per_frame as usize
/ mem::size_of::<T>();
// TODO: Can this happen?
if hresult == AUDCLNT_S_BUFFER_EMPTY {
continue;
} else if let Err(err) = stream_error_from_hresult(hresult) {
error_callback(err);
break 'stream_loop;
}
debug_assert!(!buffer.is_null());
let buffer_len = frames_available as usize
* stream.bytes_per_frame as usize
/ sample_size;
// Simplify the capture callback sample format branches.
macro_rules! capture_callback {
($T:ty, $Variant:ident) => {{
let buffer_data = buffer as *mut _ as *const $T;
let slice = slice::from_raw_parts(buffer_data, buffer_len);
let unknown_buffer =
UnknownTypeInputBuffer::$Variant(::InputBuffer {
buffer: slice,
});
let data = StreamData::Input {
buffer: unknown_buffer,
};
data_callback(data);
// Release the buffer.
let hresult = (*capture_client).ReleaseBuffer(frames_available);
if let Err(err) = stream_error_from_hresult(hresult) {
error_callback(err);
break 'stream_loop;
}
}};
}
match stream.sample_format {
SampleFormat::F32 => capture_callback!(f32, F32),
SampleFormat::I16 => capture_callback!(i16, I16),
SampleFormat::U16 => capture_callback!(u16, U16),
}
}
}
AudioClientFlow::Render { render_client } => {
// The number of frames available for writing.
let frames_available = match get_available_frames(&stream) {
Ok(0) => continue, // TODO: Can this happen?
Ok(n) => n,
Err(err) => {
error_callback(err);
break 'stream_loop;
}
};
let mut buffer: *mut BYTE = mem::uninitialized();
let hresult =
(*render_client).GetBuffer(frames_available, &mut buffer as *mut *mut _);
if let Err(err) = stream_error_from_hresult(hresult) {
error_callback(err);
break 'stream_loop;
}
debug_assert!(!buffer.is_null());
let buffer_len =
frames_available as usize * stream.bytes_per_frame as usize / sample_size;
// Simplify the render callback sample format branches.
macro_rules! render_callback {
($T:ty, $Variant:ident) => {{
let buffer_data = buffer as *mut $T;
let slice = slice::from_raw_parts_mut(buffer_data, buffer_len);
let unknown_buffer =
UnknownTypeOutputBuffer::$Variant(::OutputBuffer { buffer: slice });
let data = StreamData::Output {
buffer: unknown_buffer,
};
data_callback(data);
let hresult =
(*render_client).ReleaseBuffer(frames_available as u32, 0);
if let Err(err) = stream_error_from_hresult(hresult) {
error_callback(err);
break 'stream_loop;
}
}};
}
match stream.sample_format {
SampleFormat::F32 => render_callback!(f32, F32),
SampleFormat::I16 => render_callback!(i16, I16),
SampleFormat::U16 => render_callback!(u16, U16),
}
}
// Simplify the capture callback sample format branches.
let buffer_data = buffer as *mut _ as *const T;
let slice = slice::from_raw_parts(buffer_data, buffer_len);
let input_data = InputData { buffer: slice };
data_callback(input_data);
// Release the buffer.
let hresult = (*capture_client).ReleaseBuffer(frames_available);
if let Err(err) = stream_error_from_hresult(hresult) {
error_callback(err);
return ControlFlow::Break;
}
}
}
}
// The loop for writing output data.
fn process_output<T>(
stream: &StreamInner,
render_client: *mut audioclient::IAudioRenderClient,
data_callback: &mut dyn FnMut(OutputData<T>),
error_callback: &mut dyn FnMut(StreamError),
) -> ControlFlow
where
T: Sample,
{
// The number of frames available for writing.
let frames_available = match get_available_frames(&stream) {
Ok(0) => return ControlFlow::Continue, // TODO: Can this happen?
Ok(n) => n,
Err(err) => {
error_callback(err);
return ControlFlow::Break;
}
};
unsafe {
let mut buffer: *mut BYTE = mem::uninitialized();
let hresult =
(*render_client).GetBuffer(frames_available, &mut buffer as *mut *mut _);
if let Err(err) = stream_error_from_hresult(hresult) {
error_callback(err);
return ControlFlow::Break;
}
debug_assert!(!buffer.is_null());
let buffer_len =
frames_available as usize * stream.bytes_per_frame as usize / mem::size_of::<T>();
let buffer_data = buffer as *mut T;
let slice = slice::from_raw_parts_mut(buffer_data, buffer_len);
let output_data = OutputData { buffer: slice };
data_callback(output_data);
let hresult = (*render_client).ReleaseBuffer(frames_available as u32, 0);
if let Err(err) = stream_error_from_hresult(hresult) {
error_callback(err);
return ControlFlow::Break;
}
}
ControlFlow::Continue
}