diff --git a/src/alsa/mod.rs b/src/alsa/mod.rs index 43a3191..6c4a276 100644 --- a/src/alsa/mod.rs +++ b/src/alsa/mod.rs @@ -14,7 +14,10 @@ use PlayStreamError; use SupportedFormatsError; use SampleFormat; use SampleRate; +use StreamCloseCause; use StreamData; +use StreamError; +use StreamEvent; use SupportedFormat; use UnknownTypeInputBuffer; use UnknownTypeOutputBuffer; @@ -353,7 +356,7 @@ pub struct EventLoop { // A trigger that uses a `pipe()` as backend. Signalled whenever a new command is ready, so // that `poll()` can wake up and pick the changes. - pending_trigger: Trigger, + pending_command_trigger: Trigger, // This field is locked by the `run()` method. // The mutex also ensures that only one thread at a time has `run()` running. @@ -377,7 +380,7 @@ enum Command { } struct RunContext { - // Descriptors to wait for. Always contains `pending_trigger.read_fd()` as first element. + // Descriptors to wait for. Always contains `pending_command_trigger.read_fd()` as first element. descriptors: Vec, // List of streams that are written in `descriptors`. streams: Vec, @@ -421,24 +424,25 @@ struct StreamInner { // Lazily allocated buffer that is reused inside the loop. // Zero-allocate a new buffer (the fastest way to have zeroed memory) at the first time this is // used. - buffer: Option>, + buffer: Vec, } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Copy, Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamId(usize); +enum StreamType { Input, Output } + + impl EventLoop { #[inline] pub fn new() -> EventLoop { - let pending_trigger = Trigger::new(); + let pending_command_trigger = Trigger::new(); - let initial_descriptors = vec![ - libc::pollfd { - fd: pending_trigger.read_fd(), - events: libc::POLLIN, - revents: 0, - }, - ]; + let mut initial_descriptors = vec![]; + reset_descriptors_with_pending_command_trigger( + &mut initial_descriptors, + &pending_command_trigger, + ); let (tx, rx) = channel(); @@ -450,7 +454,7 @@ impl EventLoop { EventLoop { next_stream_id: AtomicUsize::new(0), - pending_trigger: pending_trigger, + pending_command_trigger: pending_command_trigger, run_context, commands: tx, } @@ -458,220 +462,191 @@ impl EventLoop { #[inline] pub fn run(&self, mut callback: F) -> ! - where F: FnMut(StreamId, StreamData) + where F: FnMut(StreamId, StreamEvent) { self.run_inner(&mut callback) } - fn run_inner(&self, callback: &mut FnMut(StreamId, StreamData)) -> ! { + fn run_inner(&self, callback: &mut dyn FnMut(StreamId, StreamEvent)) -> ! { unsafe { let mut run_context = self.run_context.lock().unwrap(); let run_context = &mut *run_context; - loop { - { - for command in run_context.commands.try_iter() { - match command { - Command::DestroyStream(stream_id) => { - run_context.streams.retain(|s| s.id != stream_id); - }, - Command::PlayStream(stream_id) => { - if let Some(stream) = run_context.streams.iter_mut() - .find(|stream| stream.can_pause && stream.id == stream_id) - { - alsa::snd_pcm_pause(stream.channel, 0); - stream.is_paused = false; - } - }, - Command::PauseStream(stream_id) => { - if let Some(stream) = run_context.streams.iter_mut() - .find(|stream| stream.can_pause && stream.id == stream_id) - { - alsa::snd_pcm_pause(stream.channel, 1); - stream.is_paused = true; - } - }, - Command::NewStream(stream_inner) => { - run_context.streams.push(stream_inner); - }, + 'stream_loop: loop { + process_commands(run_context, callback); + + reset_descriptors_with_pending_command_trigger( + &mut run_context.descriptors, + &self.pending_command_trigger, + ); + append_stream_poll_descriptors(run_context); + + // At this point, this should include the command `pending_commands_trigger` along + // with the poll descriptors for each stream. + match poll_all_descriptors(&mut run_context.descriptors) { + Ok(true) => (), + Ok(false) => continue, + Err(err) => { + for stream in run_context.streams.iter() { + let event = StreamEvent::Close(err.clone().into()); + callback(stream.id, event); } - } - - run_context.descriptors = vec![ - libc::pollfd { - fd: self.pending_trigger.read_fd(), - events: libc::POLLIN, - revents: 0, - }, - ]; - for stream in run_context.streams.iter() { - run_context.descriptors.reserve(stream.num_descriptors); - let len = run_context.descriptors.len(); - let filled = alsa::snd_pcm_poll_descriptors(stream.channel, - run_context - .descriptors - .as_mut_ptr() - .offset(len as isize), - stream.num_descriptors as - libc::c_uint); - debug_assert_eq!(filled, stream.num_descriptors as libc::c_int); - run_context.descriptors.set_len(len + stream.num_descriptors); + run_context.streams.clear(); + break 'stream_loop; } } - let ret = libc::poll(run_context.descriptors.as_mut_ptr(), - run_context.descriptors.len() as libc::nfds_t, - -1 /* infinite */); - assert!(ret >= 0, "poll() failed"); - - if ret == 0 { - continue; - } - - // If the `pending_trigger` was signaled, we need to process the comands. + // If the `pending_command_trigger` was signaled, we need to process the comands. if run_context.descriptors[0].revents != 0 { run_context.descriptors[0].revents = 0; - self.pending_trigger.clear_pipe(); + self.pending_command_trigger.clear_pipe(); } + // The set of streams that error within the following loop and should be removed. + let mut streams_to_remove: Vec<(StreamId, StreamError)> = vec![]; + // Iterate over each individual stream/descriptor. let mut i_stream = 0; let mut i_descriptor = 1; while (i_descriptor as usize) < run_context.descriptors.len() { - enum StreamType { Input, Output } - let stream_type; - let stream_inner = run_context.streams.get_mut(i_stream).unwrap(); + let stream = &mut run_context.streams[i_stream]; + let stream_descriptor_ptr = run_context.descriptors.as_mut_ptr().offset(i_descriptor); - // Check whether the event is `POLLOUT` or `POLLIN`. If neither, `continue`. - { - let mut revent = mem::uninitialized(); - - { - let num_descriptors = stream_inner.num_descriptors as libc::c_uint; - let desc_ptr = - run_context.descriptors.as_mut_ptr().offset(i_descriptor); - let res = alsa::snd_pcm_poll_descriptors_revents(stream_inner.channel, - desc_ptr, - num_descriptors, - &mut revent); - check_errors(res).unwrap(); - } - - if revent as i16 == libc::POLLOUT { - stream_type = StreamType::Output; - } else if revent as i16 == libc::POLLIN { - stream_type = StreamType::Input; - } else { - i_descriptor += stream_inner.num_descriptors as isize; + // Only go on if this event was a pollout or pollin event. + let stream_type = match check_for_pollout_or_pollin(stream, stream_descriptor_ptr) { + Ok(Some(ty)) => ty, + Ok(None) => { + i_descriptor += stream.num_descriptors as isize; + i_stream += 1; + continue; + }, + Err(err) => { + streams_to_remove.push((stream.id, err.into())); + i_descriptor += stream.num_descriptors as isize; i_stream += 1; continue; } - } + }; - // Determine the number of samples that are available to read/write. - let available_samples = { - let available = alsa::snd_pcm_avail(stream_inner.channel); // TODO: what about snd_pcm_avail_update? - - if available == -32 { - // buffer underrun - stream_inner.buffer_len - } else if available < 0 { - check_errors(available as libc::c_int) - .expect("buffer is not available"); - unreachable!() - } else { - (available * stream_inner.num_channels as alsa::snd_pcm_sframes_t) as - usize + // Get the number of available samples for reading/writing. + let available_samples = match get_available_samples(stream) { + Ok(n) => n, + Err(err) => { + streams_to_remove.push((stream.id, err.into())); + i_descriptor += stream.num_descriptors as isize; + i_stream += 1; + continue; } }; - if available_samples < stream_inner.period_len { - i_descriptor += stream_inner.num_descriptors as isize; + // Only go on if there is at least `stream.period_len` samples. + if available_samples < stream.period_len { + i_descriptor += stream.num_descriptors as isize; i_stream += 1; continue; } - let stream_id = stream_inner.id.clone(); - - let available_frames = available_samples / stream_inner.num_channels as usize; - - let buffer_size = stream_inner.sample_format.sample_size() * available_samples; - // Could be written with a match with improved borrow checking - if stream_inner.buffer.is_none() { - stream_inner.buffer = Some(vec![0u8; buffer_size]); - } else { - stream_inner.buffer.as_mut().unwrap().resize(buffer_size, 0u8); - } - let buffer = stream_inner.buffer.as_mut().unwrap(); + // Prepare the data buffer. + let buffer_size = stream.sample_format.sample_size() * available_samples; + stream.buffer.resize(buffer_size, 0u8); + let available_frames = available_samples / stream.num_channels as usize; match stream_type { StreamType::Input => { - let err = alsa::snd_pcm_readi( - stream_inner.channel, - buffer.as_mut_ptr() as *mut _, + let result = alsa::snd_pcm_readi( + stream.channel, + stream.buffer.as_mut_ptr() as *mut _, available_frames as alsa::snd_pcm_uframes_t, ); - check_errors(err as _).expect("snd_pcm_readi error"); + if let Err(err) = check_errors(result as _) { + let description = format!("`snd_pcm_readi` failed: {}", err); + let err = BackendSpecificError { description }; + streams_to_remove.push((stream.id, err.into())); + continue; + } - let input_buffer = match stream_inner.sample_format { + let input_buffer = match stream.sample_format { SampleFormat::I16 => UnknownTypeInputBuffer::I16(::InputBuffer { - buffer: cast_input_buffer(buffer), + buffer: cast_input_buffer(&mut stream.buffer), }), SampleFormat::U16 => UnknownTypeInputBuffer::U16(::InputBuffer { - buffer: cast_input_buffer(buffer), + buffer: cast_input_buffer(&mut stream.buffer), }), SampleFormat::F32 => UnknownTypeInputBuffer::F32(::InputBuffer { - buffer: cast_input_buffer(buffer), + buffer: cast_input_buffer(&mut stream.buffer), }), }; let stream_data = StreamData::Input { buffer: input_buffer, }; - callback(stream_id, stream_data); + let event = StreamEvent::Data(stream_data); + callback(stream.id, event); }, StreamType::Output => { { // We're now sure that we're ready to write data. - let output_buffer = match stream_inner.sample_format { + let output_buffer = match stream.sample_format { SampleFormat::I16 => UnknownTypeOutputBuffer::I16(::OutputBuffer { - buffer: cast_output_buffer(buffer), + buffer: cast_output_buffer(&mut stream.buffer), }), SampleFormat::U16 => UnknownTypeOutputBuffer::U16(::OutputBuffer { - buffer: cast_output_buffer(buffer), + buffer: cast_output_buffer(&mut stream.buffer), }), SampleFormat::F32 => UnknownTypeOutputBuffer::F32(::OutputBuffer { - buffer: cast_output_buffer(buffer), + buffer: cast_output_buffer(&mut stream.buffer), }), }; let stream_data = StreamData::Output { buffer: output_buffer, }; - callback(stream_id, stream_data); + let event = StreamEvent::Data(stream_data); + callback(stream.id, event); } loop { let result = alsa::snd_pcm_writei( - stream_inner.channel, - buffer.as_ptr() as *const _, + stream.channel, + stream.buffer.as_ptr() as *const _, available_frames as alsa::snd_pcm_uframes_t, ); if result == -32 { // buffer underrun - alsa::snd_pcm_prepare(stream_inner.channel); - } else if result < 0 { - check_errors(result as libc::c_int) - .expect("could not write pcm"); + // TODO: Notify the user of this. + alsa::snd_pcm_prepare(stream.channel); + } else if let Err(err) = check_errors(result as _) { + let description = format!("`snd_pcm_writei` failed: {}", err); + let err = BackendSpecificError { description }; + streams_to_remove.push((stream.id, err.into())); + continue; + } else if result as usize != available_frames { + let description = format!( + "unexpected number of frames written: expected {}, \ + result {} (this should never happen)", + available_frames, + result, + ); + let err = BackendSpecificError { description }; + streams_to_remove.push((stream.id, err.into())); + continue; } else { - assert_eq!(result as usize, available_frames); break; } } }, } } + + // Remove any streams that have errored and notify the user. + for (stream_id, err) in streams_to_remove { + run_context.streams.retain(|s| s.id != stream_id); + let event = StreamEvent::Close(err.into()); + callback(stream_id, event); + } } } + + panic!("`cpal::EventLoop::run` API currently disallows returning"); } pub fn build_input_stream( @@ -739,7 +714,7 @@ impl EventLoop { can_pause: can_pause, is_paused: false, resume_trigger: Trigger::new(), - buffer: None, + buffer: vec![], }; if let Err(desc) = check_errors(alsa::snd_pcm_start(capture_handle)) { @@ -818,7 +793,7 @@ impl EventLoop { can_pause: can_pause, is_paused: false, resume_trigger: Trigger::new(), - buffer: None, + buffer: vec![], }; self.push_command(Command::NewStream(stream_inner)); @@ -830,7 +805,7 @@ impl EventLoop { fn push_command(&self, command: Command) { // Safe to unwrap: sender outlives receiver. self.commands.send(command).unwrap(); - self.pending_trigger.wakeup(); + self.pending_command_trigger.wakeup(); } #[inline] @@ -851,6 +826,157 @@ impl EventLoop { } } +// Process any pending `Command`s within the `RunContext`'s queue. +fn process_commands( + run_context: &mut RunContext, + callback: &mut dyn FnMut(StreamId, StreamEvent), +) { + for command in run_context.commands.try_iter() { + match command { + Command::DestroyStream(stream_id) => { + run_context.streams.retain(|s| s.id != stream_id); + let event = StreamCloseCause::UserDestroyed.into(); + callback(stream_id, event); + }, + Command::PlayStream(stream_id) => { + if let Some(stream) = run_context.streams.iter_mut() + .find(|stream| stream.can_pause && stream.id == stream_id) + { + callback(stream_id, StreamEvent::Play); + unsafe { + alsa::snd_pcm_pause(stream.channel, 0); + } + stream.is_paused = false; + } + }, + Command::PauseStream(stream_id) => { + if let Some(stream) = run_context.streams.iter_mut() + .find(|stream| stream.can_pause && stream.id == stream_id) + { + callback(stream_id, StreamEvent::Pause); + unsafe { + alsa::snd_pcm_pause(stream.channel, 1); + } + stream.is_paused = true; + } + }, + Command::NewStream(stream_inner) => { + run_context.streams.push(stream_inner); + }, + } + } +} + +// Resets the descriptors so that only `pending_command_trigger.read_fd()` is contained. +fn reset_descriptors_with_pending_command_trigger( + descriptors: &mut Vec, + pending_command_trigger: &Trigger, +) { + descriptors.clear(); + descriptors.push(libc::pollfd { + fd: pending_command_trigger.read_fd(), + events: libc::POLLIN, + revents: 0, + }); +} + +// Appends the `poll` descriptors for each stream onto the `RunContext`'s descriptor slice, ready +// for a call to `libc::poll`. +fn append_stream_poll_descriptors(run_context: &mut RunContext) { + for stream in run_context.streams.iter() { + run_context.descriptors.reserve(stream.num_descriptors); + let len = run_context.descriptors.len(); + let filled = unsafe { + alsa::snd_pcm_poll_descriptors( + stream.channel, + run_context.descriptors.as_mut_ptr().offset(len as isize), + stream.num_descriptors as libc::c_uint, + ) + }; + debug_assert_eq!(filled, stream.num_descriptors as libc::c_int); + unsafe { + run_context.descriptors.set_len(len + stream.num_descriptors); + } + } +} + +// Poll all descriptors within the given set. +// +// Returns `Ok(true)` if some event has occurred or `Ok(false)` if no events have +// occurred. +// +// Returns an `Err` if `libc::poll` returns a negative value for some reason. +fn poll_all_descriptors(descriptors: &mut [libc::pollfd]) -> Result { + let res = unsafe { + // Don't timeout, wait forever. + libc::poll(descriptors.as_mut_ptr(), descriptors.len() as libc::nfds_t, -1) + }; + if res < 0 { + let description = format!("`libc::poll()` failed: {}", res); + Err(BackendSpecificError { description }) + } else if res == 0 { + Ok(false) + } else { + Ok(true) + } +} + +// Check whether the event is `POLLOUT` or `POLLIN`. +// +// If so, return the stream type associated with the event. +// +// Otherwise, returns `Ok(None)`. +// +// Returns an `Err` if the `snd_pcm_poll_descriptors_revents` call fails. +fn check_for_pollout_or_pollin( + stream: &StreamInner, + stream_descriptor_ptr: *mut libc::pollfd, +) -> Result, BackendSpecificError> { + let (revent, res) = unsafe { + let mut revent = mem::uninitialized(); + let res = alsa::snd_pcm_poll_descriptors_revents( + stream.channel, + stream_descriptor_ptr, + stream.num_descriptors as libc::c_uint, + &mut revent, + ); + (revent, res) + }; + if let Err(desc) = check_errors(res) { + let description = + format!("`snd_pcm_poll_descriptors_revents` failed: {}",desc); + let err = BackendSpecificError { description }; + return Err(err); + } + + if revent as i16 == libc::POLLOUT { + Ok(Some(StreamType::Output)) + } else if revent as i16 == libc::POLLIN { + Ok(Some(StreamType::Input)) + } else { + Ok(None) + } +} + +// Determine the number of samples that are available to read/write. +fn get_available_samples(stream: &StreamInner) -> Result { + // TODO: what about snd_pcm_avail_update? + let available = unsafe { + alsa::snd_pcm_avail(stream.channel) + }; + if available == -32 { + // buffer underrun + // TODO: Notify the user some how. + Ok(stream.buffer_len) + } else if let Err(desc) = check_errors(available as libc::c_int) { + let description = format!("failed to get available samples: {}", desc); + let err = BackendSpecificError { description }; + Err(err) + } else { + Ok((available * stream.num_channels as alsa::snd_pcm_sframes_t) as usize) + } +} + unsafe fn set_hw_params_from_format( pcm_handle: *mut alsa::snd_pcm_t, hw_params: &HwParams,