From b1539c534f604930ff4a4aa1225a95e065d720bd Mon Sep 17 00:00:00 2001 From: mitchmindtree Date: Sat, 22 Jun 2019 17:47:31 +0200 Subject: [PATCH] Add handling of `Play`, `Pause` and `Close` stream events to emscripten --- src/emscripten/mod.rs | 76 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 67 insertions(+), 9 deletions(-) diff --git a/src/emscripten/mod.rs b/src/emscripten/mod.rs index cb53729..ad82af4 100644 --- a/src/emscripten/mod.rs +++ b/src/emscripten/mod.rs @@ -1,7 +1,7 @@ use std::mem; use std::os::raw::c_void; use std::slice::from_raw_parts; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; use stdweb; use stdweb::Reference; use stdweb::unstable::TryInto; @@ -16,6 +16,7 @@ use Format; use PauseStreamError; use PlayStreamError; use SupportedFormatsError; +use StreamCloseCause; use StreamData; use StreamEvent; use SupportedFormat; @@ -32,20 +33,62 @@ use UnknownTypeOutputBuffer; pub struct EventLoop { streams: Mutex>>, + // The `EventLoop` requires a handle to the callbacks in order to be able to emit necessary + // events for `Play`, `Pause` and `Close`. + user_callback: Arc> +} + +enum UserCallback { + // When `run` is called with a callback, that callback will be stored here. + // + // It is essential for the safety of the program that this callback is removed before `run` + // returns (not possible with the current CPAL API). + Active(&'static mut (dyn FnMut(StreamId, StreamEvent) + Send)), + // A queue of events that have occurred but that have not yet been emitted to the user as we + // don't yet have a callback to do so. + Inactive { + pending_events: Vec<(StreamId, StreamEvent<'static>)> + }, } impl EventLoop { #[inline] pub fn new() -> EventLoop { stdweb::initialize(); - - EventLoop { streams: Mutex::new(Vec::new()) } + EventLoop { + streams: Mutex::new(Vec::new()), + user_callback: Arc::new(Mutex::new(UserCallback::Inactive { pending_events: vec![] })), + } } #[inline] - pub fn run(&self, callback: F) -> ! - where F: FnMut(StreamId, StreamEvent) + pub fn run(&self, mut callback: F) -> ! + where F: FnMut(StreamId, StreamEvent) + Send, { + // Retrieve and process any pending events. + // + // Then, set the callback ready to be shared between audio processing and the event loop + // handle. + { + let mut guard = self.user_callback.lock().unwrap(); + let pending_events = match *guard { + UserCallback::Inactive { ref mut pending_events } => { + mem::replace(pending_events, vec![]) + } + UserCallback::Active(_) => { + panic!("`EventLoop::run` was called when the event loop was already running"); + } + }; + + let callback: &mut (dyn FnMut(StreamId, StreamEvent) + Send) = &mut callback; + for (stream_id, event) in pending_events { + callback(stream_id, event); + } + + *guard = UserCallback::Active(unsafe { mem::transmute(callback) }); + } + + // The `run` function uses `set_timeout` to invoke a Rust callback repeatidely. The job // of this callback is to fill the content of the audio buffers. @@ -79,10 +122,10 @@ impl EventLoop { let typed_array = { let f32_slice = temporary_buffer.as_slice(); - let u8_slice: &[u8] = unsafe { - from_raw_parts(f32_slice.as_ptr() as *const _, - f32_slice.len() * mem::size_of::()) - }; + let u8_slice: &[u8] = from_raw_parts( + f32_slice.as_ptr() as *const _, + f32_slice.len() * mem::size_of::(), + ); let typed_array: TypedArray = u8_slice.into(); typed_array }; @@ -121,6 +164,9 @@ impl EventLoop { set_timeout(|| callback_fn::(user_data_ptr as *mut _), 10); stdweb::event_loop(); + + // It is critical that we remove the callback before returning (currently not possible). + // *self.user_callback.lock().unwrap() = UserCallback::Inactive { pending_events: vec![] }; } #[inline] @@ -145,9 +191,19 @@ impl EventLoop { Ok(StreamId(stream_id)) } + fn emit_or_enqueue_event(&self, id: StreamId, event: StreamEvent<'static>) { + let mut guard = self.user_callback.lock().unwrap(); + match *guard { + UserCallback::Active(ref mut callback) => callback(id, event), + UserCallback::Inactive { ref mut pending_events } => pending_events.push((id, event)), + } + } + #[inline] pub fn destroy_stream(&self, stream_id: StreamId) { self.streams.lock().unwrap()[stream_id.0] = None; + let event = StreamEvent::Close(StreamCloseCause::UserDestroyed); + self.emit_or_enqueue_event(stream_id, event); } #[inline] @@ -157,6 +213,7 @@ impl EventLoop { .get(stream_id.0) .and_then(|v| v.as_ref()) .expect("invalid stream ID"); + self.emit_or_enqueue_event(stream_id, StreamEvent::Play); js!(@{stream}.resume()); Ok(()) } @@ -169,6 +226,7 @@ impl EventLoop { .and_then(|v| v.as_ref()) .expect("invalid stream ID"); js!(@{stream}.suspend()); + self.emit_or_enqueue_event(stream_id, StreamEvent::Pause); Ok(()) } }