From cc26897acd527d30968907408fca1ae567c5ee23 Mon Sep 17 00:00:00 2001 From: mitchmindtree Date: Fri, 12 Aug 2016 17:49:13 +1000 Subject: [PATCH] Update coreaudio backend to new futures-rs oriented design. This depends on the changes introduced in #121. Update to coreaudio 0.6. --- Cargo.toml | 2 +- src/coreaudio/mod.rs | 298 +++++++++++++++++++------------------------ 2 files changed, 130 insertions(+), 170 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 027e03b..e81b4b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,7 @@ version = "0" path = "alsa-sys" [target.x86_64-apple-darwin.dependencies] -coreaudio-rs = "~0.5.0" +coreaudio-rs = "0.6" [dev-dependencies] vorbis = "0" diff --git a/src/coreaudio/mod.rs b/src/coreaudio/mod.rs index eeaca15..9f1ffc0 100644 --- a/src/coreaudio/mod.rs +++ b/src/coreaudio/mod.rs @@ -1,20 +1,21 @@ extern crate coreaudio; extern crate libc; -use std::sync::mpsc::{channel, Sender, Receiver}; -use std::sync::{Arc, Mutex}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::cell::RefCell; -use std::mem; -use std::cmp; -use std::marker::PhantomData; - use CreationError; use Format; use FormatsEnumerationError; +use Sample; use SampleFormat; use SamplesRate; use ChannelPosition; +use UnknownTypeBuffer; + +use futures::{Poll, Task, TaskHandle}; +use futures::stream::Stream; +use std::sync::{Arc, Mutex}; + +use self::coreaudio::audio_unit::AudioUnit; +use self::coreaudio::audio_unit::render_callback::{self, data}; mod enumerate; @@ -22,9 +23,6 @@ pub use self::enumerate::{EndpointsIterator, SupportedFormatsIterator, get_default_endpoint}; -use self::coreaudio::audio_unit::{AudioUnit, IOType}; -use self::coreaudio::audio_unit::render_callback::{self, data}; - #[derive(Clone, PartialEq, Eq)] pub struct Endpoint; @@ -44,36 +42,41 @@ impl Endpoint { } } -pub struct Buffer<'a, T: 'a> { - samples_sender: Sender<(Vec, NumChannels)>, - samples: Vec, - num_channels: NumChannels, - marker: PhantomData<&'a T>, - pending_samples: Arc +pub struct EventLoop; +impl EventLoop { + #[inline] + pub fn new() -> EventLoop { EventLoop } + #[inline] + pub fn run(&self) { loop {} } } -impl<'a, T> Buffer<'a, T> { +pub struct Buffer { + args: render_callback::Args>, + buffer: Vec, +} + +impl Buffer where T: Sample { #[inline] - pub fn get_buffer<'b>(&'b mut self) -> &'b mut [T] { - &mut self.samples[..] + pub fn get_buffer(&mut self) -> &mut [T] { + &mut self.buffer[..] } #[inline] pub fn len(&self) -> usize { - self.samples.len() + self.buffer.len() } #[inline] pub fn finish(self) { - let Buffer { samples_sender, samples, num_channels, pending_samples, .. } = self; // TODO: At the moment this assumes the Vec is a Vec. // Need to add T: Sample and use Sample::to_vec_f32. - let num_samples = samples.len(); - let samples = unsafe { mem::transmute(samples) }; - pending_samples.fetch_add(num_samples, Ordering::SeqCst); - match samples_sender.send((samples, num_channels)) { - Err(_) => panic!("Failed to send samples to audio unit callback."), - Ok(()) => (), + let Buffer { mut args, buffer } = self; + + let num_channels = args.data.channels().count(); + for (i, frame) in buffer.chunks(num_channels).enumerate() { + for (channel, sample) in args.data.channels_mut().zip(frame.iter()) { + channel[i] = *sample; + } } } } @@ -81,91 +84,116 @@ impl<'a, T> Buffer<'a, T> { type NumChannels = usize; type NumFrames = usize; +pub struct Voice; + #[allow(dead_code)] // the audio_unit will be dropped if we don't hold it. -pub struct Voice { +pub struct SamplesStream { audio_unit: AudioUnit, - ready_receiver: Receiver<(NumChannels, NumFrames)>, - samples_sender: Sender<(Vec, NumChannels)>, - underflow: Arc>>, - last_ready: Arc>>>, - pending_samples: Arc + inner: Arc>, } -unsafe impl Sync for Voice {} -unsafe impl Send for Voice {} + +struct SamplesStreamInner { + scheduled_task: Option, + current_callback: Option>>, +} + +impl Stream for SamplesStream { + type Item = UnknownTypeBuffer; + type Error = (); + + fn poll(&mut self, _: &mut Task) -> Poll, Self::Error> { + let mut inner = self.inner.lock().unwrap(); + + // There are two possibilites: either we're answering a callback of coreaudio and we return + // a buffer, or we're not answering a callback and we return that we're not ready. + + let current_callback = match inner.current_callback.take() { + Some(c) => c, + None => return Poll::NotReady + }; + + let buffer_len = current_callback.num_frames * current_callback.data.channels().count(); + + let buffer = Buffer { + args: current_callback, + buffer: vec![0.0; buffer_len], + }; + + Poll::Ok(Some(UnknownTypeBuffer::F32(::Buffer { target: Some(buffer) }))) + } + + fn schedule(&mut self, task: &mut Task) { + self.inner.lock().unwrap().scheduled_task = Some(task.handle().clone()); + } +} impl Voice { - pub fn new(_: &Endpoint, _: &Format) -> Result { - // A channel for signalling that the audio unit is ready for data. - let (ready_sender, ready_receiver) = channel(); - // A channel for sending the audio callback a pointer to the sample data. - let (samples_sender, samples_receiver) = channel(); + pub fn new(_: &Endpoint, _: &Format, _: &EventLoop) + -> Result<(Voice, SamplesStream), CreationError> + { + let inner = Arc::new(Mutex::new(SamplesStreamInner { + scheduled_task: None, + current_callback: None, + })); - let underflow = Arc::new(Mutex::new(RefCell::new(false))); - let uf_clone = underflow.clone(); - - let pending_samples: Arc = Arc::new(AtomicUsize::new(0)); - - let pending_samples_c = pending_samples.clone(); - - let audio_unit_result = AudioUnit::new(IOType::HalOutput); - - if let Ok(mut audio_unit) = audio_unit_result { - // TODO: iOS uses integer and fixed-point data - if let Ok(()) = audio_unit.set_render_callback(move |args: render_callback::Args>| { - let render_callback::Args { num_frames, mut data, .. } = args; - let num_channels = data.channels().count(); - if let Err(_) = ready_sender.send((num_channels, num_frames)) { - return Err(()); - } - loop { - if let Ok((samples, num_channels)) = samples_receiver.try_recv() { - let samples: Vec = samples; - if let Ok(uf) = uf_clone.lock() { - *(uf.borrow_mut()) = num_frames > samples.len() / num_channels; - } else { return Err(()) } - - pending_samples_c.fetch_sub(samples.len(), Ordering::SeqCst); - - for (i, frame) in samples.chunks(num_channels).enumerate() { - for (channel, sample) in data.channels_mut().zip(frame.iter()) { - channel[i] = *sample; - } - } - - break; - }; - } - Ok(()) - - }) { - if let Ok(()) = audio_unit.start() { - return Ok(Voice { - audio_unit: audio_unit, - ready_receiver: ready_receiver, - samples_sender: samples_sender, - underflow: underflow, - last_ready: Arc::new(Mutex::new(RefCell::new(None))), - pending_samples: pending_samples - }) - } + fn convert_error(err: coreaudio::Error) -> CreationError { + match err { + coreaudio::Error::RenderCallbackBufferFormatDoesNotMatchAudioUnitStreamFormat | + coreaudio::Error::NoKnownSubtype | + coreaudio::Error::AudioUnit(coreaudio::error::AudioUnitError::FormatNotSupported) | + coreaudio::Error::AudioCodec(_) | + coreaudio::Error::AudioFormat(_) => CreationError::FormatNotSupported, + _ => CreationError::DeviceNotAvailable, } } - Err(CreationError::DeviceNotAvailable) - } + let au_type = coreaudio::audio_unit::IOType::DefaultOutput; + let mut audio_unit = try!(AudioUnit::new(au_type).map_err(convert_error)); - pub fn append_data<'a, T>(&'a mut self, max_elements: usize) -> Buffer<'a, T> where T: Clone { - // Block until the audio callback is ready for more data. - let (channels, frames) = self.block_until_ready(); - let buffer_size = cmp::min(channels * frames, max_elements); - Buffer { - samples_sender: self.samples_sender.clone(), - samples: vec![unsafe { mem::uninitialized() }; buffer_size], - num_channels: channels as usize, - marker: PhantomData, - pending_samples: self.pending_samples.clone() + // TODO: iOS uses integer and fixed-point data + + { + let inner = inner.clone(); + let result = audio_unit.set_render_callback(move |args| { + // This callback is entered whenever the coreaudio engine needs to be fed data. + + // Store the callback argument in the `SamplesStreamInner` and return the task + // that we're supposed to notify. + let scheduled = { + let mut inner = inner.lock().unwrap(); + + assert!(inner.current_callback.is_none()); + inner.current_callback = Some(args); + + inner.scheduled_task.take() + }; + + // It is important that `inner` is unlocked here. + if let Some(scheduled) = scheduled { + // Calling `notify()` should immediately call `poll()` on the `SamplesStream`, + // which will use the data we stored in `current_callback`. + scheduled.notify(); + } + + // TODO: what should happen if the callback wasn't processed? in other word, what + // if the user didn't register any handler or did a stupid thing in the + // handler (like mem::forgetting the buffer)? + + Ok(()) + }); + + try!(result.map_err(convert_error)); } + + try!(audio_unit.start().map_err(convert_error)); + + let samples_stream = SamplesStream { + audio_unit: audio_unit, + inner: inner, + }; + + Ok((Voice, samples_stream)) } #[inline] @@ -177,72 +205,4 @@ impl Voice { pub fn pause(&mut self) { unimplemented!() } - - #[inline] - pub fn get_period(&self) -> usize { - if let Some(ready) = self.update_last_ready() { - (ready.0 * ready.1) as usize - } else { - 0 - } - } - - #[inline] - pub fn get_pending_samples(&self) -> usize { - self.pending_samples.load(Ordering::Relaxed) - } - - /// Attempts to store the most recent ready message into the internal - /// ref cell, then return the last ready message. If the last ready hasn't - /// been reset with `clear_last_ready`, then it will not be set and the - /// current value will be returned. Else, the ready_receiver will be - /// try_recv'd and if it is ready, the last ready will be set and returned. - /// Finally, if the ready_receiver had no data at try_recv, None will be - /// returned. - #[inline] - fn update_last_ready(&self) -> Option<(NumChannels, NumFrames)> { - let refcell = self.last_ready.lock().unwrap(); - let data = refcell.borrow(); - if let Some(s) = *data { - // - return Some(s); - } else { - drop(data); - let mut data = refcell.borrow_mut(); - if let Ok(ready) = self.ready_receiver.try_recv() { - // the audiounit is ready so we can set last_ready - *data = Some(ready); - return *data; - } - } - None - } - - /// Block until ready to send data. This checks last_ready first. In any - /// case, last_ready will be set to None when this function returns. - fn block_until_ready(&self) -> (NumChannels, NumFrames) { - let refcell = self.last_ready.lock().unwrap(); - let data = refcell.borrow(); - if let Some(s) = *data { - drop(data); - let mut data = refcell.borrow_mut(); - *data = None; - return s; - } else { - match self.ready_receiver.recv() { - Ok(ready) => { - return ready; - }, - Err(e) => panic!("Couldn't receive a ready message: \ - {:?}", e) - } - } - } - - #[inline] - pub fn underflowed(&self) -> bool { - let uf = self.underflow.lock().unwrap(); - let v = uf.borrow(); - *v - } }