Add handling of `Play`, `Pause` and `Close` stream events to emscripten

This commit is contained in:
mitchmindtree 2019-06-22 17:47:31 +02:00
parent b2c1226b47
commit b1539c534f
1 changed files with 67 additions and 9 deletions

View File

@ -1,7 +1,7 @@
use std::mem; use std::mem;
use std::os::raw::c_void; use std::os::raw::c_void;
use std::slice::from_raw_parts; use std::slice::from_raw_parts;
use std::sync::Mutex; use std::sync::{Arc, Mutex};
use stdweb; use stdweb;
use stdweb::Reference; use stdweb::Reference;
use stdweb::unstable::TryInto; use stdweb::unstable::TryInto;
@ -16,6 +16,7 @@ use Format;
use PauseStreamError; use PauseStreamError;
use PlayStreamError; use PlayStreamError;
use SupportedFormatsError; use SupportedFormatsError;
use StreamCloseCause;
use StreamData; use StreamData;
use StreamEvent; use StreamEvent;
use SupportedFormat; use SupportedFormat;
@ -32,20 +33,62 @@ use UnknownTypeOutputBuffer;
pub struct EventLoop { pub struct EventLoop {
streams: Mutex<Vec<Option<Reference>>>, streams: Mutex<Vec<Option<Reference>>>,
// The `EventLoop` requires a handle to the callbacks in order to be able to emit necessary
// events for `Play`, `Pause` and `Close`.
user_callback: Arc<Mutex<UserCallback>>
}
enum UserCallback {
// When `run` is called with a callback, that callback will be stored here.
//
// It is essential for the safety of the program that this callback is removed before `run`
// returns (not possible with the current CPAL API).
Active(&'static mut (dyn FnMut(StreamId, StreamEvent) + Send)),
// A queue of events that have occurred but that have not yet been emitted to the user as we
// don't yet have a callback to do so.
Inactive {
pending_events: Vec<(StreamId, StreamEvent<'static>)>
},
} }
impl EventLoop { impl EventLoop {
#[inline] #[inline]
pub fn new() -> EventLoop { pub fn new() -> EventLoop {
stdweb::initialize(); stdweb::initialize();
EventLoop {
EventLoop { streams: Mutex::new(Vec::new()) } streams: Mutex::new(Vec::new()),
user_callback: Arc::new(Mutex::new(UserCallback::Inactive { pending_events: vec![] })),
}
} }
#[inline] #[inline]
pub fn run<F>(&self, callback: F) -> ! pub fn run<F>(&self, mut callback: F) -> !
where F: FnMut(StreamId, StreamEvent) where F: FnMut(StreamId, StreamEvent) + Send,
{ {
// Retrieve and process any pending events.
//
// Then, set the callback ready to be shared between audio processing and the event loop
// handle.
{
let mut guard = self.user_callback.lock().unwrap();
let pending_events = match *guard {
UserCallback::Inactive { ref mut pending_events } => {
mem::replace(pending_events, vec![])
}
UserCallback::Active(_) => {
panic!("`EventLoop::run` was called when the event loop was already running");
}
};
let callback: &mut (dyn FnMut(StreamId, StreamEvent) + Send) = &mut callback;
for (stream_id, event) in pending_events {
callback(stream_id, event);
}
*guard = UserCallback::Active(unsafe { mem::transmute(callback) });
}
// The `run` function uses `set_timeout` to invoke a Rust callback repeatidely. The job // The `run` function uses `set_timeout` to invoke a Rust callback repeatidely. The job
// of this callback is to fill the content of the audio buffers. // of this callback is to fill the content of the audio buffers.
@ -79,10 +122,10 @@ impl EventLoop {
let typed_array = { let typed_array = {
let f32_slice = temporary_buffer.as_slice(); let f32_slice = temporary_buffer.as_slice();
let u8_slice: &[u8] = unsafe { let u8_slice: &[u8] = from_raw_parts(
from_raw_parts(f32_slice.as_ptr() as *const _, f32_slice.as_ptr() as *const _,
f32_slice.len() * mem::size_of::<f32>()) f32_slice.len() * mem::size_of::<f32>(),
}; );
let typed_array: TypedArray<u8> = u8_slice.into(); let typed_array: TypedArray<u8> = u8_slice.into();
typed_array typed_array
}; };
@ -121,6 +164,9 @@ impl EventLoop {
set_timeout(|| callback_fn::<F>(user_data_ptr as *mut _), 10); set_timeout(|| callback_fn::<F>(user_data_ptr as *mut _), 10);
stdweb::event_loop(); stdweb::event_loop();
// It is critical that we remove the callback before returning (currently not possible).
// *self.user_callback.lock().unwrap() = UserCallback::Inactive { pending_events: vec![] };
} }
#[inline] #[inline]
@ -145,9 +191,19 @@ impl EventLoop {
Ok(StreamId(stream_id)) Ok(StreamId(stream_id))
} }
fn emit_or_enqueue_event(&self, id: StreamId, event: StreamEvent<'static>) {
let mut guard = self.user_callback.lock().unwrap();
match *guard {
UserCallback::Active(ref mut callback) => callback(id, event),
UserCallback::Inactive { ref mut pending_events } => pending_events.push((id, event)),
}
}
#[inline] #[inline]
pub fn destroy_stream(&self, stream_id: StreamId) { pub fn destroy_stream(&self, stream_id: StreamId) {
self.streams.lock().unwrap()[stream_id.0] = None; self.streams.lock().unwrap()[stream_id.0] = None;
let event = StreamEvent::Close(StreamCloseCause::UserDestroyed);
self.emit_or_enqueue_event(stream_id, event);
} }
#[inline] #[inline]
@ -157,6 +213,7 @@ impl EventLoop {
.get(stream_id.0) .get(stream_id.0)
.and_then(|v| v.as_ref()) .and_then(|v| v.as_ref())
.expect("invalid stream ID"); .expect("invalid stream ID");
self.emit_or_enqueue_event(stream_id, StreamEvent::Play);
js!(@{stream}.resume()); js!(@{stream}.resume());
Ok(()) Ok(())
} }
@ -169,6 +226,7 @@ impl EventLoop {
.and_then(|v| v.as_ref()) .and_then(|v| v.as_ref())
.expect("invalid stream ID"); .expect("invalid stream ID");
js!(@{stream}.suspend()); js!(@{stream}.suspend());
self.emit_or_enqueue_event(stream_id, StreamEvent::Pause);
Ok(()) Ok(())
} }
} }