diff --git a/src/coreaudio/mod.rs b/src/coreaudio/mod.rs index 1f5b831..6ac386f 100644 --- a/src/coreaudio/mod.rs +++ b/src/coreaudio/mod.rs @@ -12,7 +12,9 @@ use PlayStreamError; use SupportedFormatsError; use SampleFormat; use SampleRate; +use StreamCloseCause; use StreamData; +use StreamEvent; use SupportedFormat; use UnknownTypeInputBuffer; use UnknownTypeOutputBuffer; @@ -319,13 +321,24 @@ pub struct StreamId(usize); pub struct EventLoop { // This `Arc` is shared with all the callbacks of coreaudio. - active_callbacks: Arc, + // + // TODO: Eventually, CPAL's API should be changed to allow for submitting a unique callback per + // stream to avoid streams blocking one another. + user_callback: Arc>, streams: Mutex>>, } -struct ActiveCallbacks { - // Whenever the `run()` method is called with a callback, this callback is put in this list. - callbacks: Mutex>, +enum UserCallback { + // When `run` is called with a callback, that callback will be stored here. + // + // It is essential for the safety of the program that this callback is removed before `run` + // returns (not possible with the current CPAL API). + Active(&'static mut (FnMut(StreamId, StreamEvent) + Send)), + // A queue of events that have occurred but that have not yet been emitted to the user as we + // don't yet have a callback to do so. + Inactive { + pending_events: Vec<(StreamId, StreamEvent<'static>)> + }, } struct StreamInner { @@ -427,22 +440,32 @@ impl EventLoop { #[inline] pub fn new() -> EventLoop { EventLoop { - active_callbacks: Arc::new(ActiveCallbacks { callbacks: Mutex::new(Vec::new()) }), + user_callback: Arc::new(Mutex::new(UserCallback::Inactive { pending_events: vec![] })), streams: Mutex::new(Vec::new()), } } #[inline] pub fn run(&self, mut callback: F) -> ! - where F: FnMut(StreamId, StreamData) + Send + where F: FnMut(StreamId, StreamEvent) + Send { { - let callback: &mut (FnMut(StreamId, StreamData) + Send) = &mut callback; - self.active_callbacks - .callbacks - .lock() - .unwrap() - .push(unsafe { mem::transmute(callback) }); + let mut guard = self.user_callback.lock().unwrap(); + let pending_events = match *guard { + UserCallback::Inactive { ref mut pending_events } => { + mem::replace(pending_events, vec![]) + } + UserCallback::Active(_) => { + panic!("`EventLoop::run` was called when the event loop was already running"); + } + }; + + let callback: &mut (FnMut(StreamId, StreamEvent) + Send) = &mut callback; + for (stream_id, event) in pending_events { + callback(stream_id, event); + } + + *guard = UserCallback::Active(unsafe { mem::transmute(callback) }); } loop { @@ -450,8 +473,8 @@ impl EventLoop { thread::sleep(Duration::new(1u64, 0u32)); } - // Note: if we ever change this API so that `run` can return, then it is critical that - // we remove the callback from `active_callbacks`. + // It is critical that we remove the callback before returning (currently not possible). + // *self.user_callback.lock().unwrap() = UserCallback::Inactive { pending_events: vec![] }; } fn next_stream_id(&self) -> usize { @@ -650,7 +673,7 @@ impl EventLoop { // Register the callback that is being called by coreaudio whenever it needs data to be // fed to the audio buffer. - let active_callbacks = self.active_callbacks.clone(); + let user_callback = self.user_callback.clone(); let sample_format = format.data_type; let bytes_per_channel = format.data_type.sample_size(); type Args = render_callback::Args; @@ -666,20 +689,21 @@ impl EventLoop { mData: data } = buffers[0]; - let mut callbacks = active_callbacks.callbacks.lock().unwrap(); + let mut user_callback = user_callback.lock().unwrap(); // A small macro to simplify handling the callback for different sample types. macro_rules! try_callback { ($SampleFormat:ident, $SampleType:ty) => {{ let data_len = (data_byte_size as usize / bytes_per_channel) as usize; let data_slice = slice::from_raw_parts(data as *const $SampleType, data_len); - let callback = match callbacks.get_mut(0) { - Some(cb) => cb, - None => return Ok(()), + let callback = match *user_callback { + UserCallback::Active(ref mut cb) => cb, + UserCallback::Inactive { .. } => return Ok(()), }; let unknown_type_buffer = UnknownTypeInputBuffer::$SampleFormat(::InputBuffer { buffer: data_slice }); let stream_data = StreamData::Input { buffer: unknown_type_buffer }; - callback(StreamId(stream_id), stream_data); + let stream_event = StreamEvent::Data(stream_data); + callback(StreamId(stream_id), stream_event); }}; } @@ -723,7 +747,7 @@ impl EventLoop { // Register the callback that is being called by coreaudio whenever it needs data to be // fed to the audio buffer. - let active_callbacks = self.active_callbacks.clone(); + let user_callback = self.user_callback.clone(); let sample_format = format.data_type; let bytes_per_channel = format.data_type.sample_size(); type Args = render_callback::Args; @@ -737,16 +761,16 @@ impl EventLoop { mData: data } = (*args.data.data).mBuffers[0]; - let mut callbacks = active_callbacks.callbacks.lock().unwrap(); + let mut user_callback = user_callback.lock().unwrap(); // A small macro to simplify handling the callback for different sample types. macro_rules! try_callback { ($SampleFormat:ident, $SampleType:ty, $equilibrium:expr) => {{ let data_len = (data_byte_size as usize / bytes_per_channel) as usize; let data_slice = slice::from_raw_parts_mut(data as *mut $SampleType, data_len); - let callback = match callbacks.get_mut(0) { - Some(cb) => cb, - None => { + let callback = match *user_callback { + UserCallback::Active(ref mut cb) => cb, + UserCallback::Inactive { .. } => { for sample in data_slice.iter_mut() { *sample = $equilibrium; } @@ -755,7 +779,8 @@ impl EventLoop { }; let unknown_type_buffer = UnknownTypeOutputBuffer::$SampleFormat(::OutputBuffer { buffer: data_slice }); let stream_data = StreamData::Output { buffer: unknown_type_buffer }; - callback(StreamId(stream_id), stream_data); + let stream_event = StreamEvent::Data(stream_data); + callback(StreamId(stream_id), stream_event); }}; } @@ -777,16 +802,33 @@ impl EventLoop { Ok(StreamId(stream_id)) } - pub fn destroy_stream(&self, stream_id: StreamId) { - let mut streams = self.streams.lock().unwrap(); - streams[stream_id.0] = None; + fn emit_or_enqueue_event(&self, id: StreamId, event: StreamEvent<'static>) { + let mut guard = self.user_callback.lock().unwrap(); + match *guard { + UserCallback::Active(ref mut callback) => callback(id, event), + UserCallback::Inactive { ref mut pending_events } => pending_events.push((id, event)), + } } - pub fn play_stream(&self, stream: StreamId) -> Result<(), PlayStreamError> { + pub fn destroy_stream(&self, stream_id: StreamId) { + { + let mut streams = self.streams.lock().unwrap(); + streams[stream_id.0] = None; + } + // Emit the `Close` event to the user. + let event = StreamEvent::Close(StreamCloseCause::UserDestroyed); + self.emit_or_enqueue_event(stream_id, event); + } + + pub fn play_stream(&self, stream_id: StreamId) -> Result<(), PlayStreamError> { let mut streams = self.streams.lock().unwrap(); - let stream = streams[stream.0].as_mut().unwrap(); + let stream = streams[stream_id.0].as_mut().unwrap(); if !stream.playing { + // Emit the `Play` event to the user. This should not block, as the stream should not + // yet be playing if this is being called. + self.emit_or_enqueue_event(stream_id, StreamEvent::Play); + if let Err(e) = stream.audio_unit.start() { let description = format!("{}", std::error::Error::description(&e)); let err = BackendSpecificError { description }; @@ -797,9 +839,9 @@ impl EventLoop { Ok(()) } - pub fn pause_stream(&self, stream: StreamId) -> Result<(), PauseStreamError> { + pub fn pause_stream(&self, stream_id: StreamId) -> Result<(), PauseStreamError> { let mut streams = self.streams.lock().unwrap(); - let stream = streams[stream.0].as_mut().unwrap(); + let stream = streams[stream_id.0].as_mut().unwrap(); if stream.playing { if let Err(e) = stream.audio_unit.stop() { @@ -807,6 +849,10 @@ impl EventLoop { let err = BackendSpecificError { description }; return Err(err.into()); } + + // Emit the `Pause` event to the user. + self.emit_or_enqueue_event(stream_id, StreamEvent::Pause); + stream.playing = false; } Ok(())