Update coreaudio backend for `StreamEvent` API addition.

This commit is contained in:
mitchmindtree 2019-06-22 15:23:46 +02:00
parent fddea2edd8
commit b2c1226b47
1 changed files with 79 additions and 33 deletions

View File

@ -12,7 +12,9 @@ use PlayStreamError;
use SupportedFormatsError; use SupportedFormatsError;
use SampleFormat; use SampleFormat;
use SampleRate; use SampleRate;
use StreamCloseCause;
use StreamData; use StreamData;
use StreamEvent;
use SupportedFormat; use SupportedFormat;
use UnknownTypeInputBuffer; use UnknownTypeInputBuffer;
use UnknownTypeOutputBuffer; use UnknownTypeOutputBuffer;
@ -319,13 +321,24 @@ pub struct StreamId(usize);
pub struct EventLoop { pub struct EventLoop {
// This `Arc` is shared with all the callbacks of coreaudio. // This `Arc` is shared with all the callbacks of coreaudio.
active_callbacks: Arc<ActiveCallbacks>, //
// TODO: Eventually, CPAL's API should be changed to allow for submitting a unique callback per
// stream to avoid streams blocking one another.
user_callback: Arc<Mutex<UserCallback>>,
streams: Mutex<Vec<Option<StreamInner>>>, streams: Mutex<Vec<Option<StreamInner>>>,
} }
struct ActiveCallbacks { enum UserCallback {
// Whenever the `run()` method is called with a callback, this callback is put in this list. // When `run` is called with a callback, that callback will be stored here.
callbacks: Mutex<Vec<&'static mut (FnMut(StreamId, StreamData) + Send)>>, //
// It is essential for the safety of the program that this callback is removed before `run`
// returns (not possible with the current CPAL API).
Active(&'static mut (FnMut(StreamId, StreamEvent) + Send)),
// A queue of events that have occurred but that have not yet been emitted to the user as we
// don't yet have a callback to do so.
Inactive {
pending_events: Vec<(StreamId, StreamEvent<'static>)>
},
} }
struct StreamInner { struct StreamInner {
@ -427,22 +440,32 @@ impl EventLoop {
#[inline] #[inline]
pub fn new() -> EventLoop { pub fn new() -> EventLoop {
EventLoop { EventLoop {
active_callbacks: Arc::new(ActiveCallbacks { callbacks: Mutex::new(Vec::new()) }), user_callback: Arc::new(Mutex::new(UserCallback::Inactive { pending_events: vec![] })),
streams: Mutex::new(Vec::new()), streams: Mutex::new(Vec::new()),
} }
} }
#[inline] #[inline]
pub fn run<F>(&self, mut callback: F) -> ! pub fn run<F>(&self, mut callback: F) -> !
where F: FnMut(StreamId, StreamData) + Send where F: FnMut(StreamId, StreamEvent) + Send
{ {
{ {
let callback: &mut (FnMut(StreamId, StreamData) + Send) = &mut callback; let mut guard = self.user_callback.lock().unwrap();
self.active_callbacks let pending_events = match *guard {
.callbacks UserCallback::Inactive { ref mut pending_events } => {
.lock() mem::replace(pending_events, vec![])
.unwrap() }
.push(unsafe { mem::transmute(callback) }); UserCallback::Active(_) => {
panic!("`EventLoop::run` was called when the event loop was already running");
}
};
let callback: &mut (FnMut(StreamId, StreamEvent) + Send) = &mut callback;
for (stream_id, event) in pending_events {
callback(stream_id, event);
}
*guard = UserCallback::Active(unsafe { mem::transmute(callback) });
} }
loop { loop {
@ -450,8 +473,8 @@ impl EventLoop {
thread::sleep(Duration::new(1u64, 0u32)); thread::sleep(Duration::new(1u64, 0u32));
} }
// Note: if we ever change this API so that `run` can return, then it is critical that // It is critical that we remove the callback before returning (currently not possible).
// we remove the callback from `active_callbacks`. // *self.user_callback.lock().unwrap() = UserCallback::Inactive { pending_events: vec![] };
} }
fn next_stream_id(&self) -> usize { fn next_stream_id(&self) -> usize {
@ -650,7 +673,7 @@ impl EventLoop {
// Register the callback that is being called by coreaudio whenever it needs data to be // Register the callback that is being called by coreaudio whenever it needs data to be
// fed to the audio buffer. // fed to the audio buffer.
let active_callbacks = self.active_callbacks.clone(); let user_callback = self.user_callback.clone();
let sample_format = format.data_type; let sample_format = format.data_type;
let bytes_per_channel = format.data_type.sample_size(); let bytes_per_channel = format.data_type.sample_size();
type Args = render_callback::Args<data::Raw>; type Args = render_callback::Args<data::Raw>;
@ -666,20 +689,21 @@ impl EventLoop {
mData: data mData: data
} = buffers[0]; } = buffers[0];
let mut callbacks = active_callbacks.callbacks.lock().unwrap(); let mut user_callback = user_callback.lock().unwrap();
// A small macro to simplify handling the callback for different sample types. // A small macro to simplify handling the callback for different sample types.
macro_rules! try_callback { macro_rules! try_callback {
($SampleFormat:ident, $SampleType:ty) => {{ ($SampleFormat:ident, $SampleType:ty) => {{
let data_len = (data_byte_size as usize / bytes_per_channel) as usize; let data_len = (data_byte_size as usize / bytes_per_channel) as usize;
let data_slice = slice::from_raw_parts(data as *const $SampleType, data_len); let data_slice = slice::from_raw_parts(data as *const $SampleType, data_len);
let callback = match callbacks.get_mut(0) { let callback = match *user_callback {
Some(cb) => cb, UserCallback::Active(ref mut cb) => cb,
None => return Ok(()), UserCallback::Inactive { .. } => return Ok(()),
}; };
let unknown_type_buffer = UnknownTypeInputBuffer::$SampleFormat(::InputBuffer { buffer: data_slice }); let unknown_type_buffer = UnknownTypeInputBuffer::$SampleFormat(::InputBuffer { buffer: data_slice });
let stream_data = StreamData::Input { buffer: unknown_type_buffer }; let stream_data = StreamData::Input { buffer: unknown_type_buffer };
callback(StreamId(stream_id), stream_data); let stream_event = StreamEvent::Data(stream_data);
callback(StreamId(stream_id), stream_event);
}}; }};
} }
@ -723,7 +747,7 @@ impl EventLoop {
// Register the callback that is being called by coreaudio whenever it needs data to be // Register the callback that is being called by coreaudio whenever it needs data to be
// fed to the audio buffer. // fed to the audio buffer.
let active_callbacks = self.active_callbacks.clone(); let user_callback = self.user_callback.clone();
let sample_format = format.data_type; let sample_format = format.data_type;
let bytes_per_channel = format.data_type.sample_size(); let bytes_per_channel = format.data_type.sample_size();
type Args = render_callback::Args<data::Raw>; type Args = render_callback::Args<data::Raw>;
@ -737,16 +761,16 @@ impl EventLoop {
mData: data mData: data
} = (*args.data.data).mBuffers[0]; } = (*args.data.data).mBuffers[0];
let mut callbacks = active_callbacks.callbacks.lock().unwrap(); let mut user_callback = user_callback.lock().unwrap();
// A small macro to simplify handling the callback for different sample types. // A small macro to simplify handling the callback for different sample types.
macro_rules! try_callback { macro_rules! try_callback {
($SampleFormat:ident, $SampleType:ty, $equilibrium:expr) => {{ ($SampleFormat:ident, $SampleType:ty, $equilibrium:expr) => {{
let data_len = (data_byte_size as usize / bytes_per_channel) as usize; let data_len = (data_byte_size as usize / bytes_per_channel) as usize;
let data_slice = slice::from_raw_parts_mut(data as *mut $SampleType, data_len); let data_slice = slice::from_raw_parts_mut(data as *mut $SampleType, data_len);
let callback = match callbacks.get_mut(0) { let callback = match *user_callback {
Some(cb) => cb, UserCallback::Active(ref mut cb) => cb,
None => { UserCallback::Inactive { .. } => {
for sample in data_slice.iter_mut() { for sample in data_slice.iter_mut() {
*sample = $equilibrium; *sample = $equilibrium;
} }
@ -755,7 +779,8 @@ impl EventLoop {
}; };
let unknown_type_buffer = UnknownTypeOutputBuffer::$SampleFormat(::OutputBuffer { buffer: data_slice }); let unknown_type_buffer = UnknownTypeOutputBuffer::$SampleFormat(::OutputBuffer { buffer: data_slice });
let stream_data = StreamData::Output { buffer: unknown_type_buffer }; let stream_data = StreamData::Output { buffer: unknown_type_buffer };
callback(StreamId(stream_id), stream_data); let stream_event = StreamEvent::Data(stream_data);
callback(StreamId(stream_id), stream_event);
}}; }};
} }
@ -777,16 +802,33 @@ impl EventLoop {
Ok(StreamId(stream_id)) Ok(StreamId(stream_id))
} }
fn emit_or_enqueue_event(&self, id: StreamId, event: StreamEvent<'static>) {
let mut guard = self.user_callback.lock().unwrap();
match *guard {
UserCallback::Active(ref mut callback) => callback(id, event),
UserCallback::Inactive { ref mut pending_events } => pending_events.push((id, event)),
}
}
pub fn destroy_stream(&self, stream_id: StreamId) { pub fn destroy_stream(&self, stream_id: StreamId) {
{
let mut streams = self.streams.lock().unwrap(); let mut streams = self.streams.lock().unwrap();
streams[stream_id.0] = None; streams[stream_id.0] = None;
} }
// Emit the `Close` event to the user.
let event = StreamEvent::Close(StreamCloseCause::UserDestroyed);
self.emit_or_enqueue_event(stream_id, event);
}
pub fn play_stream(&self, stream: StreamId) -> Result<(), PlayStreamError> { pub fn play_stream(&self, stream_id: StreamId) -> Result<(), PlayStreamError> {
let mut streams = self.streams.lock().unwrap(); let mut streams = self.streams.lock().unwrap();
let stream = streams[stream.0].as_mut().unwrap(); let stream = streams[stream_id.0].as_mut().unwrap();
if !stream.playing { if !stream.playing {
// Emit the `Play` event to the user. This should not block, as the stream should not
// yet be playing if this is being called.
self.emit_or_enqueue_event(stream_id, StreamEvent::Play);
if let Err(e) = stream.audio_unit.start() { if let Err(e) = stream.audio_unit.start() {
let description = format!("{}", std::error::Error::description(&e)); let description = format!("{}", std::error::Error::description(&e));
let err = BackendSpecificError { description }; let err = BackendSpecificError { description };
@ -797,9 +839,9 @@ impl EventLoop {
Ok(()) Ok(())
} }
pub fn pause_stream(&self, stream: StreamId) -> Result<(), PauseStreamError> { pub fn pause_stream(&self, stream_id: StreamId) -> Result<(), PauseStreamError> {
let mut streams = self.streams.lock().unwrap(); let mut streams = self.streams.lock().unwrap();
let stream = streams[stream.0].as_mut().unwrap(); let stream = streams[stream_id.0].as_mut().unwrap();
if stream.playing { if stream.playing {
if let Err(e) = stream.audio_unit.stop() { if let Err(e) = stream.audio_unit.stop() {
@ -807,6 +849,10 @@ impl EventLoop {
let err = BackendSpecificError { description }; let err = BackendSpecificError { description };
return Err(err.into()); return Err(err.into());
} }
// Emit the `Pause` event to the user.
self.emit_or_enqueue_event(stream_id, StreamEvent::Pause);
stream.playing = false; stream.playing = false;
} }
Ok(()) Ok(())