Update alsa backend for addition of `StreamEvent` type

This commit significantly refactors the alsa backend's `EventLoop::run`
implementation in order to allow for better error handling throughout
the loop. This removes many cases that would previously `panic!` in
favour of calling the user callback with the necessary error and
removing the corrupt stream. Seeing as the method cannot return, a
catch-all `panic!` still exists at the end of the method, however this
refactor should make it much easier to remove this restriction in the
future.
This commit is contained in:
mitchmindtree 2019-06-22 00:06:55 +02:00
parent 59c789fbcd
commit e41baa248b
1 changed files with 276 additions and 150 deletions

View File

@ -14,7 +14,10 @@ use PlayStreamError;
use SupportedFormatsError; use SupportedFormatsError;
use SampleFormat; use SampleFormat;
use SampleRate; use SampleRate;
use StreamCloseCause;
use StreamData; use StreamData;
use StreamError;
use StreamEvent;
use SupportedFormat; use SupportedFormat;
use UnknownTypeInputBuffer; use UnknownTypeInputBuffer;
use UnknownTypeOutputBuffer; use UnknownTypeOutputBuffer;
@ -353,7 +356,7 @@ pub struct EventLoop {
// A trigger that uses a `pipe()` as backend. Signalled whenever a new command is ready, so // A trigger that uses a `pipe()` as backend. Signalled whenever a new command is ready, so
// that `poll()` can wake up and pick the changes. // that `poll()` can wake up and pick the changes.
pending_trigger: Trigger, pending_command_trigger: Trigger,
// This field is locked by the `run()` method. // This field is locked by the `run()` method.
// The mutex also ensures that only one thread at a time has `run()` running. // The mutex also ensures that only one thread at a time has `run()` running.
@ -377,7 +380,7 @@ enum Command {
} }
struct RunContext { struct RunContext {
// Descriptors to wait for. Always contains `pending_trigger.read_fd()` as first element. // Descriptors to wait for. Always contains `pending_command_trigger.read_fd()` as first element.
descriptors: Vec<libc::pollfd>, descriptors: Vec<libc::pollfd>,
// List of streams that are written in `descriptors`. // List of streams that are written in `descriptors`.
streams: Vec<StreamInner>, streams: Vec<StreamInner>,
@ -421,24 +424,25 @@ struct StreamInner {
// Lazily allocated buffer that is reused inside the loop. // Lazily allocated buffer that is reused inside the loop.
// Zero-allocate a new buffer (the fastest way to have zeroed memory) at the first time this is // Zero-allocate a new buffer (the fastest way to have zeroed memory) at the first time this is
// used. // used.
buffer: Option<Vec<u8>>, buffer: Vec<u8>,
} }
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Copy, Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamId(usize); pub struct StreamId(usize);
enum StreamType { Input, Output }
impl EventLoop { impl EventLoop {
#[inline] #[inline]
pub fn new() -> EventLoop { pub fn new() -> EventLoop {
let pending_trigger = Trigger::new(); let pending_command_trigger = Trigger::new();
let initial_descriptors = vec![ let mut initial_descriptors = vec![];
libc::pollfd { reset_descriptors_with_pending_command_trigger(
fd: pending_trigger.read_fd(), &mut initial_descriptors,
events: libc::POLLIN, &pending_command_trigger,
revents: 0, );
},
];
let (tx, rx) = channel(); let (tx, rx) = channel();
@ -450,7 +454,7 @@ impl EventLoop {
EventLoop { EventLoop {
next_stream_id: AtomicUsize::new(0), next_stream_id: AtomicUsize::new(0),
pending_trigger: pending_trigger, pending_command_trigger: pending_command_trigger,
run_context, run_context,
commands: tx, commands: tx,
} }
@ -458,220 +462,191 @@ impl EventLoop {
#[inline] #[inline]
pub fn run<F>(&self, mut callback: F) -> ! pub fn run<F>(&self, mut callback: F) -> !
where F: FnMut(StreamId, StreamData) where F: FnMut(StreamId, StreamEvent)
{ {
self.run_inner(&mut callback) self.run_inner(&mut callback)
} }
fn run_inner(&self, callback: &mut FnMut(StreamId, StreamData)) -> ! { fn run_inner(&self, callback: &mut dyn FnMut(StreamId, StreamEvent)) -> ! {
unsafe { unsafe {
let mut run_context = self.run_context.lock().unwrap(); let mut run_context = self.run_context.lock().unwrap();
let run_context = &mut *run_context; let run_context = &mut *run_context;
loop { 'stream_loop: loop {
{ process_commands(run_context, callback);
for command in run_context.commands.try_iter() {
match command { reset_descriptors_with_pending_command_trigger(
Command::DestroyStream(stream_id) => { &mut run_context.descriptors,
run_context.streams.retain(|s| s.id != stream_id); &self.pending_command_trigger,
}, );
Command::PlayStream(stream_id) => { append_stream_poll_descriptors(run_context);
if let Some(stream) = run_context.streams.iter_mut()
.find(|stream| stream.can_pause && stream.id == stream_id) // At this point, this should include the command `pending_commands_trigger` along
{ // with the poll descriptors for each stream.
alsa::snd_pcm_pause(stream.channel, 0); match poll_all_descriptors(&mut run_context.descriptors) {
stream.is_paused = false; Ok(true) => (),
} Ok(false) => continue,
}, Err(err) => {
Command::PauseStream(stream_id) => { for stream in run_context.streams.iter() {
if let Some(stream) = run_context.streams.iter_mut() let event = StreamEvent::Close(err.clone().into());
.find(|stream| stream.can_pause && stream.id == stream_id) callback(stream.id, event);
{
alsa::snd_pcm_pause(stream.channel, 1);
stream.is_paused = true;
}
},
Command::NewStream(stream_inner) => {
run_context.streams.push(stream_inner);
},
} }
} run_context.streams.clear();
break 'stream_loop;
run_context.descriptors = vec![
libc::pollfd {
fd: self.pending_trigger.read_fd(),
events: libc::POLLIN,
revents: 0,
},
];
for stream in run_context.streams.iter() {
run_context.descriptors.reserve(stream.num_descriptors);
let len = run_context.descriptors.len();
let filled = alsa::snd_pcm_poll_descriptors(stream.channel,
run_context
.descriptors
.as_mut_ptr()
.offset(len as isize),
stream.num_descriptors as
libc::c_uint);
debug_assert_eq!(filled, stream.num_descriptors as libc::c_int);
run_context.descriptors.set_len(len + stream.num_descriptors);
} }
} }
let ret = libc::poll(run_context.descriptors.as_mut_ptr(), // If the `pending_command_trigger` was signaled, we need to process the comands.
run_context.descriptors.len() as libc::nfds_t,
-1 /* infinite */);
assert!(ret >= 0, "poll() failed");
if ret == 0 {
continue;
}
// If the `pending_trigger` was signaled, we need to process the comands.
if run_context.descriptors[0].revents != 0 { if run_context.descriptors[0].revents != 0 {
run_context.descriptors[0].revents = 0; run_context.descriptors[0].revents = 0;
self.pending_trigger.clear_pipe(); self.pending_command_trigger.clear_pipe();
} }
// The set of streams that error within the following loop and should be removed.
let mut streams_to_remove: Vec<(StreamId, StreamError)> = vec![];
// Iterate over each individual stream/descriptor. // Iterate over each individual stream/descriptor.
let mut i_stream = 0; let mut i_stream = 0;
let mut i_descriptor = 1; let mut i_descriptor = 1;
while (i_descriptor as usize) < run_context.descriptors.len() { while (i_descriptor as usize) < run_context.descriptors.len() {
enum StreamType { Input, Output } let stream = &mut run_context.streams[i_stream];
let stream_type; let stream_descriptor_ptr = run_context.descriptors.as_mut_ptr().offset(i_descriptor);
let stream_inner = run_context.streams.get_mut(i_stream).unwrap();
// Check whether the event is `POLLOUT` or `POLLIN`. If neither, `continue`. // Only go on if this event was a pollout or pollin event.
{ let stream_type = match check_for_pollout_or_pollin(stream, stream_descriptor_ptr) {
let mut revent = mem::uninitialized(); Ok(Some(ty)) => ty,
Ok(None) => {
{ i_descriptor += stream.num_descriptors as isize;
let num_descriptors = stream_inner.num_descriptors as libc::c_uint; i_stream += 1;
let desc_ptr = continue;
run_context.descriptors.as_mut_ptr().offset(i_descriptor); },
let res = alsa::snd_pcm_poll_descriptors_revents(stream_inner.channel, Err(err) => {
desc_ptr, streams_to_remove.push((stream.id, err.into()));
num_descriptors, i_descriptor += stream.num_descriptors as isize;
&mut revent);
check_errors(res).unwrap();
}
if revent as i16 == libc::POLLOUT {
stream_type = StreamType::Output;
} else if revent as i16 == libc::POLLIN {
stream_type = StreamType::Input;
} else {
i_descriptor += stream_inner.num_descriptors as isize;
i_stream += 1; i_stream += 1;
continue; continue;
} }
} };
// Determine the number of samples that are available to read/write. // Get the number of available samples for reading/writing.
let available_samples = { let available_samples = match get_available_samples(stream) {
let available = alsa::snd_pcm_avail(stream_inner.channel); // TODO: what about snd_pcm_avail_update? Ok(n) => n,
Err(err) => {
if available == -32 { streams_to_remove.push((stream.id, err.into()));
// buffer underrun i_descriptor += stream.num_descriptors as isize;
stream_inner.buffer_len i_stream += 1;
} else if available < 0 { continue;
check_errors(available as libc::c_int)
.expect("buffer is not available");
unreachable!()
} else {
(available * stream_inner.num_channels as alsa::snd_pcm_sframes_t) as
usize
} }
}; };
if available_samples < stream_inner.period_len { // Only go on if there is at least `stream.period_len` samples.
i_descriptor += stream_inner.num_descriptors as isize; if available_samples < stream.period_len {
i_descriptor += stream.num_descriptors as isize;
i_stream += 1; i_stream += 1;
continue; continue;
} }
let stream_id = stream_inner.id.clone(); // Prepare the data buffer.
let buffer_size = stream.sample_format.sample_size() * available_samples;
let available_frames = available_samples / stream_inner.num_channels as usize; stream.buffer.resize(buffer_size, 0u8);
let available_frames = available_samples / stream.num_channels as usize;
let buffer_size = stream_inner.sample_format.sample_size() * available_samples;
// Could be written with a match with improved borrow checking
if stream_inner.buffer.is_none() {
stream_inner.buffer = Some(vec![0u8; buffer_size]);
} else {
stream_inner.buffer.as_mut().unwrap().resize(buffer_size, 0u8);
}
let buffer = stream_inner.buffer.as_mut().unwrap();
match stream_type { match stream_type {
StreamType::Input => { StreamType::Input => {
let err = alsa::snd_pcm_readi( let result = alsa::snd_pcm_readi(
stream_inner.channel, stream.channel,
buffer.as_mut_ptr() as *mut _, stream.buffer.as_mut_ptr() as *mut _,
available_frames as alsa::snd_pcm_uframes_t, available_frames as alsa::snd_pcm_uframes_t,
); );
check_errors(err as _).expect("snd_pcm_readi error"); if let Err(err) = check_errors(result as _) {
let description = format!("`snd_pcm_readi` failed: {}", err);
let err = BackendSpecificError { description };
streams_to_remove.push((stream.id, err.into()));
continue;
}
let input_buffer = match stream_inner.sample_format { let input_buffer = match stream.sample_format {
SampleFormat::I16 => UnknownTypeInputBuffer::I16(::InputBuffer { SampleFormat::I16 => UnknownTypeInputBuffer::I16(::InputBuffer {
buffer: cast_input_buffer(buffer), buffer: cast_input_buffer(&mut stream.buffer),
}), }),
SampleFormat::U16 => UnknownTypeInputBuffer::U16(::InputBuffer { SampleFormat::U16 => UnknownTypeInputBuffer::U16(::InputBuffer {
buffer: cast_input_buffer(buffer), buffer: cast_input_buffer(&mut stream.buffer),
}), }),
SampleFormat::F32 => UnknownTypeInputBuffer::F32(::InputBuffer { SampleFormat::F32 => UnknownTypeInputBuffer::F32(::InputBuffer {
buffer: cast_input_buffer(buffer), buffer: cast_input_buffer(&mut stream.buffer),
}), }),
}; };
let stream_data = StreamData::Input { let stream_data = StreamData::Input {
buffer: input_buffer, buffer: input_buffer,
}; };
callback(stream_id, stream_data); let event = StreamEvent::Data(stream_data);
callback(stream.id, event);
}, },
StreamType::Output => { StreamType::Output => {
{ {
// We're now sure that we're ready to write data. // We're now sure that we're ready to write data.
let output_buffer = match stream_inner.sample_format { let output_buffer = match stream.sample_format {
SampleFormat::I16 => UnknownTypeOutputBuffer::I16(::OutputBuffer { SampleFormat::I16 => UnknownTypeOutputBuffer::I16(::OutputBuffer {
buffer: cast_output_buffer(buffer), buffer: cast_output_buffer(&mut stream.buffer),
}), }),
SampleFormat::U16 => UnknownTypeOutputBuffer::U16(::OutputBuffer { SampleFormat::U16 => UnknownTypeOutputBuffer::U16(::OutputBuffer {
buffer: cast_output_buffer(buffer), buffer: cast_output_buffer(&mut stream.buffer),
}), }),
SampleFormat::F32 => UnknownTypeOutputBuffer::F32(::OutputBuffer { SampleFormat::F32 => UnknownTypeOutputBuffer::F32(::OutputBuffer {
buffer: cast_output_buffer(buffer), buffer: cast_output_buffer(&mut stream.buffer),
}), }),
}; };
let stream_data = StreamData::Output { let stream_data = StreamData::Output {
buffer: output_buffer, buffer: output_buffer,
}; };
callback(stream_id, stream_data); let event = StreamEvent::Data(stream_data);
callback(stream.id, event);
} }
loop { loop {
let result = alsa::snd_pcm_writei( let result = alsa::snd_pcm_writei(
stream_inner.channel, stream.channel,
buffer.as_ptr() as *const _, stream.buffer.as_ptr() as *const _,
available_frames as alsa::snd_pcm_uframes_t, available_frames as alsa::snd_pcm_uframes_t,
); );
if result == -32 { if result == -32 {
// buffer underrun // buffer underrun
alsa::snd_pcm_prepare(stream_inner.channel); // TODO: Notify the user of this.
} else if result < 0 { alsa::snd_pcm_prepare(stream.channel);
check_errors(result as libc::c_int) } else if let Err(err) = check_errors(result as _) {
.expect("could not write pcm"); let description = format!("`snd_pcm_writei` failed: {}", err);
let err = BackendSpecificError { description };
streams_to_remove.push((stream.id, err.into()));
continue;
} else if result as usize != available_frames {
let description = format!(
"unexpected number of frames written: expected {}, \
result {} (this should never happen)",
available_frames,
result,
);
let err = BackendSpecificError { description };
streams_to_remove.push((stream.id, err.into()));
continue;
} else { } else {
assert_eq!(result as usize, available_frames);
break; break;
} }
} }
}, },
} }
} }
// Remove any streams that have errored and notify the user.
for (stream_id, err) in streams_to_remove {
run_context.streams.retain(|s| s.id != stream_id);
let event = StreamEvent::Close(err.into());
callback(stream_id, event);
}
} }
} }
panic!("`cpal::EventLoop::run` API currently disallows returning");
} }
pub fn build_input_stream( pub fn build_input_stream(
@ -739,7 +714,7 @@ impl EventLoop {
can_pause: can_pause, can_pause: can_pause,
is_paused: false, is_paused: false,
resume_trigger: Trigger::new(), resume_trigger: Trigger::new(),
buffer: None, buffer: vec![],
}; };
if let Err(desc) = check_errors(alsa::snd_pcm_start(capture_handle)) { if let Err(desc) = check_errors(alsa::snd_pcm_start(capture_handle)) {
@ -818,7 +793,7 @@ impl EventLoop {
can_pause: can_pause, can_pause: can_pause,
is_paused: false, is_paused: false,
resume_trigger: Trigger::new(), resume_trigger: Trigger::new(),
buffer: None, buffer: vec![],
}; };
self.push_command(Command::NewStream(stream_inner)); self.push_command(Command::NewStream(stream_inner));
@ -830,7 +805,7 @@ impl EventLoop {
fn push_command(&self, command: Command) { fn push_command(&self, command: Command) {
// Safe to unwrap: sender outlives receiver. // Safe to unwrap: sender outlives receiver.
self.commands.send(command).unwrap(); self.commands.send(command).unwrap();
self.pending_trigger.wakeup(); self.pending_command_trigger.wakeup();
} }
#[inline] #[inline]
@ -851,6 +826,157 @@ impl EventLoop {
} }
} }
// Process any pending `Command`s within the `RunContext`'s queue.
fn process_commands(
run_context: &mut RunContext,
callback: &mut dyn FnMut(StreamId, StreamEvent),
) {
for command in run_context.commands.try_iter() {
match command {
Command::DestroyStream(stream_id) => {
run_context.streams.retain(|s| s.id != stream_id);
let event = StreamCloseCause::UserDestroyed.into();
callback(stream_id, event);
},
Command::PlayStream(stream_id) => {
if let Some(stream) = run_context.streams.iter_mut()
.find(|stream| stream.can_pause && stream.id == stream_id)
{
callback(stream_id, StreamEvent::Play);
unsafe {
alsa::snd_pcm_pause(stream.channel, 0);
}
stream.is_paused = false;
}
},
Command::PauseStream(stream_id) => {
if let Some(stream) = run_context.streams.iter_mut()
.find(|stream| stream.can_pause && stream.id == stream_id)
{
callback(stream_id, StreamEvent::Pause);
unsafe {
alsa::snd_pcm_pause(stream.channel, 1);
}
stream.is_paused = true;
}
},
Command::NewStream(stream_inner) => {
run_context.streams.push(stream_inner);
},
}
}
}
// Resets the descriptors so that only `pending_command_trigger.read_fd()` is contained.
fn reset_descriptors_with_pending_command_trigger(
descriptors: &mut Vec<libc::pollfd>,
pending_command_trigger: &Trigger,
) {
descriptors.clear();
descriptors.push(libc::pollfd {
fd: pending_command_trigger.read_fd(),
events: libc::POLLIN,
revents: 0,
});
}
// Appends the `poll` descriptors for each stream onto the `RunContext`'s descriptor slice, ready
// for a call to `libc::poll`.
fn append_stream_poll_descriptors(run_context: &mut RunContext) {
for stream in run_context.streams.iter() {
run_context.descriptors.reserve(stream.num_descriptors);
let len = run_context.descriptors.len();
let filled = unsafe {
alsa::snd_pcm_poll_descriptors(
stream.channel,
run_context.descriptors.as_mut_ptr().offset(len as isize),
stream.num_descriptors as libc::c_uint,
)
};
debug_assert_eq!(filled, stream.num_descriptors as libc::c_int);
unsafe {
run_context.descriptors.set_len(len + stream.num_descriptors);
}
}
}
// Poll all descriptors within the given set.
//
// Returns `Ok(true)` if some event has occurred or `Ok(false)` if no events have
// occurred.
//
// Returns an `Err` if `libc::poll` returns a negative value for some reason.
fn poll_all_descriptors(descriptors: &mut [libc::pollfd]) -> Result<bool, BackendSpecificError> {
let res = unsafe {
// Don't timeout, wait forever.
libc::poll(descriptors.as_mut_ptr(), descriptors.len() as libc::nfds_t, -1)
};
if res < 0 {
let description = format!("`libc::poll()` failed: {}", res);
Err(BackendSpecificError { description })
} else if res == 0 {
Ok(false)
} else {
Ok(true)
}
}
// Check whether the event is `POLLOUT` or `POLLIN`.
//
// If so, return the stream type associated with the event.
//
// Otherwise, returns `Ok(None)`.
//
// Returns an `Err` if the `snd_pcm_poll_descriptors_revents` call fails.
fn check_for_pollout_or_pollin(
stream: &StreamInner,
stream_descriptor_ptr: *mut libc::pollfd,
) -> Result<Option<StreamType>, BackendSpecificError> {
let (revent, res) = unsafe {
let mut revent = mem::uninitialized();
let res = alsa::snd_pcm_poll_descriptors_revents(
stream.channel,
stream_descriptor_ptr,
stream.num_descriptors as libc::c_uint,
&mut revent,
);
(revent, res)
};
if let Err(desc) = check_errors(res) {
let description =
format!("`snd_pcm_poll_descriptors_revents` failed: {}",desc);
let err = BackendSpecificError { description };
return Err(err);
}
if revent as i16 == libc::POLLOUT {
Ok(Some(StreamType::Output))
} else if revent as i16 == libc::POLLIN {
Ok(Some(StreamType::Input))
} else {
Ok(None)
}
}
// Determine the number of samples that are available to read/write.
fn get_available_samples(stream: &StreamInner) -> Result<usize, BackendSpecificError> {
// TODO: what about snd_pcm_avail_update?
let available = unsafe {
alsa::snd_pcm_avail(stream.channel)
};
if available == -32 {
// buffer underrun
// TODO: Notify the user some how.
Ok(stream.buffer_len)
} else if let Err(desc) = check_errors(available as libc::c_int) {
let description = format!("failed to get available samples: {}", desc);
let err = BackendSpecificError { description };
Err(err)
} else {
Ok((available * stream.num_channels as alsa::snd_pcm_sframes_t) as usize)
}
}
unsafe fn set_hw_params_from_format( unsafe fn set_hw_params_from_format(
pcm_handle: *mut alsa::snd_pcm_t, pcm_handle: *mut alsa::snd_pcm_t,
hw_params: &HwParams, hw_params: &HwParams,