From 59c789fbcd6814deb4636881bab43b9c89f4023c Mon Sep 17 00:00:00 2001 From: mitchmindtree Date: Sat, 22 Jun 2019 00:02:57 +0200 Subject: [PATCH 01/10] Add new `StreamEvent` type - enables more flexible user callback API This adds the following types: - `StreamEvent` - `CloseStreamCause` - `StreamError` These allow for notifying the user of the following events: - A stream has been played. - A stream has been paused. - A stream has been closed due to user destroying stream. - A stream has been closed due to an error. --- src/lib.rs | 86 ++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 81 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3b6c5c8..48b90a2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -70,8 +70,8 @@ //! //! ```no_run //! # let event_loop = cpal::EventLoop::new(); -//! event_loop.run(move |_stream_id, _stream_data| { -//! // read or write stream data here +//! event_loop.run(move |_stream_id, _stream_event| { +//! // react to stream events and read or write stream data here //! }); //! ``` //! @@ -90,7 +90,16 @@ //! use cpal::{StreamData, UnknownTypeOutputBuffer}; //! //! # let event_loop = cpal::EventLoop::new(); -//! event_loop.run(move |_stream_id, mut stream_data| { +//! event_loop.run(move |stream_id, stream_event| { +//! let stream_data = match stream_event { +//! cpal::StreamEvent::Data(data) => data, +//! cpal::StreamEvent::Close(cpal::StreamCloseCause::Error(err)) => { +//! eprintln!("stream {:?} closed due to an error: {}", stream_id, err); +//! return; +//! } +//! _ => return, +//! }; +//! //! match stream_data { //! StreamData::Output { buffer: UnknownTypeOutputBuffer::U16(mut buffer) } => { //! for elem in buffer.iter_mut() { @@ -206,6 +215,31 @@ pub enum StreamData<'a> { }, } +/// Events that may be emitted to the user via the callback submitted to `EventLoop::run`. +pub enum StreamEvent<'a> { + /// Some data is ready to be processed. + Data(StreamData<'a>), + /// The stream has received a **Play** command. + Play, + /// The stream has received a **Pause** command. + /// + /// No **Data** events should occur until a subsequent **Play** command is received. + Pause, + /// The stream was closed, either because the user destroyed it or because of an error. + /// + /// The stream event callback will not be called again after this event occurs. + Close(StreamCloseCause), +} + +/// The cause behind why a stream was closed. +#[derive(Debug)] +pub enum StreamCloseCause { + /// The stream was closed as the user called `destroy_stream`. + UserDestroyed, + /// The stream was closed due to an error occurring. + Error(StreamError), +} + /// Represents a buffer containing audio data that may be read. /// /// This struct implements the `Deref` trait targeting `[T]`. Therefore this buffer can be read the @@ -291,7 +325,7 @@ pub struct SupportedOutputFormats(cpal_impl::SupportedOutputFormats); /// **Note:** If you notice a `BackendSpecificError` that you believe could be better handled in a /// cross-platform manner, please create an issue or submit a pull request with a patch that adds /// the necessary error variant to the appropriate error enum. -#[derive(Debug, Fail)] +#[derive(Clone, Debug, Fail)] #[fail(display = "A backend-specific error has occurred: {}", description)] pub struct BackendSpecificError { pub description: String @@ -412,6 +446,24 @@ pub enum PauseStreamError { } } +/// Errors that might occur while a stream is running. +/// +/// These errors are delivered to the user callback via +/// `StreamEvent::Close(StreamCloseCause::Error(_))` +#[derive(Debug, Fail)] +pub enum StreamError { + /// The device no longer exists. This can happen if the device is disconnected while the + /// program is running. + #[fail(display = "The requested device is no longer available. For example, it has been unplugged.")] + DeviceNotAvailable, + /// See the `BackendSpecificError` docs for more information about this error variant. + #[fail(display = "{}", err)] + BackendSpecific { + #[fail(cause)] + err: BackendSpecificError, + } +} + /// An iterator yielding all `Device`s currently available to the system. /// /// Can be empty if the system does not support audio in general. @@ -591,7 +643,7 @@ impl EventLoop { /// You can call the other methods of `EventLoop` without getting a deadlock. #[inline] pub fn run(&self, mut callback: F) -> ! - where F: FnMut(StreamId, StreamData) + Send + where F: FnMut(StreamId, StreamEvent) + Send { self.0.run(move |id, data| callback(StreamId(id), data)) } @@ -789,6 +841,18 @@ impl Iterator for SupportedOutputFormats { } } +impl From for StreamCloseCause { + fn from(err: StreamError) -> Self { + StreamCloseCause::Error(err) + } +} + +impl<'a> From for StreamEvent<'a> { + fn from(cause: StreamCloseCause) -> Self { + StreamEvent::Close(cause) + } +} + impl From for DevicesError { fn from(err: BackendSpecificError) -> Self { DevicesError::BackendSpecific { err } @@ -831,6 +895,18 @@ impl From for PauseStreamError { } } +impl From for StreamError { + fn from(err: BackendSpecificError) -> Self { + StreamError::BackendSpecific { err } + } +} + +impl From for StreamCloseCause { + fn from(err: BackendSpecificError) -> Self { + StreamCloseCause::Error(err.into()) + } +} + // If a backend does not provide an API for retrieving supported formats, we query it with a bunch // of commonly used rates. This is always the case for wasapi and is sometimes the case for alsa. // From e41baa248b09cbe92407561edf0bc65ced40b28a Mon Sep 17 00:00:00 2001 From: mitchmindtree Date: Sat, 22 Jun 2019 00:06:55 +0200 Subject: [PATCH 02/10] Update alsa backend for addition of `StreamEvent` type This commit significantly refactors the alsa backend's `EventLoop::run` implementation in order to allow for better error handling throughout the loop. This removes many cases that would previously `panic!` in favour of calling the user callback with the necessary error and removing the corrupt stream. Seeing as the method cannot return, a catch-all `panic!` still exists at the end of the method, however this refactor should make it much easier to remove this restriction in the future. --- src/alsa/mod.rs | 426 +++++++++++++++++++++++++++++++----------------- 1 file changed, 276 insertions(+), 150 deletions(-) diff --git a/src/alsa/mod.rs b/src/alsa/mod.rs index 43a3191..6c4a276 100644 --- a/src/alsa/mod.rs +++ b/src/alsa/mod.rs @@ -14,7 +14,10 @@ use PlayStreamError; use SupportedFormatsError; use SampleFormat; use SampleRate; +use StreamCloseCause; use StreamData; +use StreamError; +use StreamEvent; use SupportedFormat; use UnknownTypeInputBuffer; use UnknownTypeOutputBuffer; @@ -353,7 +356,7 @@ pub struct EventLoop { // A trigger that uses a `pipe()` as backend. Signalled whenever a new command is ready, so // that `poll()` can wake up and pick the changes. - pending_trigger: Trigger, + pending_command_trigger: Trigger, // This field is locked by the `run()` method. // The mutex also ensures that only one thread at a time has `run()` running. @@ -377,7 +380,7 @@ enum Command { } struct RunContext { - // Descriptors to wait for. Always contains `pending_trigger.read_fd()` as first element. + // Descriptors to wait for. Always contains `pending_command_trigger.read_fd()` as first element. descriptors: Vec, // List of streams that are written in `descriptors`. streams: Vec, @@ -421,24 +424,25 @@ struct StreamInner { // Lazily allocated buffer that is reused inside the loop. // Zero-allocate a new buffer (the fastest way to have zeroed memory) at the first time this is // used. - buffer: Option>, + buffer: Vec, } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Copy, Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamId(usize); +enum StreamType { Input, Output } + + impl EventLoop { #[inline] pub fn new() -> EventLoop { - let pending_trigger = Trigger::new(); + let pending_command_trigger = Trigger::new(); - let initial_descriptors = vec![ - libc::pollfd { - fd: pending_trigger.read_fd(), - events: libc::POLLIN, - revents: 0, - }, - ]; + let mut initial_descriptors = vec![]; + reset_descriptors_with_pending_command_trigger( + &mut initial_descriptors, + &pending_command_trigger, + ); let (tx, rx) = channel(); @@ -450,7 +454,7 @@ impl EventLoop { EventLoop { next_stream_id: AtomicUsize::new(0), - pending_trigger: pending_trigger, + pending_command_trigger: pending_command_trigger, run_context, commands: tx, } @@ -458,220 +462,191 @@ impl EventLoop { #[inline] pub fn run(&self, mut callback: F) -> ! - where F: FnMut(StreamId, StreamData) + where F: FnMut(StreamId, StreamEvent) { self.run_inner(&mut callback) } - fn run_inner(&self, callback: &mut FnMut(StreamId, StreamData)) -> ! { + fn run_inner(&self, callback: &mut dyn FnMut(StreamId, StreamEvent)) -> ! { unsafe { let mut run_context = self.run_context.lock().unwrap(); let run_context = &mut *run_context; - loop { - { - for command in run_context.commands.try_iter() { - match command { - Command::DestroyStream(stream_id) => { - run_context.streams.retain(|s| s.id != stream_id); - }, - Command::PlayStream(stream_id) => { - if let Some(stream) = run_context.streams.iter_mut() - .find(|stream| stream.can_pause && stream.id == stream_id) - { - alsa::snd_pcm_pause(stream.channel, 0); - stream.is_paused = false; - } - }, - Command::PauseStream(stream_id) => { - if let Some(stream) = run_context.streams.iter_mut() - .find(|stream| stream.can_pause && stream.id == stream_id) - { - alsa::snd_pcm_pause(stream.channel, 1); - stream.is_paused = true; - } - }, - Command::NewStream(stream_inner) => { - run_context.streams.push(stream_inner); - }, + 'stream_loop: loop { + process_commands(run_context, callback); + + reset_descriptors_with_pending_command_trigger( + &mut run_context.descriptors, + &self.pending_command_trigger, + ); + append_stream_poll_descriptors(run_context); + + // At this point, this should include the command `pending_commands_trigger` along + // with the poll descriptors for each stream. + match poll_all_descriptors(&mut run_context.descriptors) { + Ok(true) => (), + Ok(false) => continue, + Err(err) => { + for stream in run_context.streams.iter() { + let event = StreamEvent::Close(err.clone().into()); + callback(stream.id, event); } - } - - run_context.descriptors = vec![ - libc::pollfd { - fd: self.pending_trigger.read_fd(), - events: libc::POLLIN, - revents: 0, - }, - ]; - for stream in run_context.streams.iter() { - run_context.descriptors.reserve(stream.num_descriptors); - let len = run_context.descriptors.len(); - let filled = alsa::snd_pcm_poll_descriptors(stream.channel, - run_context - .descriptors - .as_mut_ptr() - .offset(len as isize), - stream.num_descriptors as - libc::c_uint); - debug_assert_eq!(filled, stream.num_descriptors as libc::c_int); - run_context.descriptors.set_len(len + stream.num_descriptors); + run_context.streams.clear(); + break 'stream_loop; } } - let ret = libc::poll(run_context.descriptors.as_mut_ptr(), - run_context.descriptors.len() as libc::nfds_t, - -1 /* infinite */); - assert!(ret >= 0, "poll() failed"); - - if ret == 0 { - continue; - } - - // If the `pending_trigger` was signaled, we need to process the comands. + // If the `pending_command_trigger` was signaled, we need to process the comands. if run_context.descriptors[0].revents != 0 { run_context.descriptors[0].revents = 0; - self.pending_trigger.clear_pipe(); + self.pending_command_trigger.clear_pipe(); } + // The set of streams that error within the following loop and should be removed. + let mut streams_to_remove: Vec<(StreamId, StreamError)> = vec![]; + // Iterate over each individual stream/descriptor. let mut i_stream = 0; let mut i_descriptor = 1; while (i_descriptor as usize) < run_context.descriptors.len() { - enum StreamType { Input, Output } - let stream_type; - let stream_inner = run_context.streams.get_mut(i_stream).unwrap(); + let stream = &mut run_context.streams[i_stream]; + let stream_descriptor_ptr = run_context.descriptors.as_mut_ptr().offset(i_descriptor); - // Check whether the event is `POLLOUT` or `POLLIN`. If neither, `continue`. - { - let mut revent = mem::uninitialized(); - - { - let num_descriptors = stream_inner.num_descriptors as libc::c_uint; - let desc_ptr = - run_context.descriptors.as_mut_ptr().offset(i_descriptor); - let res = alsa::snd_pcm_poll_descriptors_revents(stream_inner.channel, - desc_ptr, - num_descriptors, - &mut revent); - check_errors(res).unwrap(); - } - - if revent as i16 == libc::POLLOUT { - stream_type = StreamType::Output; - } else if revent as i16 == libc::POLLIN { - stream_type = StreamType::Input; - } else { - i_descriptor += stream_inner.num_descriptors as isize; + // Only go on if this event was a pollout or pollin event. + let stream_type = match check_for_pollout_or_pollin(stream, stream_descriptor_ptr) { + Ok(Some(ty)) => ty, + Ok(None) => { + i_descriptor += stream.num_descriptors as isize; + i_stream += 1; + continue; + }, + Err(err) => { + streams_to_remove.push((stream.id, err.into())); + i_descriptor += stream.num_descriptors as isize; i_stream += 1; continue; } - } + }; - // Determine the number of samples that are available to read/write. - let available_samples = { - let available = alsa::snd_pcm_avail(stream_inner.channel); // TODO: what about snd_pcm_avail_update? - - if available == -32 { - // buffer underrun - stream_inner.buffer_len - } else if available < 0 { - check_errors(available as libc::c_int) - .expect("buffer is not available"); - unreachable!() - } else { - (available * stream_inner.num_channels as alsa::snd_pcm_sframes_t) as - usize + // Get the number of available samples for reading/writing. + let available_samples = match get_available_samples(stream) { + Ok(n) => n, + Err(err) => { + streams_to_remove.push((stream.id, err.into())); + i_descriptor += stream.num_descriptors as isize; + i_stream += 1; + continue; } }; - if available_samples < stream_inner.period_len { - i_descriptor += stream_inner.num_descriptors as isize; + // Only go on if there is at least `stream.period_len` samples. + if available_samples < stream.period_len { + i_descriptor += stream.num_descriptors as isize; i_stream += 1; continue; } - let stream_id = stream_inner.id.clone(); - - let available_frames = available_samples / stream_inner.num_channels as usize; - - let buffer_size = stream_inner.sample_format.sample_size() * available_samples; - // Could be written with a match with improved borrow checking - if stream_inner.buffer.is_none() { - stream_inner.buffer = Some(vec![0u8; buffer_size]); - } else { - stream_inner.buffer.as_mut().unwrap().resize(buffer_size, 0u8); - } - let buffer = stream_inner.buffer.as_mut().unwrap(); + // Prepare the data buffer. + let buffer_size = stream.sample_format.sample_size() * available_samples; + stream.buffer.resize(buffer_size, 0u8); + let available_frames = available_samples / stream.num_channels as usize; match stream_type { StreamType::Input => { - let err = alsa::snd_pcm_readi( - stream_inner.channel, - buffer.as_mut_ptr() as *mut _, + let result = alsa::snd_pcm_readi( + stream.channel, + stream.buffer.as_mut_ptr() as *mut _, available_frames as alsa::snd_pcm_uframes_t, ); - check_errors(err as _).expect("snd_pcm_readi error"); + if let Err(err) = check_errors(result as _) { + let description = format!("`snd_pcm_readi` failed: {}", err); + let err = BackendSpecificError { description }; + streams_to_remove.push((stream.id, err.into())); + continue; + } - let input_buffer = match stream_inner.sample_format { + let input_buffer = match stream.sample_format { SampleFormat::I16 => UnknownTypeInputBuffer::I16(::InputBuffer { - buffer: cast_input_buffer(buffer), + buffer: cast_input_buffer(&mut stream.buffer), }), SampleFormat::U16 => UnknownTypeInputBuffer::U16(::InputBuffer { - buffer: cast_input_buffer(buffer), + buffer: cast_input_buffer(&mut stream.buffer), }), SampleFormat::F32 => UnknownTypeInputBuffer::F32(::InputBuffer { - buffer: cast_input_buffer(buffer), + buffer: cast_input_buffer(&mut stream.buffer), }), }; let stream_data = StreamData::Input { buffer: input_buffer, }; - callback(stream_id, stream_data); + let event = StreamEvent::Data(stream_data); + callback(stream.id, event); }, StreamType::Output => { { // We're now sure that we're ready to write data. - let output_buffer = match stream_inner.sample_format { + let output_buffer = match stream.sample_format { SampleFormat::I16 => UnknownTypeOutputBuffer::I16(::OutputBuffer { - buffer: cast_output_buffer(buffer), + buffer: cast_output_buffer(&mut stream.buffer), }), SampleFormat::U16 => UnknownTypeOutputBuffer::U16(::OutputBuffer { - buffer: cast_output_buffer(buffer), + buffer: cast_output_buffer(&mut stream.buffer), }), SampleFormat::F32 => UnknownTypeOutputBuffer::F32(::OutputBuffer { - buffer: cast_output_buffer(buffer), + buffer: cast_output_buffer(&mut stream.buffer), }), }; let stream_data = StreamData::Output { buffer: output_buffer, }; - callback(stream_id, stream_data); + let event = StreamEvent::Data(stream_data); + callback(stream.id, event); } loop { let result = alsa::snd_pcm_writei( - stream_inner.channel, - buffer.as_ptr() as *const _, + stream.channel, + stream.buffer.as_ptr() as *const _, available_frames as alsa::snd_pcm_uframes_t, ); if result == -32 { // buffer underrun - alsa::snd_pcm_prepare(stream_inner.channel); - } else if result < 0 { - check_errors(result as libc::c_int) - .expect("could not write pcm"); + // TODO: Notify the user of this. + alsa::snd_pcm_prepare(stream.channel); + } else if let Err(err) = check_errors(result as _) { + let description = format!("`snd_pcm_writei` failed: {}", err); + let err = BackendSpecificError { description }; + streams_to_remove.push((stream.id, err.into())); + continue; + } else if result as usize != available_frames { + let description = format!( + "unexpected number of frames written: expected {}, \ + result {} (this should never happen)", + available_frames, + result, + ); + let err = BackendSpecificError { description }; + streams_to_remove.push((stream.id, err.into())); + continue; } else { - assert_eq!(result as usize, available_frames); break; } } }, } } + + // Remove any streams that have errored and notify the user. + for (stream_id, err) in streams_to_remove { + run_context.streams.retain(|s| s.id != stream_id); + let event = StreamEvent::Close(err.into()); + callback(stream_id, event); + } } } + + panic!("`cpal::EventLoop::run` API currently disallows returning"); } pub fn build_input_stream( @@ -739,7 +714,7 @@ impl EventLoop { can_pause: can_pause, is_paused: false, resume_trigger: Trigger::new(), - buffer: None, + buffer: vec![], }; if let Err(desc) = check_errors(alsa::snd_pcm_start(capture_handle)) { @@ -818,7 +793,7 @@ impl EventLoop { can_pause: can_pause, is_paused: false, resume_trigger: Trigger::new(), - buffer: None, + buffer: vec![], }; self.push_command(Command::NewStream(stream_inner)); @@ -830,7 +805,7 @@ impl EventLoop { fn push_command(&self, command: Command) { // Safe to unwrap: sender outlives receiver. self.commands.send(command).unwrap(); - self.pending_trigger.wakeup(); + self.pending_command_trigger.wakeup(); } #[inline] @@ -851,6 +826,157 @@ impl EventLoop { } } +// Process any pending `Command`s within the `RunContext`'s queue. +fn process_commands( + run_context: &mut RunContext, + callback: &mut dyn FnMut(StreamId, StreamEvent), +) { + for command in run_context.commands.try_iter() { + match command { + Command::DestroyStream(stream_id) => { + run_context.streams.retain(|s| s.id != stream_id); + let event = StreamCloseCause::UserDestroyed.into(); + callback(stream_id, event); + }, + Command::PlayStream(stream_id) => { + if let Some(stream) = run_context.streams.iter_mut() + .find(|stream| stream.can_pause && stream.id == stream_id) + { + callback(stream_id, StreamEvent::Play); + unsafe { + alsa::snd_pcm_pause(stream.channel, 0); + } + stream.is_paused = false; + } + }, + Command::PauseStream(stream_id) => { + if let Some(stream) = run_context.streams.iter_mut() + .find(|stream| stream.can_pause && stream.id == stream_id) + { + callback(stream_id, StreamEvent::Pause); + unsafe { + alsa::snd_pcm_pause(stream.channel, 1); + } + stream.is_paused = true; + } + }, + Command::NewStream(stream_inner) => { + run_context.streams.push(stream_inner); + }, + } + } +} + +// Resets the descriptors so that only `pending_command_trigger.read_fd()` is contained. +fn reset_descriptors_with_pending_command_trigger( + descriptors: &mut Vec, + pending_command_trigger: &Trigger, +) { + descriptors.clear(); + descriptors.push(libc::pollfd { + fd: pending_command_trigger.read_fd(), + events: libc::POLLIN, + revents: 0, + }); +} + +// Appends the `poll` descriptors for each stream onto the `RunContext`'s descriptor slice, ready +// for a call to `libc::poll`. +fn append_stream_poll_descriptors(run_context: &mut RunContext) { + for stream in run_context.streams.iter() { + run_context.descriptors.reserve(stream.num_descriptors); + let len = run_context.descriptors.len(); + let filled = unsafe { + alsa::snd_pcm_poll_descriptors( + stream.channel, + run_context.descriptors.as_mut_ptr().offset(len as isize), + stream.num_descriptors as libc::c_uint, + ) + }; + debug_assert_eq!(filled, stream.num_descriptors as libc::c_int); + unsafe { + run_context.descriptors.set_len(len + stream.num_descriptors); + } + } +} + +// Poll all descriptors within the given set. +// +// Returns `Ok(true)` if some event has occurred or `Ok(false)` if no events have +// occurred. +// +// Returns an `Err` if `libc::poll` returns a negative value for some reason. +fn poll_all_descriptors(descriptors: &mut [libc::pollfd]) -> Result { + let res = unsafe { + // Don't timeout, wait forever. + libc::poll(descriptors.as_mut_ptr(), descriptors.len() as libc::nfds_t, -1) + }; + if res < 0 { + let description = format!("`libc::poll()` failed: {}", res); + Err(BackendSpecificError { description }) + } else if res == 0 { + Ok(false) + } else { + Ok(true) + } +} + +// Check whether the event is `POLLOUT` or `POLLIN`. +// +// If so, return the stream type associated with the event. +// +// Otherwise, returns `Ok(None)`. +// +// Returns an `Err` if the `snd_pcm_poll_descriptors_revents` call fails. +fn check_for_pollout_or_pollin( + stream: &StreamInner, + stream_descriptor_ptr: *mut libc::pollfd, +) -> Result, BackendSpecificError> { + let (revent, res) = unsafe { + let mut revent = mem::uninitialized(); + let res = alsa::snd_pcm_poll_descriptors_revents( + stream.channel, + stream_descriptor_ptr, + stream.num_descriptors as libc::c_uint, + &mut revent, + ); + (revent, res) + }; + if let Err(desc) = check_errors(res) { + let description = + format!("`snd_pcm_poll_descriptors_revents` failed: {}",desc); + let err = BackendSpecificError { description }; + return Err(err); + } + + if revent as i16 == libc::POLLOUT { + Ok(Some(StreamType::Output)) + } else if revent as i16 == libc::POLLIN { + Ok(Some(StreamType::Input)) + } else { + Ok(None) + } +} + +// Determine the number of samples that are available to read/write. +fn get_available_samples(stream: &StreamInner) -> Result { + // TODO: what about snd_pcm_avail_update? + let available = unsafe { + alsa::snd_pcm_avail(stream.channel) + }; + if available == -32 { + // buffer underrun + // TODO: Notify the user some how. + Ok(stream.buffer_len) + } else if let Err(desc) = check_errors(available as libc::c_int) { + let description = format!("failed to get available samples: {}", desc); + let err = BackendSpecificError { description }; + Err(err) + } else { + Ok((available * stream.num_channels as alsa::snd_pcm_sframes_t) as usize) + } +} + unsafe fn set_hw_params_from_format( pcm_handle: *mut alsa::snd_pcm_t, hw_params: &HwParams, From 39cd5d0084c8c524880c80db92eef01da4c285e0 Mon Sep 17 00:00:00 2001 From: mitchmindtree Date: Sat, 22 Jun 2019 00:10:40 +0200 Subject: [PATCH 03/10] Update examples for addition of new StreamEvent API --- examples/beep.rs | 11 ++++++++++- examples/feedback.rs | 11 ++++++++++- examples/record_wav.rs | 11 ++++++++++- 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/examples/beep.rs b/examples/beep.rs index fe5dfb6..c6b70e0 100644 --- a/examples/beep.rs +++ b/examples/beep.rs @@ -17,7 +17,16 @@ fn main() -> Result<(), failure::Error> { (sample_clock * 440.0 * 2.0 * 3.141592 / sample_rate).sin() }; - event_loop.run(move |_, data| { + event_loop.run(move |id, event| { + let data = match event { + cpal::StreamEvent::Data(data) => data, + cpal::StreamEvent::Close(cpal::StreamCloseCause::Error(err)) => { + eprintln!("stream {:?} closed due to an error: {}", id, err); + return; + } + _ => return, + }; + match data { cpal::StreamData::Output { buffer: cpal::UnknownTypeOutputBuffer::U16(mut buffer) } => { for sample in buffer.chunks_mut(format.channels as usize) { diff --git a/examples/feedback.rs b/examples/feedback.rs index 18dd62a..4c067dd 100644 --- a/examples/feedback.rs +++ b/examples/feedback.rs @@ -49,7 +49,16 @@ fn main() -> Result<(), failure::Error> { // Run the event loop on a separate thread. std::thread::spawn(move || { - event_loop.run(move |id, data| { + event_loop.run(move |id, event| { + let data = match event { + cpal::StreamEvent::Data(data) => data, + cpal::StreamEvent::Close(cpal::StreamCloseCause::Error(err)) => { + eprintln!("stream {:?} closed due to an error: {}", id, err); + return; + } + _ => return, + }; + match data { cpal::StreamData::Input { buffer: cpal::UnknownTypeInputBuffer::F32(buffer) } => { assert_eq!(id, input_stream_id); diff --git a/examples/record_wav.rs b/examples/record_wav.rs index 61489f0..74c10aa 100644 --- a/examples/record_wav.rs +++ b/examples/record_wav.rs @@ -30,7 +30,16 @@ fn main() -> Result<(), failure::Error> { let writer_2 = writer.clone(); let recording_2 = recording.clone(); std::thread::spawn(move || { - event_loop.run(move |_, data| { + event_loop.run(move |id, event| { + let data = match event { + cpal::StreamEvent::Data(data) => data, + cpal::StreamEvent::Close(cpal::StreamCloseCause::Error(err)) => { + eprintln!("stream {:?} closed due to an error: {}", id, err); + return; + } + _ => return, + }; + // If we're done recording, return early. if !recording_2.load(std::sync::atomic::Ordering::Relaxed) { return; From 3e3cf26cdedbbcab07ca19bbedb8feb6c37525b5 Mon Sep 17 00:00:00 2001 From: mitchmindtree Date: Sat, 22 Jun 2019 02:48:47 +0200 Subject: [PATCH 04/10] Update `wasapi` backend for `StreamEvent` API addition. Refactors much of the `EventLoop::run` implementation in order to make error handling a little easier. --- src/wasapi/stream.rs | 436 +++++++++++++++++++++++++++---------------- 1 file changed, 274 insertions(+), 162 deletions(-) diff --git a/src/wasapi/stream.rs b/src/wasapi/stream.rs index b8070a4..9539ac9 100644 --- a/src/wasapi/stream.rs +++ b/src/wasapi/stream.rs @@ -26,7 +26,10 @@ use Format; use PauseStreamError; use PlayStreamError; use SampleFormat; +use StreamCloseCause; use StreamData; +use StreamError; +use StreamEvent; use UnknownTypeOutputBuffer; use UnknownTypeInputBuffer; @@ -443,12 +446,12 @@ impl EventLoop { #[inline] pub fn run(&self, mut callback: F) -> ! - where F: FnMut(StreamId, StreamData) + where F: FnMut(StreamId, StreamEvent) { self.run_inner(&mut callback); } - fn run_inner(&self, callback: &mut FnMut(StreamId, StreamData)) -> ! { + fn run_inner(&self, callback: &mut dyn FnMut(StreamId, StreamEvent)) -> ! { unsafe { // We keep `run_context` locked forever, which guarantees that two invocations of // `run()` cannot run simultaneously. @@ -457,190 +460,165 @@ impl EventLoop { // Shadow the name because we don't use (or drop) it otherwise. let run_context = &mut *run_context; - loop { - // Process the pending commands. - for command in run_context.commands.try_iter() { - match command { - Command::NewStream(stream_inner) => { - let event = stream_inner.event; - run_context.streams.push(stream_inner); - run_context.handles.push(event); - }, - Command::DestroyStream(stream_id) => { - match run_context.streams.iter().position(|v| v.id == stream_id) { - None => continue, - Some(p) => { - run_context.handles.remove(p + 1); - run_context.streams.remove(p); - }, - } - }, - Command::PlayStream(stream_id) => { - match run_context.streams.iter_mut().find(|v| v.id == stream_id) { - None => continue, - Some(stream) => { - if !stream.playing { - let hresult = (*stream.audio_client).Start(); - check_result(hresult).unwrap(); - stream.playing = true; - } - } - } - }, - Command::PauseStream(stream_id) => { - match run_context.streams.iter_mut().find(|v| v.id == stream_id) { - None => continue, - Some(stream) => { - if stream.playing { - let hresult = (*stream.audio_client).Stop(); - check_result(hresult).unwrap(); - stream.playing = false; - } - }, - } + // Keep track of the set of streams that should be removed due to some error occurring. + // + // Checked at the start of each loop. + let mut streams_to_remove: Vec<(StreamId, StreamError)> = vec![]; + + 'stream_loop: loop { + // Remove any failed streams. + for (stream_id, err) in streams_to_remove.drain(..) { + match run_context.streams.iter().position(|s| s.id == stream_id) { + None => continue, + Some(p) => { + run_context.handles.remove(p + 1); + run_context.streams.remove(p); + let event = StreamEvent::Close(err.into()); + callback(stream_id, event); }, } } - // Wait for any of the handles to be signalled, which means that the corresponding - // sound needs a buffer. - debug_assert!(run_context.handles.len() <= winnt::MAXIMUM_WAIT_OBJECTS as usize); - let result = synchapi::WaitForMultipleObjectsEx(run_context.handles.len() as u32, - run_context.handles.as_ptr(), - FALSE, - winbase::INFINITE, /* TODO: allow setting a timeout */ - FALSE /* irrelevant parameter here */); + // Process queued commands. + process_commands(run_context, callback); - // Notifying the corresponding task handler. - debug_assert!(result >= winbase::WAIT_OBJECT_0); - let handle_id = (result - winbase::WAIT_OBJECT_0) as usize; + // Wait for any of the handles to be signalled. + let handle_idx = match wait_for_handle_signal(&run_context.handles) { + Ok(idx) => idx, + Err(err) => { + for stream in &run_context.streams { + let event = StreamEvent::Close(err.clone().into()); + callback(stream.id.clone(), event); + } + run_context.streams.clear(); + run_context.handles.truncate(1); + break 'stream_loop; + } + }; - // If `handle_id` is 0, then it's `pending_scheduled_event` that was signalled in - // order for us to pick up the pending commands. - // Otherwise, a stream needs data. - if handle_id >= 1 { - let stream = &mut run_context.streams[handle_id - 1]; - let stream_id = stream.id.clone(); + // If `handle_idx` is 0, then it's `pending_scheduled_event` that was signalled in + // order for us to pick up the pending commands. Otherwise, a stream needs data. + if handle_idx == 0 { + continue; + } - // Obtaining the number of frames that are available to be written. - let mut frames_available = { - let mut padding = mem::uninitialized(); - let hresult = (*stream.audio_client).GetCurrentPadding(&mut padding); - // Happens when a bluetooth headset was turned off, for example. - if hresult == AUDCLNT_E_DEVICE_INVALIDATED { - // The client code should switch to a different device eventually. - // For now let's just skip the invalidated device. - // Would be nice to inform the client code about the invalidation, - // but throwing a panic isn't the most ergonomic way to do so. - continue} - check_result(hresult).unwrap(); - stream.max_frames_in_buffer - padding - }; + let stream_idx = handle_idx - 1; + let stream = &mut run_context.streams[stream_idx]; - if frames_available == 0 { - // TODO: can this happen? + // The number of frames available for reading/writing. + let mut frames_available = match get_available_frames(stream) { + Ok(0) => continue, // TODO: Can this happen? + Ok(n) => n, + Err(err) => { + streams_to_remove.push((stream.id.clone(), err)); continue; } + }; - let sample_size = stream.sample_format.sample_size(); + let sample_size = stream.sample_format.sample_size(); - // Obtaining a pointer to the buffer. - match stream.client_flow { + // Obtaining a pointer to the buffer. + match stream.client_flow { - AudioClientFlow::Capture { capture_client } => { - // Get the available data in the shared buffer. - let mut buffer: *mut BYTE = mem::uninitialized(); - let mut flags = mem::uninitialized(); - let hresult = (*capture_client).GetBuffer( - &mut buffer, - &mut frames_available, - &mut flags, - ptr::null_mut(), - ptr::null_mut(), - ); - check_result(hresult).unwrap(); + AudioClientFlow::Capture { capture_client } => { + // Get the available data in the shared buffer. + let mut buffer: *mut BYTE = mem::uninitialized(); + let mut flags = mem::uninitialized(); + let hresult = (*capture_client).GetBuffer( + &mut buffer, + &mut frames_available, + &mut flags, + ptr::null_mut(), + ptr::null_mut(), + ); - if hresult == AUDCLNT_S_BUFFER_EMPTY { continue; } + // TODO: Can this happen? + if hresult == AUDCLNT_S_BUFFER_EMPTY { + continue; + } else if let Err(err) = stream_error_from_hresult(hresult) { + streams_to_remove.push((stream.id.clone(), err)); + continue; + } - debug_assert!(!buffer.is_null()); - let buffer_len = frames_available as usize - * stream.bytes_per_frame as usize / sample_size; + debug_assert!(!buffer.is_null()); - // Simplify the capture callback sample format branches. - macro_rules! capture_callback { - ($T:ty, $Variant:ident) => {{ - let buffer_data = buffer as *mut _ as *const $T; - let slice = slice::from_raw_parts(buffer_data, buffer_len); - let unknown_buffer = UnknownTypeInputBuffer::$Variant(::InputBuffer { - buffer: slice, - }); - let data = StreamData::Input { buffer: unknown_buffer }; - callback(stream_id, data); + let buffer_len = frames_available as usize + * stream.bytes_per_frame as usize / sample_size; - // Release the buffer. - let hresult = (*capture_client).ReleaseBuffer(frames_available); - match check_result(hresult) { - // Ignoring unavailable device error. - Err(ref e) if e.raw_os_error() == Some(AUDCLNT_E_DEVICE_INVALIDATED) => { - }, - e => e.unwrap(), - }; - }}; - } + // Simplify the capture callback sample format branches. + macro_rules! capture_callback { + ($T:ty, $Variant:ident) => {{ + let buffer_data = buffer as *mut _ as *const $T; + let slice = slice::from_raw_parts(buffer_data, buffer_len); + let unknown_buffer = UnknownTypeInputBuffer::$Variant(::InputBuffer { + buffer: slice, + }); + let data = StreamData::Input { buffer: unknown_buffer }; + let event = StreamEvent::Data(data); + callback(stream.id.clone(), event); + // Release the buffer. + let hresult = (*capture_client).ReleaseBuffer(frames_available); + if let Err(err) = stream_error_from_hresult(hresult) { + streams_to_remove.push((stream.id.clone(), err)); + continue; + } + }}; + } - match stream.sample_format { - SampleFormat::F32 => capture_callback!(f32, F32), - SampleFormat::I16 => capture_callback!(i16, I16), - SampleFormat::U16 => capture_callback!(u16, U16), - } - }, + match stream.sample_format { + SampleFormat::F32 => capture_callback!(f32, F32), + SampleFormat::I16 => capture_callback!(i16, I16), + SampleFormat::U16 => capture_callback!(u16, U16), + } + }, - AudioClientFlow::Render { render_client } => { - let mut buffer: *mut BYTE = mem::uninitialized(); - let hresult = (*render_client).GetBuffer( - frames_available, - &mut buffer as *mut *mut _, - ); - // FIXME: can return `AUDCLNT_E_DEVICE_INVALIDATED` - check_result(hresult).unwrap(); - debug_assert!(!buffer.is_null()); - let buffer_len = frames_available as usize - * stream.bytes_per_frame as usize / sample_size; + AudioClientFlow::Render { render_client } => { + let mut buffer: *mut BYTE = mem::uninitialized(); + let hresult = (*render_client).GetBuffer( + frames_available, + &mut buffer as *mut *mut _, + ); - // Simplify the render callback sample format branches. - macro_rules! render_callback { - ($T:ty, $Variant:ident) => {{ - let buffer_data = buffer as *mut $T; - let slice = slice::from_raw_parts_mut(buffer_data, buffer_len); - let unknown_buffer = UnknownTypeOutputBuffer::$Variant(::OutputBuffer { - buffer: slice - }); - let data = StreamData::Output { buffer: unknown_buffer }; - callback(stream_id, data); - let hresult = match stream.client_flow { - AudioClientFlow::Render { render_client } => { - (*render_client).ReleaseBuffer(frames_available as u32, 0) - }, - _ => unreachable!(), - }; - match check_result(hresult) { - // Ignoring the error that is produced if the device has been disconnected. - Err(ref e) if e.raw_os_error() == Some(AUDCLNT_E_DEVICE_INVALIDATED) => (), - e => e.unwrap(), - }; - }} - } + if let Err(err) = stream_error_from_hresult(hresult) { + streams_to_remove.push((stream.id.clone(), err)); + continue; + } - match stream.sample_format { - SampleFormat::F32 => render_callback!(f32, F32), - SampleFormat::I16 => render_callback!(i16, I16), - SampleFormat::U16 => render_callback!(u16, U16), - } - }, - } + debug_assert!(!buffer.is_null()); + let buffer_len = frames_available as usize + * stream.bytes_per_frame as usize / sample_size; + + // Simplify the render callback sample format branches. + macro_rules! render_callback { + ($T:ty, $Variant:ident) => {{ + let buffer_data = buffer as *mut $T; + let slice = slice::from_raw_parts_mut(buffer_data, buffer_len); + let unknown_buffer = UnknownTypeOutputBuffer::$Variant(::OutputBuffer { + buffer: slice + }); + let data = StreamData::Output { buffer: unknown_buffer }; + let event = StreamEvent::Data(data); + callback(stream.id.clone(), event); + let hresult = (*render_client) + .ReleaseBuffer(frames_available as u32, 0); + if let Err(err) = stream_error_from_hresult(hresult) { + streams_to_remove.push((stream.id.clone(), err)); + continue; + } + }} + } + + match stream.sample_format { + SampleFormat::F32 => render_callback!(f32, F32), + SampleFormat::I16 => render_callback!(i16, I16), + SampleFormat::U16 => render_callback!(u16, U16), + } + }, } } } + + panic!("`cpal::EventLoop::run` API currently disallows returning"); } #[inline] @@ -758,3 +736,137 @@ fn format_to_waveformatextensible(format: &Format) -> Option { + let event = stream_inner.event; + run_context.streams.push(stream_inner); + run_context.handles.push(event); + }, + Command::DestroyStream(stream_id) => { + match run_context.streams.iter().position(|s| s.id == stream_id) { + None => continue, + Some(p) => { + run_context.handles.remove(p + 1); + run_context.streams.remove(p); + }, + } + let event = StreamEvent::Close(StreamCloseCause::UserDestroyed); + callback(stream_id, event); + }, + Command::PlayStream(stream_id) => { + match run_context.streams.iter().position(|s| s.id == stream_id) { + None => continue, + Some(p) => { + if !run_context.streams[p].playing { + let hresult = unsafe { + (*run_context.streams[p].audio_client).Start() + }; + match stream_error_from_hresult(hresult) { + Ok(()) => { + run_context.streams[p].playing = true; + let event = StreamEvent::Play; + callback(stream_id, event); + } + Err(err) => { + let event = StreamEvent::Close(err.into()); + callback(stream_id, event); + run_context.handles.remove(p + 1); + run_context.streams.remove(p); + } + } + } + } + } + }, + Command::PauseStream(stream_id) => { + match run_context.streams.iter().position(|s| s.id == stream_id) { + None => continue, + Some(p) => { + if run_context.streams[p].playing { + let hresult = unsafe { + (*run_context.streams[p].audio_client).Stop() + }; + match stream_error_from_hresult(hresult) { + Ok(()) => { + run_context.streams[p].playing = false; + let event = StreamEvent::Pause; + callback(stream_id, event); + } + Err(err) => { + let event = StreamEvent::Close(err.into()); + callback(stream_id, event); + run_context.handles.remove(p + 1); + run_context.streams.remove(p); + } + } + } + }, + } + }, + } + } +} + +// Wait for any of the given handles to be signalled. +// +// Returns the index of the `handle` that was signalled, or an `Err` if +// `WaitForMultipleObjectsEx` fails. +// +// This is called when the `run` thread is ready to wait for the next event. The +// next event might be some command submitted by the user (the first handle) or +// might indicate that one of the streams is ready to deliver or receive audio. +fn wait_for_handle_signal(handles: &[winnt::HANDLE]) -> Result { + debug_assert!(handles.len() <= winnt::MAXIMUM_WAIT_OBJECTS as usize); + let result = unsafe { + synchapi::WaitForMultipleObjectsEx( + handles.len() as u32, + handles.as_ptr(), + FALSE, // Don't wait for all, just wait for the first + winbase::INFINITE, // TODO: allow setting a timeout + FALSE, // irrelevant parameter here + ) + }; + if result == winbase::WAIT_FAILED { + let err = unsafe { + winapi::um::errhandlingapi::GetLastError() + }; + let description = format!("`WaitForMultipleObjectsEx failed: {}", err); + let err = BackendSpecificError { description }; + return Err(err); + } + // Notifying the corresponding task handler. + debug_assert!(result >= winbase::WAIT_OBJECT_0); + let handle_idx = (result - winbase::WAIT_OBJECT_0) as usize; + Ok(handle_idx) +} + +// Get the number of available frames that are available for writing/reading. +fn get_available_frames(stream: &StreamInner) -> Result { + unsafe { + let mut padding = mem::uninitialized(); + let hresult = (*stream.audio_client).GetCurrentPadding(&mut padding); + stream_error_from_hresult(hresult)?; + Ok(stream.max_frames_in_buffer - padding) + } +} + +// Convert the given `HRESULT` into a `StreamError` if it does indicate an error. +fn stream_error_from_hresult(hresult: winnt::HRESULT) -> Result<(), StreamError> { + if hresult == AUDCLNT_E_DEVICE_INVALIDATED { + return Err(StreamError::DeviceNotAvailable); + } + if let Err(err) = check_result(hresult) { + let description = format!("{}", err); + let err = BackendSpecificError { description }; + return Err(err.into()); + } + Ok(()) +} From fddea2edd848607337443c369f983ca25b55ec81 Mon Sep 17 00:00:00 2001 From: mitchmindtree Date: Sat, 22 Jun 2019 03:53:51 +0200 Subject: [PATCH 05/10] Update the null and emscripten backends for StreamEvent API --- src/emscripten/mod.rs | 8 +++++--- src/null/mod.rs | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/emscripten/mod.rs b/src/emscripten/mod.rs index b828f7f..cb53729 100644 --- a/src/emscripten/mod.rs +++ b/src/emscripten/mod.rs @@ -17,6 +17,7 @@ use PauseStreamError; use PlayStreamError; use SupportedFormatsError; use StreamData; +use StreamEvent; use SupportedFormat; use UnknownTypeOutputBuffer; @@ -43,7 +44,7 @@ impl EventLoop { #[inline] pub fn run(&self, callback: F) -> ! - where F: FnMut(StreamId, StreamData) + where F: FnMut(StreamId, StreamEvent) { // The `run` function uses `set_timeout` to invoke a Rust callback repeatidely. The job // of this callback is to fill the content of the audio buffers. @@ -52,7 +53,7 @@ impl EventLoop { // and to the `callback` parameter that was passed to `run`. fn callback_fn(user_data_ptr: *mut c_void) - where F: FnMut(StreamId, StreamData) + where F: FnMut(StreamId, StreamEvent) { unsafe { let user_data_ptr2 = user_data_ptr as *mut (&EventLoop, F); @@ -71,7 +72,8 @@ impl EventLoop { { let buffer = UnknownTypeOutputBuffer::F32(::OutputBuffer { buffer: &mut temporary_buffer }); let data = StreamData::Output { buffer: buffer }; - user_cb(StreamId(stream_id), data); + let event = StreamEvent::Data(data); + user_cb(StreamId(stream_id), event); // TODO: directly use a TypedArray once this is supported by stdweb } diff --git a/src/null/mod.rs b/src/null/mod.rs index c7e4cb7..f3238c8 100644 --- a/src/null/mod.rs +++ b/src/null/mod.rs @@ -10,7 +10,7 @@ use Format; use PauseStreamError; use PlayStreamError; use SupportedFormatsError; -use StreamData; +use StreamEvent; use SupportedFormat; pub struct EventLoop; @@ -23,7 +23,7 @@ impl EventLoop { #[inline] pub fn run(&self, _callback: F) -> ! - where F: FnMut(StreamId, StreamData) + where F: FnMut(StreamId, StreamEvent) { loop { /* TODO: don't spin */ } } From b2c1226b47080999d0eecc29d437aa3c87ca6120 Mon Sep 17 00:00:00 2001 From: mitchmindtree Date: Sat, 22 Jun 2019 15:23:46 +0200 Subject: [PATCH 06/10] Update coreaudio backend for `StreamEvent` API addition. --- src/coreaudio/mod.rs | 112 ++++++++++++++++++++++++++++++------------- 1 file changed, 79 insertions(+), 33 deletions(-) diff --git a/src/coreaudio/mod.rs b/src/coreaudio/mod.rs index 1f5b831..6ac386f 100644 --- a/src/coreaudio/mod.rs +++ b/src/coreaudio/mod.rs @@ -12,7 +12,9 @@ use PlayStreamError; use SupportedFormatsError; use SampleFormat; use SampleRate; +use StreamCloseCause; use StreamData; +use StreamEvent; use SupportedFormat; use UnknownTypeInputBuffer; use UnknownTypeOutputBuffer; @@ -319,13 +321,24 @@ pub struct StreamId(usize); pub struct EventLoop { // This `Arc` is shared with all the callbacks of coreaudio. - active_callbacks: Arc, + // + // TODO: Eventually, CPAL's API should be changed to allow for submitting a unique callback per + // stream to avoid streams blocking one another. + user_callback: Arc>, streams: Mutex>>, } -struct ActiveCallbacks { - // Whenever the `run()` method is called with a callback, this callback is put in this list. - callbacks: Mutex>, +enum UserCallback { + // When `run` is called with a callback, that callback will be stored here. + // + // It is essential for the safety of the program that this callback is removed before `run` + // returns (not possible with the current CPAL API). + Active(&'static mut (FnMut(StreamId, StreamEvent) + Send)), + // A queue of events that have occurred but that have not yet been emitted to the user as we + // don't yet have a callback to do so. + Inactive { + pending_events: Vec<(StreamId, StreamEvent<'static>)> + }, } struct StreamInner { @@ -427,22 +440,32 @@ impl EventLoop { #[inline] pub fn new() -> EventLoop { EventLoop { - active_callbacks: Arc::new(ActiveCallbacks { callbacks: Mutex::new(Vec::new()) }), + user_callback: Arc::new(Mutex::new(UserCallback::Inactive { pending_events: vec![] })), streams: Mutex::new(Vec::new()), } } #[inline] pub fn run(&self, mut callback: F) -> ! - where F: FnMut(StreamId, StreamData) + Send + where F: FnMut(StreamId, StreamEvent) + Send { { - let callback: &mut (FnMut(StreamId, StreamData) + Send) = &mut callback; - self.active_callbacks - .callbacks - .lock() - .unwrap() - .push(unsafe { mem::transmute(callback) }); + let mut guard = self.user_callback.lock().unwrap(); + let pending_events = match *guard { + UserCallback::Inactive { ref mut pending_events } => { + mem::replace(pending_events, vec![]) + } + UserCallback::Active(_) => { + panic!("`EventLoop::run` was called when the event loop was already running"); + } + }; + + let callback: &mut (FnMut(StreamId, StreamEvent) + Send) = &mut callback; + for (stream_id, event) in pending_events { + callback(stream_id, event); + } + + *guard = UserCallback::Active(unsafe { mem::transmute(callback) }); } loop { @@ -450,8 +473,8 @@ impl EventLoop { thread::sleep(Duration::new(1u64, 0u32)); } - // Note: if we ever change this API so that `run` can return, then it is critical that - // we remove the callback from `active_callbacks`. + // It is critical that we remove the callback before returning (currently not possible). + // *self.user_callback.lock().unwrap() = UserCallback::Inactive { pending_events: vec![] }; } fn next_stream_id(&self) -> usize { @@ -650,7 +673,7 @@ impl EventLoop { // Register the callback that is being called by coreaudio whenever it needs data to be // fed to the audio buffer. - let active_callbacks = self.active_callbacks.clone(); + let user_callback = self.user_callback.clone(); let sample_format = format.data_type; let bytes_per_channel = format.data_type.sample_size(); type Args = render_callback::Args; @@ -666,20 +689,21 @@ impl EventLoop { mData: data } = buffers[0]; - let mut callbacks = active_callbacks.callbacks.lock().unwrap(); + let mut user_callback = user_callback.lock().unwrap(); // A small macro to simplify handling the callback for different sample types. macro_rules! try_callback { ($SampleFormat:ident, $SampleType:ty) => {{ let data_len = (data_byte_size as usize / bytes_per_channel) as usize; let data_slice = slice::from_raw_parts(data as *const $SampleType, data_len); - let callback = match callbacks.get_mut(0) { - Some(cb) => cb, - None => return Ok(()), + let callback = match *user_callback { + UserCallback::Active(ref mut cb) => cb, + UserCallback::Inactive { .. } => return Ok(()), }; let unknown_type_buffer = UnknownTypeInputBuffer::$SampleFormat(::InputBuffer { buffer: data_slice }); let stream_data = StreamData::Input { buffer: unknown_type_buffer }; - callback(StreamId(stream_id), stream_data); + let stream_event = StreamEvent::Data(stream_data); + callback(StreamId(stream_id), stream_event); }}; } @@ -723,7 +747,7 @@ impl EventLoop { // Register the callback that is being called by coreaudio whenever it needs data to be // fed to the audio buffer. - let active_callbacks = self.active_callbacks.clone(); + let user_callback = self.user_callback.clone(); let sample_format = format.data_type; let bytes_per_channel = format.data_type.sample_size(); type Args = render_callback::Args; @@ -737,16 +761,16 @@ impl EventLoop { mData: data } = (*args.data.data).mBuffers[0]; - let mut callbacks = active_callbacks.callbacks.lock().unwrap(); + let mut user_callback = user_callback.lock().unwrap(); // A small macro to simplify handling the callback for different sample types. macro_rules! try_callback { ($SampleFormat:ident, $SampleType:ty, $equilibrium:expr) => {{ let data_len = (data_byte_size as usize / bytes_per_channel) as usize; let data_slice = slice::from_raw_parts_mut(data as *mut $SampleType, data_len); - let callback = match callbacks.get_mut(0) { - Some(cb) => cb, - None => { + let callback = match *user_callback { + UserCallback::Active(ref mut cb) => cb, + UserCallback::Inactive { .. } => { for sample in data_slice.iter_mut() { *sample = $equilibrium; } @@ -755,7 +779,8 @@ impl EventLoop { }; let unknown_type_buffer = UnknownTypeOutputBuffer::$SampleFormat(::OutputBuffer { buffer: data_slice }); let stream_data = StreamData::Output { buffer: unknown_type_buffer }; - callback(StreamId(stream_id), stream_data); + let stream_event = StreamEvent::Data(stream_data); + callback(StreamId(stream_id), stream_event); }}; } @@ -777,16 +802,33 @@ impl EventLoop { Ok(StreamId(stream_id)) } - pub fn destroy_stream(&self, stream_id: StreamId) { - let mut streams = self.streams.lock().unwrap(); - streams[stream_id.0] = None; + fn emit_or_enqueue_event(&self, id: StreamId, event: StreamEvent<'static>) { + let mut guard = self.user_callback.lock().unwrap(); + match *guard { + UserCallback::Active(ref mut callback) => callback(id, event), + UserCallback::Inactive { ref mut pending_events } => pending_events.push((id, event)), + } } - pub fn play_stream(&self, stream: StreamId) -> Result<(), PlayStreamError> { + pub fn destroy_stream(&self, stream_id: StreamId) { + { + let mut streams = self.streams.lock().unwrap(); + streams[stream_id.0] = None; + } + // Emit the `Close` event to the user. + let event = StreamEvent::Close(StreamCloseCause::UserDestroyed); + self.emit_or_enqueue_event(stream_id, event); + } + + pub fn play_stream(&self, stream_id: StreamId) -> Result<(), PlayStreamError> { let mut streams = self.streams.lock().unwrap(); - let stream = streams[stream.0].as_mut().unwrap(); + let stream = streams[stream_id.0].as_mut().unwrap(); if !stream.playing { + // Emit the `Play` event to the user. This should not block, as the stream should not + // yet be playing if this is being called. + self.emit_or_enqueue_event(stream_id, StreamEvent::Play); + if let Err(e) = stream.audio_unit.start() { let description = format!("{}", std::error::Error::description(&e)); let err = BackendSpecificError { description }; @@ -797,9 +839,9 @@ impl EventLoop { Ok(()) } - pub fn pause_stream(&self, stream: StreamId) -> Result<(), PauseStreamError> { + pub fn pause_stream(&self, stream_id: StreamId) -> Result<(), PauseStreamError> { let mut streams = self.streams.lock().unwrap(); - let stream = streams[stream.0].as_mut().unwrap(); + let stream = streams[stream_id.0].as_mut().unwrap(); if stream.playing { if let Err(e) = stream.audio_unit.stop() { @@ -807,6 +849,10 @@ impl EventLoop { let err = BackendSpecificError { description }; return Err(err.into()); } + + // Emit the `Pause` event to the user. + self.emit_or_enqueue_event(stream_id, StreamEvent::Pause); + stream.playing = false; } Ok(()) From b1539c534f604930ff4a4aa1225a95e065d720bd Mon Sep 17 00:00:00 2001 From: mitchmindtree Date: Sat, 22 Jun 2019 17:47:31 +0200 Subject: [PATCH 07/10] Add handling of `Play`, `Pause` and `Close` stream events to emscripten --- src/emscripten/mod.rs | 76 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 67 insertions(+), 9 deletions(-) diff --git a/src/emscripten/mod.rs b/src/emscripten/mod.rs index cb53729..ad82af4 100644 --- a/src/emscripten/mod.rs +++ b/src/emscripten/mod.rs @@ -1,7 +1,7 @@ use std::mem; use std::os::raw::c_void; use std::slice::from_raw_parts; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; use stdweb; use stdweb::Reference; use stdweb::unstable::TryInto; @@ -16,6 +16,7 @@ use Format; use PauseStreamError; use PlayStreamError; use SupportedFormatsError; +use StreamCloseCause; use StreamData; use StreamEvent; use SupportedFormat; @@ -32,20 +33,62 @@ use UnknownTypeOutputBuffer; pub struct EventLoop { streams: Mutex>>, + // The `EventLoop` requires a handle to the callbacks in order to be able to emit necessary + // events for `Play`, `Pause` and `Close`. + user_callback: Arc> +} + +enum UserCallback { + // When `run` is called with a callback, that callback will be stored here. + // + // It is essential for the safety of the program that this callback is removed before `run` + // returns (not possible with the current CPAL API). + Active(&'static mut (dyn FnMut(StreamId, StreamEvent) + Send)), + // A queue of events that have occurred but that have not yet been emitted to the user as we + // don't yet have a callback to do so. + Inactive { + pending_events: Vec<(StreamId, StreamEvent<'static>)> + }, } impl EventLoop { #[inline] pub fn new() -> EventLoop { stdweb::initialize(); - - EventLoop { streams: Mutex::new(Vec::new()) } + EventLoop { + streams: Mutex::new(Vec::new()), + user_callback: Arc::new(Mutex::new(UserCallback::Inactive { pending_events: vec![] })), + } } #[inline] - pub fn run(&self, callback: F) -> ! - where F: FnMut(StreamId, StreamEvent) + pub fn run(&self, mut callback: F) -> ! + where F: FnMut(StreamId, StreamEvent) + Send, { + // Retrieve and process any pending events. + // + // Then, set the callback ready to be shared between audio processing and the event loop + // handle. + { + let mut guard = self.user_callback.lock().unwrap(); + let pending_events = match *guard { + UserCallback::Inactive { ref mut pending_events } => { + mem::replace(pending_events, vec![]) + } + UserCallback::Active(_) => { + panic!("`EventLoop::run` was called when the event loop was already running"); + } + }; + + let callback: &mut (dyn FnMut(StreamId, StreamEvent) + Send) = &mut callback; + for (stream_id, event) in pending_events { + callback(stream_id, event); + } + + *guard = UserCallback::Active(unsafe { mem::transmute(callback) }); + } + + // The `run` function uses `set_timeout` to invoke a Rust callback repeatidely. The job // of this callback is to fill the content of the audio buffers. @@ -79,10 +122,10 @@ impl EventLoop { let typed_array = { let f32_slice = temporary_buffer.as_slice(); - let u8_slice: &[u8] = unsafe { - from_raw_parts(f32_slice.as_ptr() as *const _, - f32_slice.len() * mem::size_of::()) - }; + let u8_slice: &[u8] = from_raw_parts( + f32_slice.as_ptr() as *const _, + f32_slice.len() * mem::size_of::(), + ); let typed_array: TypedArray = u8_slice.into(); typed_array }; @@ -121,6 +164,9 @@ impl EventLoop { set_timeout(|| callback_fn::(user_data_ptr as *mut _), 10); stdweb::event_loop(); + + // It is critical that we remove the callback before returning (currently not possible). + // *self.user_callback.lock().unwrap() = UserCallback::Inactive { pending_events: vec![] }; } #[inline] @@ -145,9 +191,19 @@ impl EventLoop { Ok(StreamId(stream_id)) } + fn emit_or_enqueue_event(&self, id: StreamId, event: StreamEvent<'static>) { + let mut guard = self.user_callback.lock().unwrap(); + match *guard { + UserCallback::Active(ref mut callback) => callback(id, event), + UserCallback::Inactive { ref mut pending_events } => pending_events.push((id, event)), + } + } + #[inline] pub fn destroy_stream(&self, stream_id: StreamId) { self.streams.lock().unwrap()[stream_id.0] = None; + let event = StreamEvent::Close(StreamCloseCause::UserDestroyed); + self.emit_or_enqueue_event(stream_id, event); } #[inline] @@ -157,6 +213,7 @@ impl EventLoop { .get(stream_id.0) .and_then(|v| v.as_ref()) .expect("invalid stream ID"); + self.emit_or_enqueue_event(stream_id, StreamEvent::Play); js!(@{stream}.resume()); Ok(()) } @@ -169,6 +226,7 @@ impl EventLoop { .and_then(|v| v.as_ref()) .expect("invalid stream ID"); js!(@{stream}.suspend()); + self.emit_or_enqueue_event(stream_id, StreamEvent::Pause); Ok(()) } } From 26f7e99e9bf9da3001dd2846b69f8e52188ca1a7 Mon Sep 17 00:00:00 2001 From: mitchmindtree Date: Sun, 23 Jun 2019 19:04:24 +0200 Subject: [PATCH 08/10] Remove all `Pause`, `Play` and `Close` events A follow up to [this comment](https://github.com/tomaka/cpal/pull/288#issuecomment-504712574). --- src/alsa/mod.rs | 12 ++------- src/coreaudio/mod.rs | 47 ++++++--------------------------- src/emscripten/mod.rs | 61 ++----------------------------------------- src/lib.rs | 6 ----- src/wasapi/stream.rs | 7 ----- 5 files changed, 12 insertions(+), 121 deletions(-) diff --git a/src/alsa/mod.rs b/src/alsa/mod.rs index 6c4a276..b11b590 100644 --- a/src/alsa/mod.rs +++ b/src/alsa/mod.rs @@ -14,7 +14,6 @@ use PlayStreamError; use SupportedFormatsError; use SampleFormat; use SampleRate; -use StreamCloseCause; use StreamData; use StreamError; use StreamEvent; @@ -473,7 +472,7 @@ impl EventLoop { let run_context = &mut *run_context; 'stream_loop: loop { - process_commands(run_context, callback); + process_commands(run_context); reset_descriptors_with_pending_command_trigger( &mut run_context.descriptors, @@ -827,22 +826,16 @@ impl EventLoop { } // Process any pending `Command`s within the `RunContext`'s queue. -fn process_commands( - run_context: &mut RunContext, - callback: &mut dyn FnMut(StreamId, StreamEvent), -) { +fn process_commands(run_context: &mut RunContext) { for command in run_context.commands.try_iter() { match command { Command::DestroyStream(stream_id) => { run_context.streams.retain(|s| s.id != stream_id); - let event = StreamCloseCause::UserDestroyed.into(); - callback(stream_id, event); }, Command::PlayStream(stream_id) => { if let Some(stream) = run_context.streams.iter_mut() .find(|stream| stream.can_pause && stream.id == stream_id) { - callback(stream_id, StreamEvent::Play); unsafe { alsa::snd_pcm_pause(stream.channel, 0); } @@ -853,7 +846,6 @@ fn process_commands( if let Some(stream) = run_context.streams.iter_mut() .find(|stream| stream.can_pause && stream.id == stream_id) { - callback(stream_id, StreamEvent::Pause); unsafe { alsa::snd_pcm_pause(stream.channel, 1); } diff --git a/src/coreaudio/mod.rs b/src/coreaudio/mod.rs index 6ac386f..101dbcc 100644 --- a/src/coreaudio/mod.rs +++ b/src/coreaudio/mod.rs @@ -12,7 +12,6 @@ use PlayStreamError; use SupportedFormatsError; use SampleFormat; use SampleRate; -use StreamCloseCause; use StreamData; use StreamEvent; use SupportedFormat; @@ -336,9 +335,7 @@ enum UserCallback { Active(&'static mut (FnMut(StreamId, StreamEvent) + Send)), // A queue of events that have occurred but that have not yet been emitted to the user as we // don't yet have a callback to do so. - Inactive { - pending_events: Vec<(StreamId, StreamEvent<'static>)> - }, + Inactive, } struct StreamInner { @@ -440,7 +437,7 @@ impl EventLoop { #[inline] pub fn new() -> EventLoop { EventLoop { - user_callback: Arc::new(Mutex::new(UserCallback::Inactive { pending_events: vec![] })), + user_callback: Arc::new(Mutex::new(UserCallback::Inactive)), streams: Mutex::new(Vec::new()), } } @@ -451,20 +448,10 @@ impl EventLoop { { { let mut guard = self.user_callback.lock().unwrap(); - let pending_events = match *guard { - UserCallback::Inactive { ref mut pending_events } => { - mem::replace(pending_events, vec![]) - } - UserCallback::Active(_) => { - panic!("`EventLoop::run` was called when the event loop was already running"); - } - }; - - let callback: &mut (FnMut(StreamId, StreamEvent) + Send) = &mut callback; - for (stream_id, event) in pending_events { - callback(stream_id, event); + if let UserCallback::Active(_) = *guard { + panic!("`EventLoop::run` was called when the event loop was already running"); } - + let callback: &mut (FnMut(StreamId, StreamEvent) + Send) = &mut callback; *guard = UserCallback::Active(unsafe { mem::transmute(callback) }); } @@ -474,7 +461,7 @@ impl EventLoop { } // It is critical that we remove the callback before returning (currently not possible). - // *self.user_callback.lock().unwrap() = UserCallback::Inactive { pending_events: vec![] }; + // *self.user_callback.lock().unwrap() = UserCallback::Inactive; } fn next_stream_id(&self) -> usize { @@ -698,7 +685,7 @@ impl EventLoop { let data_slice = slice::from_raw_parts(data as *const $SampleType, data_len); let callback = match *user_callback { UserCallback::Active(ref mut cb) => cb, - UserCallback::Inactive { .. } => return Ok(()), + UserCallback::Inactive => return Ok(()), }; let unknown_type_buffer = UnknownTypeInputBuffer::$SampleFormat(::InputBuffer { buffer: data_slice }); let stream_data = StreamData::Input { buffer: unknown_type_buffer }; @@ -770,7 +757,7 @@ impl EventLoop { let data_slice = slice::from_raw_parts_mut(data as *mut $SampleType, data_len); let callback = match *user_callback { UserCallback::Active(ref mut cb) => cb, - UserCallback::Inactive { .. } => { + UserCallback::Inactive => { for sample in data_slice.iter_mut() { *sample = $equilibrium; } @@ -802,22 +789,11 @@ impl EventLoop { Ok(StreamId(stream_id)) } - fn emit_or_enqueue_event(&self, id: StreamId, event: StreamEvent<'static>) { - let mut guard = self.user_callback.lock().unwrap(); - match *guard { - UserCallback::Active(ref mut callback) => callback(id, event), - UserCallback::Inactive { ref mut pending_events } => pending_events.push((id, event)), - } - } - pub fn destroy_stream(&self, stream_id: StreamId) { { let mut streams = self.streams.lock().unwrap(); streams[stream_id.0] = None; } - // Emit the `Close` event to the user. - let event = StreamEvent::Close(StreamCloseCause::UserDestroyed); - self.emit_or_enqueue_event(stream_id, event); } pub fn play_stream(&self, stream_id: StreamId) -> Result<(), PlayStreamError> { @@ -825,10 +801,6 @@ impl EventLoop { let stream = streams[stream_id.0].as_mut().unwrap(); if !stream.playing { - // Emit the `Play` event to the user. This should not block, as the stream should not - // yet be playing if this is being called. - self.emit_or_enqueue_event(stream_id, StreamEvent::Play); - if let Err(e) = stream.audio_unit.start() { let description = format!("{}", std::error::Error::description(&e)); let err = BackendSpecificError { description }; @@ -850,9 +822,6 @@ impl EventLoop { return Err(err.into()); } - // Emit the `Pause` event to the user. - self.emit_or_enqueue_event(stream_id, StreamEvent::Pause); - stream.playing = false; } Ok(()) diff --git a/src/emscripten/mod.rs b/src/emscripten/mod.rs index ad82af4..687f648 100644 --- a/src/emscripten/mod.rs +++ b/src/emscripten/mod.rs @@ -1,7 +1,7 @@ use std::mem; use std::os::raw::c_void; use std::slice::from_raw_parts; -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; use stdweb; use stdweb::Reference; use stdweb::unstable::TryInto; @@ -16,7 +16,6 @@ use Format; use PauseStreamError; use PlayStreamError; use SupportedFormatsError; -use StreamCloseCause; use StreamData; use StreamEvent; use SupportedFormat; @@ -33,22 +32,6 @@ use UnknownTypeOutputBuffer; pub struct EventLoop { streams: Mutex>>, - // The `EventLoop` requires a handle to the callbacks in order to be able to emit necessary - // events for `Play`, `Pause` and `Close`. - user_callback: Arc> -} - -enum UserCallback { - // When `run` is called with a callback, that callback will be stored here. - // - // It is essential for the safety of the program that this callback is removed before `run` - // returns (not possible with the current CPAL API). - Active(&'static mut (dyn FnMut(StreamId, StreamEvent) + Send)), - // A queue of events that have occurred but that have not yet been emitted to the user as we - // don't yet have a callback to do so. - Inactive { - pending_events: Vec<(StreamId, StreamEvent<'static>)> - }, } impl EventLoop { @@ -57,38 +40,13 @@ impl EventLoop { stdweb::initialize(); EventLoop { streams: Mutex::new(Vec::new()), - user_callback: Arc::new(Mutex::new(UserCallback::Inactive { pending_events: vec![] })), } } #[inline] - pub fn run(&self, mut callback: F) -> ! + pub fn run(&self, callback: F) -> ! where F: FnMut(StreamId, StreamEvent) + Send, { - // Retrieve and process any pending events. - // - // Then, set the callback ready to be shared between audio processing and the event loop - // handle. - { - let mut guard = self.user_callback.lock().unwrap(); - let pending_events = match *guard { - UserCallback::Inactive { ref mut pending_events } => { - mem::replace(pending_events, vec![]) - } - UserCallback::Active(_) => { - panic!("`EventLoop::run` was called when the event loop was already running"); - } - }; - - let callback: &mut (dyn FnMut(StreamId, StreamEvent) + Send) = &mut callback; - for (stream_id, event) in pending_events { - callback(stream_id, event); - } - - *guard = UserCallback::Active(unsafe { mem::transmute(callback) }); - } - - // The `run` function uses `set_timeout` to invoke a Rust callback repeatidely. The job // of this callback is to fill the content of the audio buffers. @@ -164,9 +122,6 @@ impl EventLoop { set_timeout(|| callback_fn::(user_data_ptr as *mut _), 10); stdweb::event_loop(); - - // It is critical that we remove the callback before returning (currently not possible). - // *self.user_callback.lock().unwrap() = UserCallback::Inactive { pending_events: vec![] }; } #[inline] @@ -191,19 +146,9 @@ impl EventLoop { Ok(StreamId(stream_id)) } - fn emit_or_enqueue_event(&self, id: StreamId, event: StreamEvent<'static>) { - let mut guard = self.user_callback.lock().unwrap(); - match *guard { - UserCallback::Active(ref mut callback) => callback(id, event), - UserCallback::Inactive { ref mut pending_events } => pending_events.push((id, event)), - } - } - #[inline] pub fn destroy_stream(&self, stream_id: StreamId) { self.streams.lock().unwrap()[stream_id.0] = None; - let event = StreamEvent::Close(StreamCloseCause::UserDestroyed); - self.emit_or_enqueue_event(stream_id, event); } #[inline] @@ -213,7 +158,6 @@ impl EventLoop { .get(stream_id.0) .and_then(|v| v.as_ref()) .expect("invalid stream ID"); - self.emit_or_enqueue_event(stream_id, StreamEvent::Play); js!(@{stream}.resume()); Ok(()) } @@ -226,7 +170,6 @@ impl EventLoop { .and_then(|v| v.as_ref()) .expect("invalid stream ID"); js!(@{stream}.suspend()); - self.emit_or_enqueue_event(stream_id, StreamEvent::Pause); Ok(()) } } diff --git a/src/lib.rs b/src/lib.rs index 48b90a2..78a21d1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -219,12 +219,6 @@ pub enum StreamData<'a> { pub enum StreamEvent<'a> { /// Some data is ready to be processed. Data(StreamData<'a>), - /// The stream has received a **Play** command. - Play, - /// The stream has received a **Pause** command. - /// - /// No **Data** events should occur until a subsequent **Play** command is received. - Pause, /// The stream was closed, either because the user destroyed it or because of an error. /// /// The stream event callback will not be called again after this event occurs. diff --git a/src/wasapi/stream.rs b/src/wasapi/stream.rs index 9539ac9..2ba2ac6 100644 --- a/src/wasapi/stream.rs +++ b/src/wasapi/stream.rs @@ -26,7 +26,6 @@ use Format; use PauseStreamError; use PlayStreamError; use SampleFormat; -use StreamCloseCause; use StreamData; use StreamError; use StreamEvent; @@ -758,8 +757,6 @@ fn process_commands( run_context.streams.remove(p); }, } - let event = StreamEvent::Close(StreamCloseCause::UserDestroyed); - callback(stream_id, event); }, Command::PlayStream(stream_id) => { match run_context.streams.iter().position(|s| s.id == stream_id) { @@ -772,8 +769,6 @@ fn process_commands( match stream_error_from_hresult(hresult) { Ok(()) => { run_context.streams[p].playing = true; - let event = StreamEvent::Play; - callback(stream_id, event); } Err(err) => { let event = StreamEvent::Close(err.into()); @@ -797,8 +792,6 @@ fn process_commands( match stream_error_from_hresult(hresult) { Ok(()) => { run_context.streams[p].playing = false; - let event = StreamEvent::Pause; - callback(stream_id, event); } Err(err) => { let event = StreamEvent::Close(err.into()); From c05d2916b1895ee5ecd6b6c46f6b1bed376bb4a5 Mon Sep 17 00:00:00 2001 From: mitchmindtree Date: Mon, 24 Jun 2019 20:43:27 +0200 Subject: [PATCH 09/10] Remove `StreamEvent` in favour of `StreamDataResult` --- examples/beep.rs | 9 ++++---- examples/feedback.rs | 9 ++++---- examples/record_wav.rs | 5 ++--- src/alsa/mod.rs | 19 +++++++--------- src/coreaudio/mod.rs | 14 +++++------- src/emscripten/mod.rs | 9 ++++---- src/lib.rs | 50 ++++++------------------------------------ src/null/mod.rs | 4 ++-- src/wasapi/stream.rs | 26 +++++++++------------- 9 files changed, 47 insertions(+), 98 deletions(-) diff --git a/examples/beep.rs b/examples/beep.rs index c6b70e0..dc0edd2 100644 --- a/examples/beep.rs +++ b/examples/beep.rs @@ -17,14 +17,13 @@ fn main() -> Result<(), failure::Error> { (sample_clock * 440.0 * 2.0 * 3.141592 / sample_rate).sin() }; - event_loop.run(move |id, event| { - let data = match event { - cpal::StreamEvent::Data(data) => data, - cpal::StreamEvent::Close(cpal::StreamCloseCause::Error(err)) => { + event_loop.run(move |id, result| { + let data = match result { + Ok(data) => data, + Err(err) => { eprintln!("stream {:?} closed due to an error: {}", id, err); return; } - _ => return, }; match data { diff --git a/examples/feedback.rs b/examples/feedback.rs index 4c067dd..29aec5e 100644 --- a/examples/feedback.rs +++ b/examples/feedback.rs @@ -49,14 +49,13 @@ fn main() -> Result<(), failure::Error> { // Run the event loop on a separate thread. std::thread::spawn(move || { - event_loop.run(move |id, event| { - let data = match event { - cpal::StreamEvent::Data(data) => data, - cpal::StreamEvent::Close(cpal::StreamCloseCause::Error(err)) => { + event_loop.run(move |id, result| { + let data = match result { + Ok(data) => data, + Err(err) => { eprintln!("stream {:?} closed due to an error: {}", id, err); return; } - _ => return, }; match data { diff --git a/examples/record_wav.rs b/examples/record_wav.rs index 74c10aa..0b58e01 100644 --- a/examples/record_wav.rs +++ b/examples/record_wav.rs @@ -32,12 +32,11 @@ fn main() -> Result<(), failure::Error> { std::thread::spawn(move || { event_loop.run(move |id, event| { let data = match event { - cpal::StreamEvent::Data(data) => data, - cpal::StreamEvent::Close(cpal::StreamCloseCause::Error(err)) => { + Ok(data) => data, + Err(err) => { eprintln!("stream {:?} closed due to an error: {}", id, err); return; } - _ => return, }; // If we're done recording, return early. diff --git a/src/alsa/mod.rs b/src/alsa/mod.rs index b11b590..4d276cc 100644 --- a/src/alsa/mod.rs +++ b/src/alsa/mod.rs @@ -15,8 +15,8 @@ use SupportedFormatsError; use SampleFormat; use SampleRate; use StreamData; +use StreamDataResult; use StreamError; -use StreamEvent; use SupportedFormat; use UnknownTypeInputBuffer; use UnknownTypeOutputBuffer; @@ -461,12 +461,12 @@ impl EventLoop { #[inline] pub fn run(&self, mut callback: F) -> ! - where F: FnMut(StreamId, StreamEvent) + where F: FnMut(StreamId, StreamDataResult) { self.run_inner(&mut callback) } - fn run_inner(&self, callback: &mut dyn FnMut(StreamId, StreamEvent)) -> ! { + fn run_inner(&self, callback: &mut dyn FnMut(StreamId, StreamDataResult)) -> ! { unsafe { let mut run_context = self.run_context.lock().unwrap(); let run_context = &mut *run_context; @@ -487,8 +487,8 @@ impl EventLoop { Ok(false) => continue, Err(err) => { for stream in run_context.streams.iter() { - let event = StreamEvent::Close(err.clone().into()); - callback(stream.id, event); + let result = Err(err.clone().into()); + callback(stream.id, result); } run_context.streams.clear(); break 'stream_loop; @@ -578,8 +578,7 @@ impl EventLoop { let stream_data = StreamData::Input { buffer: input_buffer, }; - let event = StreamEvent::Data(stream_data); - callback(stream.id, event); + callback(stream.id, Ok(stream_data)); }, StreamType::Output => { { @@ -599,8 +598,7 @@ impl EventLoop { let stream_data = StreamData::Output { buffer: output_buffer, }; - let event = StreamEvent::Data(stream_data); - callback(stream.id, event); + callback(stream.id, Ok(stream_data)); } loop { let result = alsa::snd_pcm_writei( @@ -639,8 +637,7 @@ impl EventLoop { // Remove any streams that have errored and notify the user. for (stream_id, err) in streams_to_remove { run_context.streams.retain(|s| s.id != stream_id); - let event = StreamEvent::Close(err.into()); - callback(stream_id, event); + callback(stream_id, Err(err.into())); } } } diff --git a/src/coreaudio/mod.rs b/src/coreaudio/mod.rs index 101dbcc..3cd3613 100644 --- a/src/coreaudio/mod.rs +++ b/src/coreaudio/mod.rs @@ -13,7 +13,7 @@ use SupportedFormatsError; use SampleFormat; use SampleRate; use StreamData; -use StreamEvent; +use StreamDataResult; use SupportedFormat; use UnknownTypeInputBuffer; use UnknownTypeOutputBuffer; @@ -332,7 +332,7 @@ enum UserCallback { // // It is essential for the safety of the program that this callback is removed before `run` // returns (not possible with the current CPAL API). - Active(&'static mut (FnMut(StreamId, StreamEvent) + Send)), + Active(&'static mut (FnMut(StreamId, StreamDataResult) + Send)), // A queue of events that have occurred but that have not yet been emitted to the user as we // don't yet have a callback to do so. Inactive, @@ -444,14 +444,14 @@ impl EventLoop { #[inline] pub fn run(&self, mut callback: F) -> ! - where F: FnMut(StreamId, StreamEvent) + Send + where F: FnMut(StreamId, StreamDataResult) + Send { { let mut guard = self.user_callback.lock().unwrap(); if let UserCallback::Active(_) = *guard { panic!("`EventLoop::run` was called when the event loop was already running"); } - let callback: &mut (FnMut(StreamId, StreamEvent) + Send) = &mut callback; + let callback: &mut (FnMut(StreamId, StreamDataResult) + Send) = &mut callback; *guard = UserCallback::Active(unsafe { mem::transmute(callback) }); } @@ -689,8 +689,7 @@ impl EventLoop { }; let unknown_type_buffer = UnknownTypeInputBuffer::$SampleFormat(::InputBuffer { buffer: data_slice }); let stream_data = StreamData::Input { buffer: unknown_type_buffer }; - let stream_event = StreamEvent::Data(stream_data); - callback(StreamId(stream_id), stream_event); + callback(StreamId(stream_id), Ok(stream_data)); }}; } @@ -766,8 +765,7 @@ impl EventLoop { }; let unknown_type_buffer = UnknownTypeOutputBuffer::$SampleFormat(::OutputBuffer { buffer: data_slice }); let stream_data = StreamData::Output { buffer: unknown_type_buffer }; - let stream_event = StreamEvent::Data(stream_data); - callback(StreamId(stream_id), stream_event); + callback(StreamId(stream_id), Ok(stream_data)); }}; } diff --git a/src/emscripten/mod.rs b/src/emscripten/mod.rs index 687f648..2ca7a93 100644 --- a/src/emscripten/mod.rs +++ b/src/emscripten/mod.rs @@ -17,7 +17,7 @@ use PauseStreamError; use PlayStreamError; use SupportedFormatsError; use StreamData; -use StreamEvent; +use StreamDataResult; use SupportedFormat; use UnknownTypeOutputBuffer; @@ -45,7 +45,7 @@ impl EventLoop { #[inline] pub fn run(&self, callback: F) -> ! - where F: FnMut(StreamId, StreamEvent) + Send, + where F: FnMut(StreamId, StreamDataResult) + Send, { // The `run` function uses `set_timeout` to invoke a Rust callback repeatidely. The job // of this callback is to fill the content of the audio buffers. @@ -54,7 +54,7 @@ impl EventLoop { // and to the `callback` parameter that was passed to `run`. fn callback_fn(user_data_ptr: *mut c_void) - where F: FnMut(StreamId, StreamEvent) + where F: FnMut(StreamId, StreamDataResult) { unsafe { let user_data_ptr2 = user_data_ptr as *mut (&EventLoop, F); @@ -73,8 +73,7 @@ impl EventLoop { { let buffer = UnknownTypeOutputBuffer::F32(::OutputBuffer { buffer: &mut temporary_buffer }); let data = StreamData::Output { buffer: buffer }; - let event = StreamEvent::Data(data); - user_cb(StreamId(stream_id), event); + user_cb(StreamId(stream_id), Ok(data)); // TODO: directly use a TypedArray once this is supported by stdweb } diff --git a/src/lib.rs b/src/lib.rs index 78a21d1..d496b32 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -92,9 +92,9 @@ //! # let event_loop = cpal::EventLoop::new(); //! event_loop.run(move |stream_id, stream_event| { //! let stream_data = match stream_event { -//! cpal::StreamEvent::Data(data) => data, -//! cpal::StreamEvent::Close(cpal::StreamCloseCause::Error(err)) => { -//! eprintln!("stream {:?} closed due to an error: {}", stream_id, err); +//! Ok(data) => data, +//! Err(err) => { +//! eprintln!("an error occurred on stream {:?}: {}", stream_id, err); //! return; //! } //! _ => return, @@ -215,24 +215,9 @@ pub enum StreamData<'a> { }, } -/// Events that may be emitted to the user via the callback submitted to `EventLoop::run`. -pub enum StreamEvent<'a> { - /// Some data is ready to be processed. - Data(StreamData<'a>), - /// The stream was closed, either because the user destroyed it or because of an error. - /// - /// The stream event callback will not be called again after this event occurs. - Close(StreamCloseCause), -} - -/// The cause behind why a stream was closed. -#[derive(Debug)] -pub enum StreamCloseCause { - /// The stream was closed as the user called `destroy_stream`. - UserDestroyed, - /// The stream was closed due to an error occurring. - Error(StreamError), -} +/// Stream data passed to the `EventLoop::run` callback, or an error in the case that the device +/// was invalidated or some backend-specific error occurred. +pub type StreamDataResult<'a> = Result, StreamError>; /// Represents a buffer containing audio data that may be read. /// @@ -441,9 +426,6 @@ pub enum PauseStreamError { } /// Errors that might occur while a stream is running. -/// -/// These errors are delivered to the user callback via -/// `StreamEvent::Close(StreamCloseCause::Error(_))` #[derive(Debug, Fail)] pub enum StreamError { /// The device no longer exists. This can happen if the device is disconnected while the @@ -637,7 +619,7 @@ impl EventLoop { /// You can call the other methods of `EventLoop` without getting a deadlock. #[inline] pub fn run(&self, mut callback: F) -> ! - where F: FnMut(StreamId, StreamEvent) + Send + where F: FnMut(StreamId, StreamDataResult) + Send { self.0.run(move |id, data| callback(StreamId(id), data)) } @@ -835,18 +817,6 @@ impl Iterator for SupportedOutputFormats { } } -impl From for StreamCloseCause { - fn from(err: StreamError) -> Self { - StreamCloseCause::Error(err) - } -} - -impl<'a> From for StreamEvent<'a> { - fn from(cause: StreamCloseCause) -> Self { - StreamEvent::Close(cause) - } -} - impl From for DevicesError { fn from(err: BackendSpecificError) -> Self { DevicesError::BackendSpecific { err } @@ -895,12 +865,6 @@ impl From for StreamError { } } -impl From for StreamCloseCause { - fn from(err: BackendSpecificError) -> Self { - StreamCloseCause::Error(err.into()) - } -} - // If a backend does not provide an API for retrieving supported formats, we query it with a bunch // of commonly used rates. This is always the case for wasapi and is sometimes the case for alsa. // diff --git a/src/null/mod.rs b/src/null/mod.rs index f3238c8..ff5213a 100644 --- a/src/null/mod.rs +++ b/src/null/mod.rs @@ -9,8 +9,8 @@ use DeviceNameError; use Format; use PauseStreamError; use PlayStreamError; +use StreamDataResult; use SupportedFormatsError; -use StreamEvent; use SupportedFormat; pub struct EventLoop; @@ -23,7 +23,7 @@ impl EventLoop { #[inline] pub fn run(&self, _callback: F) -> ! - where F: FnMut(StreamId, StreamEvent) + where F: FnMut(StreamId, StreamDataResult) { loop { /* TODO: don't spin */ } } diff --git a/src/wasapi/stream.rs b/src/wasapi/stream.rs index 2ba2ac6..cad7b81 100644 --- a/src/wasapi/stream.rs +++ b/src/wasapi/stream.rs @@ -27,8 +27,8 @@ use PauseStreamError; use PlayStreamError; use SampleFormat; use StreamData; +use StreamDataResult; use StreamError; -use StreamEvent; use UnknownTypeOutputBuffer; use UnknownTypeInputBuffer; @@ -445,12 +445,12 @@ impl EventLoop { #[inline] pub fn run(&self, mut callback: F) -> ! - where F: FnMut(StreamId, StreamEvent) + where F: FnMut(StreamId, StreamDataResult) { self.run_inner(&mut callback); } - fn run_inner(&self, callback: &mut dyn FnMut(StreamId, StreamEvent)) -> ! { + fn run_inner(&self, callback: &mut dyn FnMut(StreamId, StreamDataResult)) -> ! { unsafe { // We keep `run_context` locked forever, which guarantees that two invocations of // `run()` cannot run simultaneously. @@ -472,8 +472,7 @@ impl EventLoop { Some(p) => { run_context.handles.remove(p + 1); run_context.streams.remove(p); - let event = StreamEvent::Close(err.into()); - callback(stream_id, event); + callback(stream_id, Err(err.into())); }, } } @@ -486,8 +485,7 @@ impl EventLoop { Ok(idx) => idx, Err(err) => { for stream in &run_context.streams { - let event = StreamEvent::Close(err.clone().into()); - callback(stream.id.clone(), event); + callback(stream.id.clone(), Err(err.clone().into())); } run_context.streams.clear(); run_context.handles.truncate(1); @@ -553,8 +551,7 @@ impl EventLoop { buffer: slice, }); let data = StreamData::Input { buffer: unknown_buffer }; - let event = StreamEvent::Data(data); - callback(stream.id.clone(), event); + callback(stream.id.clone(), Ok(data)); // Release the buffer. let hresult = (*capture_client).ReleaseBuffer(frames_available); if let Err(err) = stream_error_from_hresult(hresult) { @@ -596,8 +593,7 @@ impl EventLoop { buffer: slice }); let data = StreamData::Output { buffer: unknown_buffer }; - let event = StreamEvent::Data(data); - callback(stream.id.clone(), event); + callback(stream.id.clone(), Ok(data)); let hresult = (*render_client) .ReleaseBuffer(frames_available as u32, 0); if let Err(err) = stream_error_from_hresult(hresult) { @@ -739,7 +735,7 @@ fn format_to_waveformatextensible(format: &Format) -> Option { - let event = StreamEvent::Close(err.into()); - callback(stream_id, event); + callback(stream_id, Err(err.into())); run_context.handles.remove(p + 1); run_context.streams.remove(p); } @@ -794,8 +789,7 @@ fn process_commands( run_context.streams[p].playing = false; } Err(err) => { - let event = StreamEvent::Close(err.into()); - callback(stream_id, event); + callback(stream_id, Err(err.into())); run_context.handles.remove(p + 1); run_context.streams.remove(p); } From c72bafd8abe4bba18fcaad93f7c4741fcb43d771 Mon Sep 17 00:00:00 2001 From: mitchmindtree Date: Mon, 24 Jun 2019 20:57:01 +0200 Subject: [PATCH 10/10] Update examples eprintln to more accurately reflect error --- examples/beep.rs | 2 +- examples/feedback.rs | 2 +- examples/record_wav.rs | 2 +- src/emscripten/mod.rs | 2 +- src/lib.rs | 6 +++--- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/examples/beep.rs b/examples/beep.rs index dc0edd2..96b9eaf 100644 --- a/examples/beep.rs +++ b/examples/beep.rs @@ -21,7 +21,7 @@ fn main() -> Result<(), failure::Error> { let data = match result { Ok(data) => data, Err(err) => { - eprintln!("stream {:?} closed due to an error: {}", id, err); + eprintln!("an error occurred on stream {:?}: {}", id, err); return; } }; diff --git a/examples/feedback.rs b/examples/feedback.rs index 29aec5e..95d634f 100644 --- a/examples/feedback.rs +++ b/examples/feedback.rs @@ -53,7 +53,7 @@ fn main() -> Result<(), failure::Error> { let data = match result { Ok(data) => data, Err(err) => { - eprintln!("stream {:?} closed due to an error: {}", id, err); + eprintln!("an error occurred on stream {:?}: {}", id, err); return; } }; diff --git a/examples/record_wav.rs b/examples/record_wav.rs index 0b58e01..66002df 100644 --- a/examples/record_wav.rs +++ b/examples/record_wav.rs @@ -34,7 +34,7 @@ fn main() -> Result<(), failure::Error> { let data = match event { Ok(data) => data, Err(err) => { - eprintln!("stream {:?} closed due to an error: {}", id, err); + eprintln!("an error occurred on stream {:?}: {}", id, err); return; } }; diff --git a/src/emscripten/mod.rs b/src/emscripten/mod.rs index 2ca7a93..78ffa96 100644 --- a/src/emscripten/mod.rs +++ b/src/emscripten/mod.rs @@ -45,7 +45,7 @@ impl EventLoop { #[inline] pub fn run(&self, callback: F) -> ! - where F: FnMut(StreamId, StreamDataResult) + Send, + where F: FnMut(StreamId, StreamDataResult), { // The `run` function uses `set_timeout` to invoke a Rust callback repeatidely. The job // of this callback is to fill the content of the audio buffers. diff --git a/src/lib.rs b/src/lib.rs index d496b32..0c04b11 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -70,7 +70,7 @@ //! //! ```no_run //! # let event_loop = cpal::EventLoop::new(); -//! event_loop.run(move |_stream_id, _stream_event| { +//! event_loop.run(move |_stream_id, _stream_result| { //! // react to stream events and read or write stream data here //! }); //! ``` @@ -90,8 +90,8 @@ //! use cpal::{StreamData, UnknownTypeOutputBuffer}; //! //! # let event_loop = cpal::EventLoop::new(); -//! event_loop.run(move |stream_id, stream_event| { -//! let stream_data = match stream_event { +//! event_loop.run(move |stream_id, stream_result| { +//! let stream_data = match stream_result { //! Ok(data) => data, //! Err(err) => { //! eprintln!("an error occurred on stream {:?}: {}", stream_id, err);