Remove `StreamEvent` in favour of `StreamDataResult`

This commit is contained in:
mitchmindtree 2019-06-24 20:43:27 +02:00
parent 26f7e99e9b
commit c05d2916b1
9 changed files with 47 additions and 98 deletions

View File

@ -17,14 +17,13 @@ fn main() -> Result<(), failure::Error> {
(sample_clock * 440.0 * 2.0 * 3.141592 / sample_rate).sin() (sample_clock * 440.0 * 2.0 * 3.141592 / sample_rate).sin()
}; };
event_loop.run(move |id, event| { event_loop.run(move |id, result| {
let data = match event { let data = match result {
cpal::StreamEvent::Data(data) => data, Ok(data) => data,
cpal::StreamEvent::Close(cpal::StreamCloseCause::Error(err)) => { Err(err) => {
eprintln!("stream {:?} closed due to an error: {}", id, err); eprintln!("stream {:?} closed due to an error: {}", id, err);
return; return;
} }
_ => return,
}; };
match data { match data {

View File

@ -49,14 +49,13 @@ fn main() -> Result<(), failure::Error> {
// Run the event loop on a separate thread. // Run the event loop on a separate thread.
std::thread::spawn(move || { std::thread::spawn(move || {
event_loop.run(move |id, event| { event_loop.run(move |id, result| {
let data = match event { let data = match result {
cpal::StreamEvent::Data(data) => data, Ok(data) => data,
cpal::StreamEvent::Close(cpal::StreamCloseCause::Error(err)) => { Err(err) => {
eprintln!("stream {:?} closed due to an error: {}", id, err); eprintln!("stream {:?} closed due to an error: {}", id, err);
return; return;
} }
_ => return,
}; };
match data { match data {

View File

@ -32,12 +32,11 @@ fn main() -> Result<(), failure::Error> {
std::thread::spawn(move || { std::thread::spawn(move || {
event_loop.run(move |id, event| { event_loop.run(move |id, event| {
let data = match event { let data = match event {
cpal::StreamEvent::Data(data) => data, Ok(data) => data,
cpal::StreamEvent::Close(cpal::StreamCloseCause::Error(err)) => { Err(err) => {
eprintln!("stream {:?} closed due to an error: {}", id, err); eprintln!("stream {:?} closed due to an error: {}", id, err);
return; return;
} }
_ => return,
}; };
// If we're done recording, return early. // If we're done recording, return early.

View File

@ -15,8 +15,8 @@ use SupportedFormatsError;
use SampleFormat; use SampleFormat;
use SampleRate; use SampleRate;
use StreamData; use StreamData;
use StreamDataResult;
use StreamError; use StreamError;
use StreamEvent;
use SupportedFormat; use SupportedFormat;
use UnknownTypeInputBuffer; use UnknownTypeInputBuffer;
use UnknownTypeOutputBuffer; use UnknownTypeOutputBuffer;
@ -461,12 +461,12 @@ impl EventLoop {
#[inline] #[inline]
pub fn run<F>(&self, mut callback: F) -> ! pub fn run<F>(&self, mut callback: F) -> !
where F: FnMut(StreamId, StreamEvent) where F: FnMut(StreamId, StreamDataResult)
{ {
self.run_inner(&mut callback) self.run_inner(&mut callback)
} }
fn run_inner(&self, callback: &mut dyn FnMut(StreamId, StreamEvent)) -> ! { fn run_inner(&self, callback: &mut dyn FnMut(StreamId, StreamDataResult)) -> ! {
unsafe { unsafe {
let mut run_context = self.run_context.lock().unwrap(); let mut run_context = self.run_context.lock().unwrap();
let run_context = &mut *run_context; let run_context = &mut *run_context;
@ -487,8 +487,8 @@ impl EventLoop {
Ok(false) => continue, Ok(false) => continue,
Err(err) => { Err(err) => {
for stream in run_context.streams.iter() { for stream in run_context.streams.iter() {
let event = StreamEvent::Close(err.clone().into()); let result = Err(err.clone().into());
callback(stream.id, event); callback(stream.id, result);
} }
run_context.streams.clear(); run_context.streams.clear();
break 'stream_loop; break 'stream_loop;
@ -578,8 +578,7 @@ impl EventLoop {
let stream_data = StreamData::Input { let stream_data = StreamData::Input {
buffer: input_buffer, buffer: input_buffer,
}; };
let event = StreamEvent::Data(stream_data); callback(stream.id, Ok(stream_data));
callback(stream.id, event);
}, },
StreamType::Output => { StreamType::Output => {
{ {
@ -599,8 +598,7 @@ impl EventLoop {
let stream_data = StreamData::Output { let stream_data = StreamData::Output {
buffer: output_buffer, buffer: output_buffer,
}; };
let event = StreamEvent::Data(stream_data); callback(stream.id, Ok(stream_data));
callback(stream.id, event);
} }
loop { loop {
let result = alsa::snd_pcm_writei( let result = alsa::snd_pcm_writei(
@ -639,8 +637,7 @@ impl EventLoop {
// Remove any streams that have errored and notify the user. // Remove any streams that have errored and notify the user.
for (stream_id, err) in streams_to_remove { for (stream_id, err) in streams_to_remove {
run_context.streams.retain(|s| s.id != stream_id); run_context.streams.retain(|s| s.id != stream_id);
let event = StreamEvent::Close(err.into()); callback(stream_id, Err(err.into()));
callback(stream_id, event);
} }
} }
} }

View File

@ -13,7 +13,7 @@ use SupportedFormatsError;
use SampleFormat; use SampleFormat;
use SampleRate; use SampleRate;
use StreamData; use StreamData;
use StreamEvent; use StreamDataResult;
use SupportedFormat; use SupportedFormat;
use UnknownTypeInputBuffer; use UnknownTypeInputBuffer;
use UnknownTypeOutputBuffer; use UnknownTypeOutputBuffer;
@ -332,7 +332,7 @@ enum UserCallback {
// //
// It is essential for the safety of the program that this callback is removed before `run` // It is essential for the safety of the program that this callback is removed before `run`
// returns (not possible with the current CPAL API). // returns (not possible with the current CPAL API).
Active(&'static mut (FnMut(StreamId, StreamEvent) + Send)), Active(&'static mut (FnMut(StreamId, StreamDataResult) + Send)),
// A queue of events that have occurred but that have not yet been emitted to the user as we // A queue of events that have occurred but that have not yet been emitted to the user as we
// don't yet have a callback to do so. // don't yet have a callback to do so.
Inactive, Inactive,
@ -444,14 +444,14 @@ impl EventLoop {
#[inline] #[inline]
pub fn run<F>(&self, mut callback: F) -> ! pub fn run<F>(&self, mut callback: F) -> !
where F: FnMut(StreamId, StreamEvent) + Send where F: FnMut(StreamId, StreamDataResult) + Send
{ {
{ {
let mut guard = self.user_callback.lock().unwrap(); let mut guard = self.user_callback.lock().unwrap();
if let UserCallback::Active(_) = *guard { if let UserCallback::Active(_) = *guard {
panic!("`EventLoop::run` was called when the event loop was already running"); panic!("`EventLoop::run` was called when the event loop was already running");
} }
let callback: &mut (FnMut(StreamId, StreamEvent) + Send) = &mut callback; let callback: &mut (FnMut(StreamId, StreamDataResult) + Send) = &mut callback;
*guard = UserCallback::Active(unsafe { mem::transmute(callback) }); *guard = UserCallback::Active(unsafe { mem::transmute(callback) });
} }
@ -689,8 +689,7 @@ impl EventLoop {
}; };
let unknown_type_buffer = UnknownTypeInputBuffer::$SampleFormat(::InputBuffer { buffer: data_slice }); let unknown_type_buffer = UnknownTypeInputBuffer::$SampleFormat(::InputBuffer { buffer: data_slice });
let stream_data = StreamData::Input { buffer: unknown_type_buffer }; let stream_data = StreamData::Input { buffer: unknown_type_buffer };
let stream_event = StreamEvent::Data(stream_data); callback(StreamId(stream_id), Ok(stream_data));
callback(StreamId(stream_id), stream_event);
}}; }};
} }
@ -766,8 +765,7 @@ impl EventLoop {
}; };
let unknown_type_buffer = UnknownTypeOutputBuffer::$SampleFormat(::OutputBuffer { buffer: data_slice }); let unknown_type_buffer = UnknownTypeOutputBuffer::$SampleFormat(::OutputBuffer { buffer: data_slice });
let stream_data = StreamData::Output { buffer: unknown_type_buffer }; let stream_data = StreamData::Output { buffer: unknown_type_buffer };
let stream_event = StreamEvent::Data(stream_data); callback(StreamId(stream_id), Ok(stream_data));
callback(StreamId(stream_id), stream_event);
}}; }};
} }

View File

@ -17,7 +17,7 @@ use PauseStreamError;
use PlayStreamError; use PlayStreamError;
use SupportedFormatsError; use SupportedFormatsError;
use StreamData; use StreamData;
use StreamEvent; use StreamDataResult;
use SupportedFormat; use SupportedFormat;
use UnknownTypeOutputBuffer; use UnknownTypeOutputBuffer;
@ -45,7 +45,7 @@ impl EventLoop {
#[inline] #[inline]
pub fn run<F>(&self, callback: F) -> ! pub fn run<F>(&self, callback: F) -> !
where F: FnMut(StreamId, StreamEvent) + Send, where F: FnMut(StreamId, StreamDataResult) + Send,
{ {
// The `run` function uses `set_timeout` to invoke a Rust callback repeatidely. The job // The `run` function uses `set_timeout` to invoke a Rust callback repeatidely. The job
// of this callback is to fill the content of the audio buffers. // of this callback is to fill the content of the audio buffers.
@ -54,7 +54,7 @@ impl EventLoop {
// and to the `callback` parameter that was passed to `run`. // and to the `callback` parameter that was passed to `run`.
fn callback_fn<F>(user_data_ptr: *mut c_void) fn callback_fn<F>(user_data_ptr: *mut c_void)
where F: FnMut(StreamId, StreamEvent) where F: FnMut(StreamId, StreamDataResult)
{ {
unsafe { unsafe {
let user_data_ptr2 = user_data_ptr as *mut (&EventLoop, F); let user_data_ptr2 = user_data_ptr as *mut (&EventLoop, F);
@ -73,8 +73,7 @@ impl EventLoop {
{ {
let buffer = UnknownTypeOutputBuffer::F32(::OutputBuffer { buffer: &mut temporary_buffer }); let buffer = UnknownTypeOutputBuffer::F32(::OutputBuffer { buffer: &mut temporary_buffer });
let data = StreamData::Output { buffer: buffer }; let data = StreamData::Output { buffer: buffer };
let event = StreamEvent::Data(data); user_cb(StreamId(stream_id), Ok(data));
user_cb(StreamId(stream_id), event);
// TODO: directly use a TypedArray<f32> once this is supported by stdweb // TODO: directly use a TypedArray<f32> once this is supported by stdweb
} }

View File

@ -92,9 +92,9 @@
//! # let event_loop = cpal::EventLoop::new(); //! # let event_loop = cpal::EventLoop::new();
//! event_loop.run(move |stream_id, stream_event| { //! event_loop.run(move |stream_id, stream_event| {
//! let stream_data = match stream_event { //! let stream_data = match stream_event {
//! cpal::StreamEvent::Data(data) => data, //! Ok(data) => data,
//! cpal::StreamEvent::Close(cpal::StreamCloseCause::Error(err)) => { //! Err(err) => {
//! eprintln!("stream {:?} closed due to an error: {}", stream_id, err); //! eprintln!("an error occurred on stream {:?}: {}", stream_id, err);
//! return; //! return;
//! } //! }
//! _ => return, //! _ => return,
@ -215,24 +215,9 @@ pub enum StreamData<'a> {
}, },
} }
/// Events that may be emitted to the user via the callback submitted to `EventLoop::run`. /// Stream data passed to the `EventLoop::run` callback, or an error in the case that the device
pub enum StreamEvent<'a> { /// was invalidated or some backend-specific error occurred.
/// Some data is ready to be processed. pub type StreamDataResult<'a> = Result<StreamData<'a>, StreamError>;
Data(StreamData<'a>),
/// The stream was closed, either because the user destroyed it or because of an error.
///
/// The stream event callback will not be called again after this event occurs.
Close(StreamCloseCause),
}
/// The cause behind why a stream was closed.
#[derive(Debug)]
pub enum StreamCloseCause {
/// The stream was closed as the user called `destroy_stream`.
UserDestroyed,
/// The stream was closed due to an error occurring.
Error(StreamError),
}
/// Represents a buffer containing audio data that may be read. /// Represents a buffer containing audio data that may be read.
/// ///
@ -441,9 +426,6 @@ pub enum PauseStreamError {
} }
/// Errors that might occur while a stream is running. /// Errors that might occur while a stream is running.
///
/// These errors are delivered to the user callback via
/// `StreamEvent::Close(StreamCloseCause::Error(_))`
#[derive(Debug, Fail)] #[derive(Debug, Fail)]
pub enum StreamError { pub enum StreamError {
/// The device no longer exists. This can happen if the device is disconnected while the /// The device no longer exists. This can happen if the device is disconnected while the
@ -637,7 +619,7 @@ impl EventLoop {
/// You can call the other methods of `EventLoop` without getting a deadlock. /// You can call the other methods of `EventLoop` without getting a deadlock.
#[inline] #[inline]
pub fn run<F>(&self, mut callback: F) -> ! pub fn run<F>(&self, mut callback: F) -> !
where F: FnMut(StreamId, StreamEvent) + Send where F: FnMut(StreamId, StreamDataResult) + Send
{ {
self.0.run(move |id, data| callback(StreamId(id), data)) self.0.run(move |id, data| callback(StreamId(id), data))
} }
@ -835,18 +817,6 @@ impl Iterator for SupportedOutputFormats {
} }
} }
impl From<StreamError> for StreamCloseCause {
fn from(err: StreamError) -> Self {
StreamCloseCause::Error(err)
}
}
impl<'a> From<StreamCloseCause> for StreamEvent<'a> {
fn from(cause: StreamCloseCause) -> Self {
StreamEvent::Close(cause)
}
}
impl From<BackendSpecificError> for DevicesError { impl From<BackendSpecificError> for DevicesError {
fn from(err: BackendSpecificError) -> Self { fn from(err: BackendSpecificError) -> Self {
DevicesError::BackendSpecific { err } DevicesError::BackendSpecific { err }
@ -895,12 +865,6 @@ impl From<BackendSpecificError> for StreamError {
} }
} }
impl From<BackendSpecificError> for StreamCloseCause {
fn from(err: BackendSpecificError) -> Self {
StreamCloseCause::Error(err.into())
}
}
// If a backend does not provide an API for retrieving supported formats, we query it with a bunch // If a backend does not provide an API for retrieving supported formats, we query it with a bunch
// of commonly used rates. This is always the case for wasapi and is sometimes the case for alsa. // of commonly used rates. This is always the case for wasapi and is sometimes the case for alsa.
// //

View File

@ -9,8 +9,8 @@ use DeviceNameError;
use Format; use Format;
use PauseStreamError; use PauseStreamError;
use PlayStreamError; use PlayStreamError;
use StreamDataResult;
use SupportedFormatsError; use SupportedFormatsError;
use StreamEvent;
use SupportedFormat; use SupportedFormat;
pub struct EventLoop; pub struct EventLoop;
@ -23,7 +23,7 @@ impl EventLoop {
#[inline] #[inline]
pub fn run<F>(&self, _callback: F) -> ! pub fn run<F>(&self, _callback: F) -> !
where F: FnMut(StreamId, StreamEvent) where F: FnMut(StreamId, StreamDataResult)
{ {
loop { /* TODO: don't spin */ } loop { /* TODO: don't spin */ }
} }

View File

@ -27,8 +27,8 @@ use PauseStreamError;
use PlayStreamError; use PlayStreamError;
use SampleFormat; use SampleFormat;
use StreamData; use StreamData;
use StreamDataResult;
use StreamError; use StreamError;
use StreamEvent;
use UnknownTypeOutputBuffer; use UnknownTypeOutputBuffer;
use UnknownTypeInputBuffer; use UnknownTypeInputBuffer;
@ -445,12 +445,12 @@ impl EventLoop {
#[inline] #[inline]
pub fn run<F>(&self, mut callback: F) -> ! pub fn run<F>(&self, mut callback: F) -> !
where F: FnMut(StreamId, StreamEvent) where F: FnMut(StreamId, StreamDataResult)
{ {
self.run_inner(&mut callback); self.run_inner(&mut callback);
} }
fn run_inner(&self, callback: &mut dyn FnMut(StreamId, StreamEvent)) -> ! { fn run_inner(&self, callback: &mut dyn FnMut(StreamId, StreamDataResult)) -> ! {
unsafe { unsafe {
// We keep `run_context` locked forever, which guarantees that two invocations of // We keep `run_context` locked forever, which guarantees that two invocations of
// `run()` cannot run simultaneously. // `run()` cannot run simultaneously.
@ -472,8 +472,7 @@ impl EventLoop {
Some(p) => { Some(p) => {
run_context.handles.remove(p + 1); run_context.handles.remove(p + 1);
run_context.streams.remove(p); run_context.streams.remove(p);
let event = StreamEvent::Close(err.into()); callback(stream_id, Err(err.into()));
callback(stream_id, event);
}, },
} }
} }
@ -486,8 +485,7 @@ impl EventLoop {
Ok(idx) => idx, Ok(idx) => idx,
Err(err) => { Err(err) => {
for stream in &run_context.streams { for stream in &run_context.streams {
let event = StreamEvent::Close(err.clone().into()); callback(stream.id.clone(), Err(err.clone().into()));
callback(stream.id.clone(), event);
} }
run_context.streams.clear(); run_context.streams.clear();
run_context.handles.truncate(1); run_context.handles.truncate(1);
@ -553,8 +551,7 @@ impl EventLoop {
buffer: slice, buffer: slice,
}); });
let data = StreamData::Input { buffer: unknown_buffer }; let data = StreamData::Input { buffer: unknown_buffer };
let event = StreamEvent::Data(data); callback(stream.id.clone(), Ok(data));
callback(stream.id.clone(), event);
// Release the buffer. // Release the buffer.
let hresult = (*capture_client).ReleaseBuffer(frames_available); let hresult = (*capture_client).ReleaseBuffer(frames_available);
if let Err(err) = stream_error_from_hresult(hresult) { if let Err(err) = stream_error_from_hresult(hresult) {
@ -596,8 +593,7 @@ impl EventLoop {
buffer: slice buffer: slice
}); });
let data = StreamData::Output { buffer: unknown_buffer }; let data = StreamData::Output { buffer: unknown_buffer };
let event = StreamEvent::Data(data); callback(stream.id.clone(), Ok(data));
callback(stream.id.clone(), event);
let hresult = (*render_client) let hresult = (*render_client)
.ReleaseBuffer(frames_available as u32, 0); .ReleaseBuffer(frames_available as u32, 0);
if let Err(err) = stream_error_from_hresult(hresult) { if let Err(err) = stream_error_from_hresult(hresult) {
@ -739,7 +735,7 @@ fn format_to_waveformatextensible(format: &Format) -> Option<mmreg::WAVEFORMATEX
// Process any pending commands that are queued within the `RunContext`. // Process any pending commands that are queued within the `RunContext`.
fn process_commands( fn process_commands(
run_context: &mut RunContext, run_context: &mut RunContext,
callback: &mut dyn FnMut(StreamId, StreamEvent), callback: &mut dyn FnMut(StreamId, StreamDataResult),
) { ) {
// Process the pending commands. // Process the pending commands.
for command in run_context.commands.try_iter() { for command in run_context.commands.try_iter() {
@ -771,8 +767,7 @@ fn process_commands(
run_context.streams[p].playing = true; run_context.streams[p].playing = true;
} }
Err(err) => { Err(err) => {
let event = StreamEvent::Close(err.into()); callback(stream_id, Err(err.into()));
callback(stream_id, event);
run_context.handles.remove(p + 1); run_context.handles.remove(p + 1);
run_context.streams.remove(p); run_context.streams.remove(p);
} }
@ -794,8 +789,7 @@ fn process_commands(
run_context.streams[p].playing = false; run_context.streams[p].playing = false;
} }
Err(err) => { Err(err) => {
let event = StreamEvent::Close(err.into()); callback(stream_id, Err(err.into()));
callback(stream_id, event);
run_context.handles.remove(p + 1); run_context.handles.remove(p + 1);
run_context.streams.remove(p); run_context.streams.remove(p);
} }