sharing asio streams

This commit is contained in:
Tom Gowan 2018-10-29 22:57:42 +11:00 committed by mitchmindtree
parent 76eb07a274
commit 8193bc9f79
2 changed files with 363 additions and 345 deletions

View File

@ -34,7 +34,7 @@ pub struct CbArgs<S, D> {
struct BufferCallback(Box<FnMut(i32) + Send>); struct BufferCallback(Box<FnMut(i32) + Send>);
lazy_static! { lazy_static! {
static ref buffer_callback: Mutex<[Option<BufferCallback>; 2]> = Mutex::new([None, None]); static ref buffer_callback: Mutex<Vec<Option<BufferCallback>>> = Mutex::new(Vec::new());
} }
lazy_static! { lazy_static! {
@ -62,6 +62,11 @@ struct DriverWrapper {
pub drivers: ai::AsioDrivers, pub drivers: ai::AsioDrivers,
} }
pub struct AsioStreams {
pub input: Option<AsioStream>,
pub output: Option<AsioStream>,
}
pub struct AsioStream { pub struct AsioStream {
pub buffer_infos: Vec<AsioBufferInfo>, pub buffer_infos: Vec<AsioBufferInfo>,
pub buffer_size: i32, pub buffer_size: i32,
@ -127,22 +132,14 @@ struct AsioCallbacks {
direct_process: c_long, direct_process: c_long,
) -> *mut ai::ASIOTime, ) -> *mut ai::ASIOTime,
} }
extern "C" fn buffer_switch_output(double_buffer_index: c_long, direct_process: c_long) -> () { extern "C" fn buffer_switch(double_buffer_index: c_long, direct_process: c_long) -> () {
let mut bc = buffer_callback.lock().unwrap(); let mut bcs = buffer_callback.lock().unwrap();
println!("output");
if let Some(ref mut bc) = bc[0] { for mut bc in bcs.iter_mut() {
if let Some(ref mut bc) = bc {
bc.run(double_buffer_index); bc.run(double_buffer_index);
} }
} }
extern "C" fn buffer_switch_input(double_buffer_index: c_long, direct_process: c_long) -> () {
let mut bc = buffer_callback.lock().unwrap();
println!("input");
if let Some(ref mut bc) = bc[1] {
bc.run(double_buffer_index);
}
} }
extern "C" fn sample_rate_did_change(s_rate: c_double) -> () {} extern "C" fn sample_rate_did_change(s_rate: c_double) -> () {}
@ -258,7 +255,7 @@ impl Drivers {
} }
} }
pub fn prepare_input_stream(&self, num_channels: usize) -> Result<AsioStream, AsioDriverError> { pub fn prepare_input_stream(&self, output: Option<AsioStream>, mut num_channels: usize) -> Result<AsioStreams, AsioDriverError> {
let mut buffer_infos = vec![ let mut buffer_infos = vec![
AsioBufferInfo { AsioBufferInfo {
is_input: 1, is_input: 1,
@ -268,73 +265,12 @@ impl Drivers {
num_channels num_channels
]; ];
let mut callbacks = AsioCallbacks { let streams = AsioStreams{input: Some(AsioStream{buffer_infos, buffer_size: 0}), output};
buffer_switch: buffer_switch_input, self.create_streams(streams)
sample_rate_did_change: sample_rate_did_change,
asio_message: asio_message,
buffer_switch_time_info: buffer_switch_time_info,
};
let mut min_b_size: c_long = 0;
let mut max_b_size: c_long = 0;
let mut pref_b_size: c_long = 0;
let mut grans: c_long = 0;
let mut result = Err(AsioDriverError::NoResult("not implimented".to_owned()));
unsafe {
asio_get_buffer_size(
&mut min_b_size,
&mut max_b_size,
&mut pref_b_size,
&mut grans,
).expect("Failed getting buffers");
result = if pref_b_size > 0 {
let mut buffer_info_convert: Vec<ai::ASIOBufferInfo> = buffer_infos
.into_iter()
.map(|bi| mem::transmute::<AsioBufferInfo, ai::ASIOBufferInfo>(bi))
.collect();
let mut callbacks_convert =
mem::transmute::<AsioCallbacks, ai::ASIOCallbacks>(callbacks);
let buffer_result = asio_create_buffers(
buffer_info_convert.as_mut_ptr(),
num_channels as i32,
pref_b_size,
&mut callbacks_convert,
);
if buffer_result.is_ok() {
let mut buffer_infos: Vec<AsioBufferInfo> = buffer_info_convert
.into_iter()
.map(|bi| mem::transmute::<ai::ASIOBufferInfo, AsioBufferInfo>(bi))
.collect();
for d in &buffer_infos {
println!("after {:?}", d);
}
println!("channels: {:?}", num_channels);
STREAM_DRIVER_COUNT.fetch_add(1, Ordering::SeqCst);
return Ok(AsioStream {
buffer_infos: buffer_infos,
buffer_size: pref_b_size,
});
}
Err(AsioDriverError::BufferError(format!(
"failed to create buffers,
error \
code: {:?}",
buffer_result
)))
} else {
Err(AsioDriverError::BufferError(
"Failed to get buffer size".to_owned(),
))
};
}
result
} }
/// Creates the output stream /// Creates the output stream
pub fn prepare_output_stream(&self, num_channels: usize) -> Result<AsioStream, AsioDriverError> { pub fn prepare_output_stream(&self, input: Option<AsioStream>, num_channels: usize) -> Result<AsioStreams, AsioDriverError> {
// Initialize data for FFI // Initialize data for FFI
let mut buffer_infos = vec![ let mut buffer_infos = vec![
AsioBufferInfo { AsioBufferInfo {
@ -344,9 +280,66 @@ impl Drivers {
}; };
num_channels num_channels
]; ];
let streams = AsioStreams{output: Some(AsioStream{buffer_infos, buffer_size: 0}), input};
self.create_streams(streams)
}
/// Creates the output stream
fn create_streams(&self, streams: AsioStreams) -> Result<AsioStreams, AsioDriverError> {
let AsioStreams {
input,
output,
} = streams;
match (input, output) {
(Some(input), Some(mut output)) => {
let split_point = input.buffer_infos.len();
let mut bi = input.buffer_infos;
bi.append(&mut output.buffer_infos);
self.create_buffers(bi)
.map(|(mut bi, buffer_size)|{
let out_bi = bi.split_off(split_point);
let in_bi = bi;
let output = Some(AsioStream{
buffer_infos: out_bi,
buffer_size,
});
let input = Some(AsioStream{
buffer_infos: in_bi,
buffer_size,
});
AsioStreams{output, input}
})
},
(Some(input), None) => {
self.create_buffers(input.buffer_infos)
.map(|(buffer_infos, buffer_size)| AsioStreams{
input: Some(AsioStream{
buffer_infos,
buffer_size,
}),
output: None,
})
},
(None, Some(output)) => {
self.create_buffers(output.buffer_infos)
.map(|(buffer_infos, buffer_size)| AsioStreams{
output: Some(AsioStream{
buffer_infos,
buffer_size,
}),
input: None,
})
},
(None, None) => panic!("Trying to create streams without preparing"),
}
}
fn create_buffers(&self, buffer_infos: Vec<AsioBufferInfo>)
-> Result<(Vec<AsioBufferInfo>, c_long), AsioDriverError>{
let num_channels = buffer_infos.len();
let mut callbacks = AsioCallbacks { let mut callbacks = AsioCallbacks {
buffer_switch: buffer_switch_output, buffer_switch: buffer_switch,
sample_rate_did_change: sample_rate_did_change, sample_rate_did_change: sample_rate_did_change,
asio_message: asio_message, asio_message: asio_message,
buffer_switch_time_info: buffer_switch_time_info, buffer_switch_time_info: buffer_switch_time_info,
@ -357,7 +350,7 @@ impl Drivers {
let mut pref_b_size: c_long = 0; let mut pref_b_size: c_long = 0;
let mut grans: c_long = 0; let mut grans: c_long = 0;
let mut result = Err(AsioDriverError::NoResult("not implimented".to_owned())); //let mut result = Err(AsioDriverError::NoResult("not implimented".to_owned()));
unsafe { unsafe {
// Get the buffer sizes // Get the buffer sizes
@ -371,20 +364,19 @@ impl Drivers {
&mut pref_b_size, &mut pref_b_size,
&mut grans, &mut grans,
).expect("Failed getting buffers"); ).expect("Failed getting buffers");
result = if pref_b_size > 0 { if pref_b_size > 0 {
let mut buffer_info_convert: Vec<ai::ASIOBufferInfo> = buffer_infos let mut buffer_info_convert: Vec<ai::ASIOBufferInfo> = buffer_infos
.into_iter() .into_iter()
.map(|bi| mem::transmute::<AsioBufferInfo, ai::ASIOBufferInfo>(bi)) .map(|bi| mem::transmute::<AsioBufferInfo, ai::ASIOBufferInfo>(bi))
.collect(); .collect();
let mut callbacks_convert = let mut callbacks_convert =
mem::transmute::<AsioCallbacks, ai::ASIOCallbacks>(callbacks); mem::transmute::<AsioCallbacks, ai::ASIOCallbacks>(callbacks);
let buffer_result = asio_create_buffers( asio_create_buffers(
buffer_info_convert.as_mut_ptr(), buffer_info_convert.as_mut_ptr(),
num_channels as i32, num_channels as i32,
pref_b_size, pref_b_size,
&mut callbacks_convert, &mut callbacks_convert,
); ).map(|_|{
if buffer_result.is_ok() {
let mut buffer_infos: Vec<AsioBufferInfo> = buffer_info_convert let mut buffer_infos: Vec<AsioBufferInfo> = buffer_info_convert
.into_iter() .into_iter()
.map(|bi| mem::transmute::<ai::ASIOBufferInfo, AsioBufferInfo>(bi)) .map(|bi| mem::transmute::<ai::ASIOBufferInfo, AsioBufferInfo>(bi))
@ -395,22 +387,17 @@ impl Drivers {
println!("channels: {:?}", num_channels); println!("channels: {:?}", num_channels);
STREAM_DRIVER_COUNT.fetch_add(1, Ordering::SeqCst); STREAM_DRIVER_COUNT.fetch_add(1, Ordering::SeqCst);
return Ok(AsioStream { (buffer_infos, pref_b_size)
buffer_infos: buffer_infos, }).map_err(|e|{
buffer_size: pref_b_size, AsioDriverError::BufferError(format!(
}); "failed to create buffers, error code: {:?}", e))
} })
Err(AsioDriverError::BufferError(format!(
"failed to create buffers, error code: {:?}",
buffer_result
)))
} else { } else {
Err(AsioDriverError::BufferError( Err(AsioDriverError::BufferError(
"Failed to get buffer size".to_owned(), "bad buffer size".to_owned(),
)) ))
};
} }
result }
} }
} }
@ -428,7 +415,7 @@ impl Drop for Drivers {
} }
} }
} }
/* TODO this should be tied to cpal streams and not AsioStreams
impl Drop for AsioStream { impl Drop for AsioStream {
fn drop(&mut self) { fn drop(&mut self) {
println!("dropping stream"); println!("dropping stream");
@ -443,6 +430,7 @@ impl Drop for AsioStream {
} }
} }
} }
*/
unsafe impl Send for DriverWrapper {} unsafe impl Send for DriverWrapper {}
@ -455,18 +443,12 @@ impl BufferCallback {
unsafe impl Send for AsioStream {} unsafe impl Send for AsioStream {}
pub fn set_callback<F: 'static>(input: bool, mut callback: F) -> () pub fn set_callback<F: 'static>(mut callback: F) -> ()
where where
F: FnMut(i32) + Send, F: FnMut(i32) + Send,
{ {
let mut bc = buffer_callback.lock().unwrap(); let mut bc = buffer_callback.lock().unwrap();
if input { bc.push(Some(BufferCallback(Box::new(callback))));
println!("Set input callback");
bc[1] = Some(BufferCallback(Box::new(callback)));
}else{
println!("Set output callback");
bc[0] = Some(BufferCallback(Box::new(callback)));
}
} }
/// Returns a list of all the ASIO drivers /// Returns a list of all the ASIO drivers

View File

@ -16,7 +16,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use SampleFormat; use SampleFormat;
pub struct EventLoop { pub struct EventLoop {
asio_streams: Arc<Mutex<Vec<Option<sys::AsioStream>>>>, asio_streams: Arc<Mutex<sys::AsioStreams>>,
stream_count: AtomicUsize, stream_count: AtomicUsize,
callbacks: Arc<Mutex<Vec<&'static mut (FnMut(StreamId, StreamData) + Send)>>>, callbacks: Arc<Mutex<Vec<&'static mut (FnMut(StreamId, StreamData) + Send)>>>,
} }
@ -55,12 +55,58 @@ struct Buffers {
impl EventLoop { impl EventLoop {
pub fn new() -> EventLoop { pub fn new() -> EventLoop {
EventLoop { EventLoop {
asio_streams: Arc::new(Mutex::new(Vec::new())), asio_streams: Arc::new(Mutex::new(sys::AsioStreams{input: None, output: None})),
stream_count: AtomicUsize::new(0), stream_count: AtomicUsize::new(0),
callbacks: Arc::new(Mutex::new(Vec::new())), callbacks: Arc::new(Mutex::new(Vec::new())),
} }
} }
fn get_input_stream(&self, drivers: &sys::Drivers, num_channels: usize) -> Result<usize, CreationError> {
let ref mut streams = *self.asio_streams.lock().unwrap();
match streams.input {
Some(ref input) => Ok(input.buffer_size as usize),
None => {
let output = streams.output.take();
drivers.prepare_input_stream(output, num_channels)
.map(|new_streams| {
let bs = match new_streams.input {
Some(ref inp) => inp.buffer_size as usize,
None => unreachable!(),
};
*streams = new_streams;
bs
})
.map_err(|ref e| {
println!("Error preparing stream: {}", e);
CreationError::DeviceNotAvailable
})
}
}
}
fn get_output_stream(&self, drivers: &sys::Drivers, num_channels: usize) -> Result<usize, CreationError> {
let ref mut streams = *self.asio_streams.lock().unwrap();
match streams.output {
Some(ref output) => Ok(output.buffer_size as usize),
None => {
let input = streams.input.take();
drivers.prepare_output_stream(input, num_channels)
.map(|new_streams| {
let bs = match new_streams.output {
Some(ref out) => out.buffer_size as usize,
None => unreachable!(),
};
*streams = new_streams;
bs
})
.map_err(|ref e| {
println!("Error preparing stream: {}", e);
CreationError::DeviceNotAvailable
})
},
}
}
pub fn build_input_stream( pub fn build_input_stream(
&self, &self,
device: &Device, device: &Device,
@ -72,13 +118,8 @@ impl EventLoop {
} = device; } = device;
let num_channels = format.channels.clone(); let num_channels = format.channels.clone();
let stream_type = drivers.get_data_type().expect("Couldn't load data type"); let stream_type = drivers.get_data_type().expect("Couldn't load data type");
match drivers.prepare_input_stream(num_channels as usize) { self.get_input_stream(&drivers, num_channels as usize).map(|stream_buffer_size| {
Ok(stream) => { let cpal_num_samples = stream_buffer_size * num_channels as usize;
let cpal_num_samples =
(stream.buffer_size as usize) * num_channels as usize;
{
self.asio_streams.lock().unwrap().push(Some(stream));
}
let count = self.stream_count.load(Ordering::SeqCst); let count = self.stream_count.load(Ordering::SeqCst);
self.stream_count.store(count + 1, Ordering::SeqCst); self.stream_count.store(count + 1, Ordering::SeqCst);
let asio_streams = self.asio_streams.clone(); let asio_streams = self.asio_streams.clone();
@ -126,8 +167,8 @@ impl EventLoop {
} }
}; };
sys::set_callback(true, move |index| unsafe { sys::set_callback(move |index| unsafe {
if let Some(ref asio_stream) = asio_streams.lock().unwrap()[count - 1] { if let Some(ref asio_stream) = asio_streams.lock().unwrap().input {
// Number of samples needed total // Number of samples needed total
let mut callbacks = callbacks.lock().unwrap(); let mut callbacks = callbacks.lock().unwrap();
@ -218,13 +259,8 @@ impl EventLoop {
} }
} }
}); });
Ok(StreamId(count)) StreamId(count)
} })
Err(ref e) => {
println!("Error preparing stream: {}", e);
Err(CreationError::DeviceNotAvailable)
}
}
} }
pub fn build_output_stream( pub fn build_output_stream(
@ -238,13 +274,8 @@ pub fn build_output_stream(
} = device; } = device;
let num_channels = format.channels.clone(); let num_channels = format.channels.clone();
let stream_type = drivers.get_data_type().expect("Couldn't load data type"); let stream_type = drivers.get_data_type().expect("Couldn't load data type");
match drivers.prepare_output_stream(num_channels as usize) { self.get_output_stream(&drivers, num_channels as usize).map(|stream_buffer_size| {
Ok(stream) => { let cpal_num_samples = stream_buffer_size * num_channels as usize;
let cpal_num_samples =
(stream.buffer_size as usize) * num_channels as usize;
{
self.asio_streams.lock().unwrap().push(Some(stream));
}
let count = self.stream_count.load(Ordering::SeqCst); let count = self.stream_count.load(Ordering::SeqCst);
self.stream_count.store(count + 1, Ordering::SeqCst); self.stream_count.store(count + 1, Ordering::SeqCst);
let asio_streams = self.asio_streams.clone(); let asio_streams = self.asio_streams.clone();
@ -291,8 +322,8 @@ pub fn build_output_stream(
} }
}; };
sys::set_callback(false, move |index| unsafe { sys::set_callback(move |index| unsafe {
if let Some(ref asio_stream) = asio_streams.lock().unwrap()[count - 1] { if let Some(ref asio_stream) = asio_streams.lock().unwrap().output {
// Number of samples needed total // Number of samples needed total
let mut callbacks = callbacks.lock().unwrap(); let mut callbacks = callbacks.lock().unwrap();
@ -338,11 +369,13 @@ pub fn build_output_stream(
// For each channel write the cpal data to // For each channel write the cpal data to
// the asio buffer // the asio buffer
// Also need to check for Endian // TODO need to check for Endian
for (i, channel) in my_buffers.channel.iter().enumerate(){ for (i, channel) in my_buffers.channel.iter().enumerate(){
let buff_ptr = (asio_stream let buff_ptr = (asio_stream
.buffer_infos[i] .buffer_infos[i]
.buffers[index as usize] as *mut $AsioType) .buffers[index as usize] as *mut $AsioType)
// I'm not sure if this is needed anymore
// Why should we offset the pointer?
.offset(asio_stream.buffer_size as isize * i as isize); .offset(asio_stream.buffer_size as isize * i as isize);
let asio_buffer: &'static mut [$AsioType] = let asio_buffer: &'static mut [$AsioType] =
std::slice::from_raw_parts_mut( std::slice::from_raw_parts_mut(
@ -384,13 +417,8 @@ pub fn build_output_stream(
} }
} }
}); });
Ok(StreamId(count)) StreamId(count)
} })
Err(ref e) => {
println!("Error preparing stream: {}", e);
Err(CreationError::DeviceNotAvailable)
}
}
} }
pub fn play_stream(&self, stream: StreamId) { pub fn play_stream(&self, stream: StreamId) {
@ -400,12 +428,20 @@ pub fn play_stream(&self, stream: StreamId) {
pub fn pause_stream(&self, stream: StreamId) { pub fn pause_stream(&self, stream: StreamId) {
sys::stop(); sys::stop();
} }
// TODO the logic for this is wrong
// We are not destroying AsioStreams but CPAL streams
// Asio Streams should only be destroyed if there are no
// CPAL streams left
pub fn destroy_stream(&self, stream_id: StreamId) { pub fn destroy_stream(&self, stream_id: StreamId) {
/*
let mut asio_streams_lock = self.asio_streams.lock().unwrap(); let mut asio_streams_lock = self.asio_streams.lock().unwrap();
let old_stream = mem::replace(asio_streams_lock.get_mut(stream_id.0 - 1).expect("stream count out of bounds"), None); let old_stream = mem::replace(asio_streams_lock.get_mut(stream_id.0 - 1).expect("stream count out of bounds"), None);
if let Some(old_stream) = old_stream { if let Some(old_stream) = old_stream {
sys::destroy_stream(old_stream); sys::destroy_stream(old_stream);
} }
*/
unimplemented!()
} }
pub fn run<F>(&self, mut callback: F) -> ! pub fn run<F>(&self, mut callback: F) -> !
where where