Change callback interface so that it takes a dedicated error callback

This commit is contained in:
Tatsuyuki Ishi 2019-07-15 22:37:03 +09:00 committed by mitchmindtree
parent c97d1dd3fa
commit 3cce3e43d9
8 changed files with 50 additions and 71 deletions

View File

@ -17,15 +17,7 @@ fn main() -> Result<(), anyhow::Error> {
(sample_clock * 440.0 * 2.0 * 3.141592 / sample_rate).sin() (sample_clock * 440.0 * 2.0 * 3.141592 / sample_rate).sin()
}; };
let stream = device.build_output_stream(&format, move |result| { let stream = device.build_output_stream(&format, move |data| {
let data = match result {
Ok(data) => data,
Err(err) => {
eprintln!("an error occurred on stream: {}", err);
return;
}
};
match data { match data {
cpal::StreamData::Output { buffer: cpal::UnknownTypeOutputBuffer::U16(mut buffer) } => { cpal::StreamData::Output { buffer: cpal::UnknownTypeOutputBuffer::U16(mut buffer) } => {
for sample in buffer.chunks_mut(channels as usize) { for sample in buffer.chunks_mut(channels as usize) {
@ -53,6 +45,8 @@ fn main() -> Result<(), anyhow::Error> {
}, },
_ => (), _ => (),
} }
}, move |err| {
eprintln!("an error occurred on stream: {}", err);
})?; })?;
stream.play()?; stream.play()?;

View File

@ -49,15 +49,7 @@ fn main() -> Result<(), anyhow::Error> {
// Build streams. // Build streams.
println!("Attempting to build both streams with `{:?}`.", format); println!("Attempting to build both streams with `{:?}`.", format);
let input_stream = input_device.build_input_stream(&format, move |result| { let input_stream = input_device.build_input_stream(&format, move |data| {
let data = match result {
Ok(data) => data,
Err(err) => {
eprintln!("an error occurred on input stream: {}", err);
return;
},
};
match data { match data {
cpal::StreamData::Input { cpal::StreamData::Input {
buffer: cpal::UnknownTypeInputBuffer::F32(buffer), buffer: cpal::UnknownTypeInputBuffer::F32(buffer),
@ -74,15 +66,10 @@ fn main() -> Result<(), anyhow::Error> {
}, },
_ => panic!("Expected input with f32 data"), _ => panic!("Expected input with f32 data"),
} }
}, move |err| {
eprintln!("an error occurred on input stream: {}", err);
})?; })?;
let output_stream = output_device.build_output_stream(&format, move |result| { let output_stream = output_device.build_output_stream(&format, move |data| {
let data = match result {
Ok(data) => data,
Err(err) => {
eprintln!("an error occurred on output stream: {}", err);
return;
},
};
match data { match data {
cpal::StreamData::Output { cpal::StreamData::Output {
buffer: cpal::UnknownTypeOutputBuffer::F32(mut buffer), buffer: cpal::UnknownTypeOutputBuffer::F32(mut buffer),
@ -103,6 +90,8 @@ fn main() -> Result<(), anyhow::Error> {
}, },
_ => panic!("Expected output with f32 data"), _ => panic!("Expected output with f32 data"),
} }
}, move |err| {
eprintln!("an error occurred on output stream: {}", err);
})?; })?;
println!("Successfully built streams."); println!("Successfully built streams.");

View File

@ -32,15 +32,7 @@ fn main() -> Result<(), anyhow::Error> {
// Run the input stream on a separate thread. // Run the input stream on a separate thread.
let writer_2 = writer.clone(); let writer_2 = writer.clone();
let stream = device.build_input_stream(&format, move |event| { let stream = device.build_input_stream(&format, move |data| {
let data = match event {
Ok(data) => data,
Err(err) => {
eprintln!("an error occurred on stream: {}", err);
return;
},
};
// Otherwise write to the wav writer. // Otherwise write to the wav writer.
match data { match data {
cpal::StreamData::Input { cpal::StreamData::Input {
@ -79,6 +71,8 @@ fn main() -> Result<(), anyhow::Error> {
}, },
_ => (), _ => (),
} }
}, move |err| {
eprintln!("an error occurred on stream: {}", err);
})?; })?;
stream.play()?; stream.play()?;

View File

@ -18,7 +18,7 @@ use PlayStreamError;
use SampleFormat; use SampleFormat;
use SampleRate; use SampleRate;
use StreamData; use StreamData;
use StreamDataResult; use StreamError;
use SupportedFormat; use SupportedFormat;
use SupportedFormatsError; use SupportedFormatsError;
use traits::{DeviceTrait, HostTrait, StreamTrait}; use traits::{DeviceTrait, HostTrait, StreamTrait};
@ -89,12 +89,12 @@ impl DeviceTrait for Device {
Device::default_output_format(self) Device::default_output_format(self)
} }
fn build_input_stream<F>(&self, format: &Format, callback: F) -> Result<Self::Stream, BuildStreamError> where F: FnMut(StreamDataResult) + Send + 'static { fn build_input_stream<D, E>(&self, format: &Format, data_callback: D, error_callback: E) -> Result<Self::Stream, BuildStreamError> where D: FnMut(StreamData) + Send + 'static, E: FnMut(StreamError) + Send + 'static {
Ok(Stream::new(Arc::new(self.build_stream_inner(format, alsa::SND_PCM_STREAM_CAPTURE)?), callback)) Ok(Stream::new(Arc::new(self.build_stream_inner(format, alsa::SND_PCM_STREAM_CAPTURE)?), data_callback, error_callback))
} }
fn build_output_stream<F>(&self, format: &Format, callback: F) -> Result<Self::Stream, BuildStreamError> where F: FnMut(StreamDataResult) + Send + 'static { fn build_output_stream<D, E>(&self, format: &Format, data_callback: D, error_callback: E) -> Result<Self::Stream, BuildStreamError> where D: FnMut(StreamData) + Send + 'static, E: FnMut(StreamError) + Send + 'static {
Ok(Stream::new(Arc::new(self.build_stream_inner(format, alsa::SND_PCM_STREAM_PLAYBACK)?), callback)) Ok(Stream::new(Arc::new(self.build_stream_inner(format, alsa::SND_PCM_STREAM_PLAYBACK)?), data_callback, error_callback))
} }
} }
@ -526,7 +526,10 @@ pub struct Stream {
/// The inner body of the audio processing thread. Takes the polymorphic /// The inner body of the audio processing thread. Takes the polymorphic
/// callback to avoid generating too much generic code. /// callback to avoid generating too much generic code.
fn stream_worker(rx: TriggerReceiver, stream: &StreamInner, callback: &mut (dyn FnMut(StreamDataResult) + Send + 'static)) { fn stream_worker(rx: TriggerReceiver,
stream: &StreamInner,
data_callback: &mut (dyn FnMut(StreamData) + Send + 'static),
error_callback: &mut (dyn FnMut(StreamError) + Send + 'static)) {
let mut descriptors = Vec::new(); let mut descriptors = Vec::new();
let mut buffer = Vec::new(); let mut buffer = Vec::new();
loop { loop {
@ -559,11 +562,11 @@ fn stream_worker(rx: TriggerReceiver, stream: &StreamInner, callback: &mut (dyn
}; };
if res < 0 { if res < 0 {
let description = format!("`libc::poll()` failed: {}", io::Error::last_os_error()); let description = format!("`libc::poll()` failed: {}", io::Error::last_os_error());
callback(Err(BackendSpecificError { description }.into())); error_callback(BackendSpecificError { description }.into());
continue; continue;
} else if res == 0 { } else if res == 0 {
let description = String::from("`libc::poll()` spuriously returned"); let description = String::from("`libc::poll()` spuriously returned");
callback(Err(BackendSpecificError { description }.into())); error_callback(BackendSpecificError { description }.into());
continue; continue;
} }
@ -589,7 +592,7 @@ fn stream_worker(rx: TriggerReceiver, stream: &StreamInner, callback: &mut (dyn
Ok(n) => n, Ok(n) => n,
Err(err) => { Err(err) => {
let description = format!("Failed to query the number of available samples: {}", err); let description = format!("Failed to query the number of available samples: {}", err);
callback(Err(BackendSpecificError { description }.into())); error_callback(BackendSpecificError { description }.into());
continue; continue;
} }
}; };
@ -615,7 +618,7 @@ fn stream_worker(rx: TriggerReceiver, stream: &StreamInner, callback: &mut (dyn
}; };
if let Err(err) = check_errors(result as _) { if let Err(err) = check_errors(result as _) {
let description = format!("`snd_pcm_readi` failed: {}", err); let description = format!("`snd_pcm_readi` failed: {}", err);
callback(Err(BackendSpecificError { description }.into())); error_callback(BackendSpecificError { description }.into());
continue; continue;
} }
@ -633,7 +636,7 @@ fn stream_worker(rx: TriggerReceiver, stream: &StreamInner, callback: &mut (dyn
let stream_data = StreamData::Input { let stream_data = StreamData::Input {
buffer: input_buffer, buffer: input_buffer,
}; };
callback(Ok(stream_data)); data_callback(stream_data);
}, },
StreamType::Output => { StreamType::Output => {
{ {
@ -653,7 +656,7 @@ fn stream_worker(rx: TriggerReceiver, stream: &StreamInner, callback: &mut (dyn
let stream_data = StreamData::Output { let stream_data = StreamData::Output {
buffer: output_buffer, buffer: output_buffer,
}; };
callback(Ok(stream_data)); data_callback(stream_data);
} }
loop { loop {
let result = unsafe { let result = unsafe {
@ -670,7 +673,7 @@ fn stream_worker(rx: TriggerReceiver, stream: &StreamInner, callback: &mut (dyn
unsafe { alsa::snd_pcm_recover(stream.channel, result as i32, 0) }; unsafe { alsa::snd_pcm_recover(stream.channel, result as i32, 0) };
} else if let Err(err) = check_errors(result as _) { } else if let Err(err) = check_errors(result as _) {
let description = format!("`snd_pcm_writei` failed: {}", err); let description = format!("`snd_pcm_writei` failed: {}", err);
callback(Err(BackendSpecificError { description }.into())); error_callback(BackendSpecificError { description }.into());
continue; continue;
} else if result as usize != available_frames { } else if result as usize != available_frames {
let description = format!( let description = format!(
@ -679,7 +682,7 @@ fn stream_worker(rx: TriggerReceiver, stream: &StreamInner, callback: &mut (dyn
available_frames, available_frames,
result, result,
); );
callback(Err(BackendSpecificError { description }.into())); error_callback(BackendSpecificError { description }.into());
continue; continue;
} else { } else {
break; break;
@ -691,12 +694,13 @@ fn stream_worker(rx: TriggerReceiver, stream: &StreamInner, callback: &mut (dyn
} }
impl Stream { impl Stream {
fn new<F>(inner: Arc<StreamInner>, mut callback: F) -> Stream where F: FnMut(StreamDataResult) + Send + 'static { fn new<D, E>(inner: Arc<StreamInner>, mut data_callback: D, mut error_callback: E) -> Stream
where D: FnMut(StreamData) + Send + 'static, E: FnMut(StreamError) + Send + 'static {
let (tx, rx) = trigger(); let (tx, rx) = trigger();
// Clone the handle for passing into worker thread. // Clone the handle for passing into worker thread.
let stream = inner.clone(); let stream = inner.clone();
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
stream_worker(rx, &*stream, &mut callback); stream_worker(rx, &*stream, &mut data_callback, &mut error_callback);
}); });
Stream { Stream {
thread: Some(thread), thread: Some(thread),

View File

@ -7,7 +7,8 @@ use DeviceNameError;
use Format; use Format;
use PauseStreamError; use PauseStreamError;
use PlayStreamError; use PlayStreamError;
use StreamDataResult; use StreamData;
use StreamError;
use SupportedFormatsError; use SupportedFormatsError;
use SupportedFormat; use SupportedFormat;
use traits::{DeviceTrait, HostTrait, StreamTrait}; use traits::{DeviceTrait, HostTrait, StreamTrait};
@ -76,14 +77,14 @@ impl DeviceTrait for Device {
unimplemented!() unimplemented!()
} }
fn build_input_stream<F>(&self, format: &Format, callback: F) -> Result<Self::Stream, BuildStreamError> fn build_input_stream<D, E>(&self, format: &Format, data_callback: D, error_callback: E) -> Result<Self::Stream, BuildStreamError>
where F: FnMut(StreamDataResult) + Send + 'static { where D: FnMut(StreamData) + Send + 'static, E: FnMut(StreamError) + Send + 'static {
unimplemented!() unimplemented!()
} }
/// Create an output stream. /// Create an output stream.
fn build_output_stream<F>(&self, format: &Format, callback: F) -> Result<Self::Stream, BuildStreamError> fn build_output_stream<D, E>(&self, format: &Format, data_callback: D, error_callback: E) -> Result<Self::Stream, BuildStreamError>
where F: FnMut(StreamDataResult) + Send + 'static{ where D: FnMut(StreamData) + Send + 'static, E: FnMut(StreamError) + Send + 'static{
unimplemented!() unimplemented!()
} }
} }

View File

@ -206,10 +206,6 @@ pub enum StreamData<'a> {
}, },
} }
/// Stream data passed to the `EventLoop::run` callback, or an error in the case that the device
/// was invalidated or some backend-specific error occurred.
pub type StreamDataResult<'a> = Result<StreamData<'a>, StreamError>;
/// Represents a buffer containing audio data that may be read. /// Represents a buffer containing audio data that may be read.
/// ///
/// This struct implements the `Deref` trait targeting `[T]`. Therefore this buffer can be read the /// This struct implements the `Deref` trait targeting `[T]`. Therefore this buffer can be read the

View File

@ -250,22 +250,22 @@ macro_rules! impl_platform_host {
} }
} }
fn build_input_stream<F>(&self, format: &crate::Format, callback: F) -> Result<Self::Stream, crate::BuildStreamError> fn build_input_stream<D, E>(&self, format: &crate::Format, data_callback: D, error_callback: E) -> Result<Self::Stream, crate::BuildStreamError>
where F: FnMut(crate::StreamDataResult) + Send + 'static { where D: FnMut(crate::StreamData) + Send + 'static, E: FnMut(crate::StreamError) + Send + 'static {
match self.0 { match self.0 {
$( $(
DeviceInner::$HostVariant(ref d) => d.build_input_stream(format, callback) DeviceInner::$HostVariant(ref d) => d.build_input_stream(format, data_callback, error_callback)
.map(StreamInner::$HostVariant) .map(StreamInner::$HostVariant)
.map(Stream), .map(Stream),
)* )*
} }
} }
fn build_output_stream<F>(&self, format: &crate::Format, callback: F) -> Result<Self::Stream, crate::BuildStreamError> fn build_output_stream<D, E>(&self, format: &crate::Format, data_callback: D, error_callback: E) -> Result<Self::Stream, crate::BuildStreamError>
where F: FnMut(crate::StreamDataResult) + Send + 'static { where D: FnMut(crate::StreamData) + Send + 'static, E: FnMut(crate::StreamError) + Send + 'static {
match self.0 { match self.0 {
$( $(
DeviceInner::$HostVariant(ref d) => d.build_output_stream(format, callback) DeviceInner::$HostVariant(ref d) => d.build_output_stream(format, data_callback, error_callback)
.map(StreamInner::$HostVariant) .map(StreamInner::$HostVariant)
.map(Stream), .map(Stream),
)* )*

View File

@ -10,7 +10,8 @@ use {
OutputDevices, OutputDevices,
PauseStreamError, PauseStreamError,
PlayStreamError, PlayStreamError,
StreamDataResult, StreamData,
StreamError,
SupportedFormat, SupportedFormat,
SupportedFormatsError, SupportedFormatsError,
}; };
@ -117,12 +118,12 @@ pub trait DeviceTrait {
fn default_output_format(&self) -> Result<Format, DefaultFormatError>; fn default_output_format(&self) -> Result<Format, DefaultFormatError>;
/// Create an input stream. /// Create an input stream.
fn build_input_stream<F>(&self, format: &Format, callback: F) -> Result<Self::Stream, BuildStreamError> fn build_input_stream<D, E>(&self, format: &Format, data_callback: D, error_callback: E) -> Result<Self::Stream, BuildStreamError>
where F: FnMut(StreamDataResult) + Send + 'static; where D: FnMut(StreamData) + Send + 'static, E: FnMut(StreamError) + Send + 'static;
/// Create an output stream. /// Create an output stream.
fn build_output_stream<F>(&self, format: &Format, callback: F) -> Result<Self::Stream, BuildStreamError> fn build_output_stream<D, E>(&self, format: &Format, data_callback: D, error_callback: E) -> Result<Self::Stream, BuildStreamError>
where F: FnMut(StreamDataResult) + Send + 'static; where D: FnMut(StreamData) + Send + 'static, E: FnMut(StreamError) + Send + 'static;
} }
/// A stream created from `Device`, with methods to control playback. /// A stream created from `Device`, with methods to control playback.