From 26f7e99e9bf9da3001dd2846b69f8e52188ca1a7 Mon Sep 17 00:00:00 2001 From: mitchmindtree Date: Sun, 23 Jun 2019 19:04:24 +0200 Subject: [PATCH] Remove all `Pause`, `Play` and `Close` events A follow up to [this comment](https://github.com/tomaka/cpal/pull/288#issuecomment-504712574). --- src/alsa/mod.rs | 12 ++------- src/coreaudio/mod.rs | 47 ++++++--------------------------- src/emscripten/mod.rs | 61 ++----------------------------------------- src/lib.rs | 6 ----- src/wasapi/stream.rs | 7 ----- 5 files changed, 12 insertions(+), 121 deletions(-) diff --git a/src/alsa/mod.rs b/src/alsa/mod.rs index 6c4a276..b11b590 100644 --- a/src/alsa/mod.rs +++ b/src/alsa/mod.rs @@ -14,7 +14,6 @@ use PlayStreamError; use SupportedFormatsError; use SampleFormat; use SampleRate; -use StreamCloseCause; use StreamData; use StreamError; use StreamEvent; @@ -473,7 +472,7 @@ impl EventLoop { let run_context = &mut *run_context; 'stream_loop: loop { - process_commands(run_context, callback); + process_commands(run_context); reset_descriptors_with_pending_command_trigger( &mut run_context.descriptors, @@ -827,22 +826,16 @@ impl EventLoop { } // Process any pending `Command`s within the `RunContext`'s queue. -fn process_commands( - run_context: &mut RunContext, - callback: &mut dyn FnMut(StreamId, StreamEvent), -) { +fn process_commands(run_context: &mut RunContext) { for command in run_context.commands.try_iter() { match command { Command::DestroyStream(stream_id) => { run_context.streams.retain(|s| s.id != stream_id); - let event = StreamCloseCause::UserDestroyed.into(); - callback(stream_id, event); }, Command::PlayStream(stream_id) => { if let Some(stream) = run_context.streams.iter_mut() .find(|stream| stream.can_pause && stream.id == stream_id) { - callback(stream_id, StreamEvent::Play); unsafe { alsa::snd_pcm_pause(stream.channel, 0); } @@ -853,7 +846,6 @@ fn process_commands( if let Some(stream) = run_context.streams.iter_mut() .find(|stream| stream.can_pause && stream.id == stream_id) { - callback(stream_id, StreamEvent::Pause); unsafe { alsa::snd_pcm_pause(stream.channel, 1); } diff --git a/src/coreaudio/mod.rs b/src/coreaudio/mod.rs index 6ac386f..101dbcc 100644 --- a/src/coreaudio/mod.rs +++ b/src/coreaudio/mod.rs @@ -12,7 +12,6 @@ use PlayStreamError; use SupportedFormatsError; use SampleFormat; use SampleRate; -use StreamCloseCause; use StreamData; use StreamEvent; use SupportedFormat; @@ -336,9 +335,7 @@ enum UserCallback { Active(&'static mut (FnMut(StreamId, StreamEvent) + Send)), // A queue of events that have occurred but that have not yet been emitted to the user as we // don't yet have a callback to do so. - Inactive { - pending_events: Vec<(StreamId, StreamEvent<'static>)> - }, + Inactive, } struct StreamInner { @@ -440,7 +437,7 @@ impl EventLoop { #[inline] pub fn new() -> EventLoop { EventLoop { - user_callback: Arc::new(Mutex::new(UserCallback::Inactive { pending_events: vec![] })), + user_callback: Arc::new(Mutex::new(UserCallback::Inactive)), streams: Mutex::new(Vec::new()), } } @@ -451,20 +448,10 @@ impl EventLoop { { { let mut guard = self.user_callback.lock().unwrap(); - let pending_events = match *guard { - UserCallback::Inactive { ref mut pending_events } => { - mem::replace(pending_events, vec![]) - } - UserCallback::Active(_) => { - panic!("`EventLoop::run` was called when the event loop was already running"); - } - }; - - let callback: &mut (FnMut(StreamId, StreamEvent) + Send) = &mut callback; - for (stream_id, event) in pending_events { - callback(stream_id, event); + if let UserCallback::Active(_) = *guard { + panic!("`EventLoop::run` was called when the event loop was already running"); } - + let callback: &mut (FnMut(StreamId, StreamEvent) + Send) = &mut callback; *guard = UserCallback::Active(unsafe { mem::transmute(callback) }); } @@ -474,7 +461,7 @@ impl EventLoop { } // It is critical that we remove the callback before returning (currently not possible). - // *self.user_callback.lock().unwrap() = UserCallback::Inactive { pending_events: vec![] }; + // *self.user_callback.lock().unwrap() = UserCallback::Inactive; } fn next_stream_id(&self) -> usize { @@ -698,7 +685,7 @@ impl EventLoop { let data_slice = slice::from_raw_parts(data as *const $SampleType, data_len); let callback = match *user_callback { UserCallback::Active(ref mut cb) => cb, - UserCallback::Inactive { .. } => return Ok(()), + UserCallback::Inactive => return Ok(()), }; let unknown_type_buffer = UnknownTypeInputBuffer::$SampleFormat(::InputBuffer { buffer: data_slice }); let stream_data = StreamData::Input { buffer: unknown_type_buffer }; @@ -770,7 +757,7 @@ impl EventLoop { let data_slice = slice::from_raw_parts_mut(data as *mut $SampleType, data_len); let callback = match *user_callback { UserCallback::Active(ref mut cb) => cb, - UserCallback::Inactive { .. } => { + UserCallback::Inactive => { for sample in data_slice.iter_mut() { *sample = $equilibrium; } @@ -802,22 +789,11 @@ impl EventLoop { Ok(StreamId(stream_id)) } - fn emit_or_enqueue_event(&self, id: StreamId, event: StreamEvent<'static>) { - let mut guard = self.user_callback.lock().unwrap(); - match *guard { - UserCallback::Active(ref mut callback) => callback(id, event), - UserCallback::Inactive { ref mut pending_events } => pending_events.push((id, event)), - } - } - pub fn destroy_stream(&self, stream_id: StreamId) { { let mut streams = self.streams.lock().unwrap(); streams[stream_id.0] = None; } - // Emit the `Close` event to the user. - let event = StreamEvent::Close(StreamCloseCause::UserDestroyed); - self.emit_or_enqueue_event(stream_id, event); } pub fn play_stream(&self, stream_id: StreamId) -> Result<(), PlayStreamError> { @@ -825,10 +801,6 @@ impl EventLoop { let stream = streams[stream_id.0].as_mut().unwrap(); if !stream.playing { - // Emit the `Play` event to the user. This should not block, as the stream should not - // yet be playing if this is being called. - self.emit_or_enqueue_event(stream_id, StreamEvent::Play); - if let Err(e) = stream.audio_unit.start() { let description = format!("{}", std::error::Error::description(&e)); let err = BackendSpecificError { description }; @@ -850,9 +822,6 @@ impl EventLoop { return Err(err.into()); } - // Emit the `Pause` event to the user. - self.emit_or_enqueue_event(stream_id, StreamEvent::Pause); - stream.playing = false; } Ok(()) diff --git a/src/emscripten/mod.rs b/src/emscripten/mod.rs index ad82af4..687f648 100644 --- a/src/emscripten/mod.rs +++ b/src/emscripten/mod.rs @@ -1,7 +1,7 @@ use std::mem; use std::os::raw::c_void; use std::slice::from_raw_parts; -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; use stdweb; use stdweb::Reference; use stdweb::unstable::TryInto; @@ -16,7 +16,6 @@ use Format; use PauseStreamError; use PlayStreamError; use SupportedFormatsError; -use StreamCloseCause; use StreamData; use StreamEvent; use SupportedFormat; @@ -33,22 +32,6 @@ use UnknownTypeOutputBuffer; pub struct EventLoop { streams: Mutex>>, - // The `EventLoop` requires a handle to the callbacks in order to be able to emit necessary - // events for `Play`, `Pause` and `Close`. - user_callback: Arc> -} - -enum UserCallback { - // When `run` is called with a callback, that callback will be stored here. - // - // It is essential for the safety of the program that this callback is removed before `run` - // returns (not possible with the current CPAL API). - Active(&'static mut (dyn FnMut(StreamId, StreamEvent) + Send)), - // A queue of events that have occurred but that have not yet been emitted to the user as we - // don't yet have a callback to do so. - Inactive { - pending_events: Vec<(StreamId, StreamEvent<'static>)> - }, } impl EventLoop { @@ -57,38 +40,13 @@ impl EventLoop { stdweb::initialize(); EventLoop { streams: Mutex::new(Vec::new()), - user_callback: Arc::new(Mutex::new(UserCallback::Inactive { pending_events: vec![] })), } } #[inline] - pub fn run(&self, mut callback: F) -> ! + pub fn run(&self, callback: F) -> ! where F: FnMut(StreamId, StreamEvent) + Send, { - // Retrieve and process any pending events. - // - // Then, set the callback ready to be shared between audio processing and the event loop - // handle. - { - let mut guard = self.user_callback.lock().unwrap(); - let pending_events = match *guard { - UserCallback::Inactive { ref mut pending_events } => { - mem::replace(pending_events, vec![]) - } - UserCallback::Active(_) => { - panic!("`EventLoop::run` was called when the event loop was already running"); - } - }; - - let callback: &mut (dyn FnMut(StreamId, StreamEvent) + Send) = &mut callback; - for (stream_id, event) in pending_events { - callback(stream_id, event); - } - - *guard = UserCallback::Active(unsafe { mem::transmute(callback) }); - } - - // The `run` function uses `set_timeout` to invoke a Rust callback repeatidely. The job // of this callback is to fill the content of the audio buffers. @@ -164,9 +122,6 @@ impl EventLoop { set_timeout(|| callback_fn::(user_data_ptr as *mut _), 10); stdweb::event_loop(); - - // It is critical that we remove the callback before returning (currently not possible). - // *self.user_callback.lock().unwrap() = UserCallback::Inactive { pending_events: vec![] }; } #[inline] @@ -191,19 +146,9 @@ impl EventLoop { Ok(StreamId(stream_id)) } - fn emit_or_enqueue_event(&self, id: StreamId, event: StreamEvent<'static>) { - let mut guard = self.user_callback.lock().unwrap(); - match *guard { - UserCallback::Active(ref mut callback) => callback(id, event), - UserCallback::Inactive { ref mut pending_events } => pending_events.push((id, event)), - } - } - #[inline] pub fn destroy_stream(&self, stream_id: StreamId) { self.streams.lock().unwrap()[stream_id.0] = None; - let event = StreamEvent::Close(StreamCloseCause::UserDestroyed); - self.emit_or_enqueue_event(stream_id, event); } #[inline] @@ -213,7 +158,6 @@ impl EventLoop { .get(stream_id.0) .and_then(|v| v.as_ref()) .expect("invalid stream ID"); - self.emit_or_enqueue_event(stream_id, StreamEvent::Play); js!(@{stream}.resume()); Ok(()) } @@ -226,7 +170,6 @@ impl EventLoop { .and_then(|v| v.as_ref()) .expect("invalid stream ID"); js!(@{stream}.suspend()); - self.emit_or_enqueue_event(stream_id, StreamEvent::Pause); Ok(()) } } diff --git a/src/lib.rs b/src/lib.rs index 48b90a2..78a21d1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -219,12 +219,6 @@ pub enum StreamData<'a> { pub enum StreamEvent<'a> { /// Some data is ready to be processed. Data(StreamData<'a>), - /// The stream has received a **Play** command. - Play, - /// The stream has received a **Pause** command. - /// - /// No **Data** events should occur until a subsequent **Play** command is received. - Pause, /// The stream was closed, either because the user destroyed it or because of an error. /// /// The stream event callback will not be called again after this event occurs. diff --git a/src/wasapi/stream.rs b/src/wasapi/stream.rs index 9539ac9..2ba2ac6 100644 --- a/src/wasapi/stream.rs +++ b/src/wasapi/stream.rs @@ -26,7 +26,6 @@ use Format; use PauseStreamError; use PlayStreamError; use SampleFormat; -use StreamCloseCause; use StreamData; use StreamError; use StreamEvent; @@ -758,8 +757,6 @@ fn process_commands( run_context.streams.remove(p); }, } - let event = StreamEvent::Close(StreamCloseCause::UserDestroyed); - callback(stream_id, event); }, Command::PlayStream(stream_id) => { match run_context.streams.iter().position(|s| s.id == stream_id) { @@ -772,8 +769,6 @@ fn process_commands( match stream_error_from_hresult(hresult) { Ok(()) => { run_context.streams[p].playing = true; - let event = StreamEvent::Play; - callback(stream_id, event); } Err(err) => { let event = StreamEvent::Close(err.into()); @@ -797,8 +792,6 @@ fn process_commands( match stream_error_from_hresult(hresult) { Ok(()) => { run_context.streams[p].playing = false; - let event = StreamEvent::Pause; - callback(stream_id, event); } Err(err) => { let event = StreamEvent::Close(err.into());