From 7c587853ad87f20cb3d8a3411f27500eba273183 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 2 Aug 2016 22:28:37 +0200 Subject: [PATCH] Make it work on Linux --- src/alsa/mod.rs | 373 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 300 insertions(+), 73 deletions(-) diff --git a/src/alsa/mod.rs b/src/alsa/mod.rs index fb8ab17..f6b8cb9 100644 --- a/src/alsa/mod.rs +++ b/src/alsa/mod.rs @@ -9,10 +9,16 @@ use Format; use FormatsEnumerationError; use SampleFormat; use SamplesRate; +use UnknownTypeBuffer; use std::{ffi, cmp, iter, mem, ptr}; use std::vec::IntoIter as VecIntoIter; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; + +use futures::Poll; +use futures::Task; +use futures::TaskHandle; +use futures::stream::Stream; pub type SupportedFormatsIterator = VecIntoIter; @@ -174,18 +180,273 @@ impl Endpoint { } } -pub struct Voice { - channel: Mutex<*mut alsa::snd_pcm_t>, - num_channels: u16, - buffer_len: usize, // number of samples that can fit in the buffer - period_len: usize, // minimum number of samples to put in the buffer +pub struct EventLoop { + inner: Arc, } -pub struct Buffer<'a, T> { - channel: &'a mut Voice, +struct EventLoopInner { + // Descriptors that we are currently waiting upon. This member is always locked while `run()` + // is executed, ie. most of the time. + // + // Note that for `current_wait`, the first element of `descriptors` is always + // `pending_wait_signal`. Therefore the length of `descriptors` is always one more than + // `voices`. + current_wait: Mutex, + + // Since we can't add elements to `current_wait` (as it's locked), we add them to + // `pending_wait`. Once that's done, we signal `pending_wait_signal` so that the `run()` + // function can pause and add the content of `pending_wait` to `current_wait`. + pending_wait: Mutex, + + // A file descriptor opened with `eventfd`. Always the first element + // of `current_wait.descriptors`. Should be notified when an element is added + // to `pending_wait` so that the current wait can stop and take the pending wait into + // account. + pending_wait_signal: libc::c_int, +} + +struct PollDescriptors { + // Descriptors to wait for. + descriptors: Vec, + // List of voices that are written in `descriptors`. + voices: Vec>, +} + +unsafe impl Send for EventLoopInner {} +unsafe impl Sync for EventLoopInner {} + +impl Drop for EventLoopInner { + fn drop(&mut self) { + unsafe { + libc::close(self.pending_wait_signal); + } + } +} + +impl EventLoop { + #[inline] + pub fn new() -> EventLoop { + let pending_wait_signal = unsafe { libc::eventfd(0, 0) }; + + EventLoop { + inner: Arc::new(EventLoopInner { + current_wait: Mutex::new(PollDescriptors { + descriptors: vec![libc::pollfd { + fd: pending_wait_signal, + events: libc::POLLIN, + revents: 0, + }], + voices: Vec::new(), + }), + pending_wait: Mutex::new(PollDescriptors { + descriptors: Vec::new(), + voices: Vec::new(), + }), + pending_wait_signal: pending_wait_signal, + }) + } + } + + #[inline] + pub fn run(&self) { + unsafe { + let mut current_wait = self.inner.current_wait.lock().unwrap(); + + loop { + let ret = libc::poll(current_wait.descriptors.as_mut_ptr(), + current_wait.descriptors.len() as libc::nfds_t, + -1 /* infinite */); + assert!(ret >= 0, "poll() failed"); + + if ret == 0 { + continue; + } + + // If the `pending_wait_signal` was signaled, add the pending waits to + // the current waits. + if current_wait.descriptors[0].revents != 0 { + current_wait.descriptors[0].revents = 0; + + let mut pending = self.inner.pending_wait.lock().unwrap(); + current_wait.descriptors.append(&mut pending.descriptors); + current_wait.voices.append(&mut pending.voices); + + // Emptying the signal. + let mut out = 0u64; + let ret = libc::read(self.inner.pending_wait_signal, + &mut out as *mut u64 as *mut _, 8); + assert_eq!(ret, 8); + } + + // Check each individual descriptor for events. + let mut i_voice = 0; + let mut i_descriptor = 1; + while i_voice < current_wait.voices.len() { + let mut revent = mem::uninitialized(); + + { + let channel = *current_wait.voices[i_voice].channel.lock().unwrap(); + let num_descriptors = current_wait.voices[i_voice].num_descriptors as libc::c_uint; + check_errors(alsa::snd_pcm_poll_descriptors_revents(channel, current_wait.descriptors + .as_mut_ptr().offset(i_descriptor), + num_descriptors, &mut revent)).unwrap(); + } + + if (revent as libc::c_short & libc::POLLOUT) != 0 { + let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap().take(); + if let Some(scheduled) = scheduled { + scheduled.notify(); + } + + for _ in 0 .. current_wait.voices[i_voice].num_descriptors { + current_wait.descriptors.remove(i_descriptor as usize); + } + current_wait.voices.remove(i_voice); + + } else { + i_descriptor += current_wait.voices[i_voice].num_descriptors as isize; + i_voice += 1; + } + } + } + } + } +} + +pub struct Voice; + +pub struct Buffer { + inner: Arc, buffer: Vec, } +pub struct SamplesStream { + inner: Arc, +} + +struct VoiceInner { + // The event loop used to create the voice. + event_loop: Arc, + + // The ALSA channel. + channel: Mutex<*mut alsa::snd_pcm_t>, + + // When converting between file descriptors and `snd_pcm_t`, this is the number of + // file descriptors that this `snd_pcm_t` uses. + num_descriptors: usize, + + // Format of the samples. + sample_format: SampleFormat, + + // Number of channels, ie. number of samples per frame. + num_channels: u16, + + // Number of samples that can fit in the buffer. + buffer_len: usize, + + // Minimum number of samples to put in the buffer. + period_len: usize, + + // If `Some`, something previously called `schedule` on the stream. + scheduled: Mutex>, +} + +unsafe impl Send for VoiceInner {} +unsafe impl Sync for VoiceInner {} + +impl Stream for SamplesStream { + type Item = UnknownTypeBuffer; + type Error = (); + + fn poll(&mut self, _: &mut Task) -> Poll, Self::Error> { + // Determine the number of samples that are available to write. + let available = { + let channel = self.inner.channel.lock().expect("could not lock channel"); + let available = unsafe { alsa::snd_pcm_avail(*channel) }; // TODO: what about snd_pcm_avail_update? + + if available == -32 { + // buffer underrun + self.inner.buffer_len + } else if available < 0 { + check_errors(available as libc::c_int).expect("buffer is not available"); + unreachable!() + } else { + (available * self.inner.num_channels as alsa::snd_pcm_sframes_t) as usize + } + }; + + // If we don't have one period ready, return `NotReady`. + if available < self.inner.period_len { + return Poll::NotReady; + } + + // Add an upper bound to the available space. + let available = if available > 8192 { 8192 } else { available }; + + // We now sure that we're ready to write data. + match self.inner.sample_format { + SampleFormat::I16 => { + let buffer = Buffer { + buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(), + inner: self.inner.clone(), + }; + + Poll::Ok(Some(UnknownTypeBuffer::I16(::Buffer { target: Some(buffer) }))) + }, + SampleFormat::U16 => { + let buffer = Buffer { + buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(), + inner: self.inner.clone(), + }; + + Poll::Ok(Some(UnknownTypeBuffer::U16(::Buffer { target: Some(buffer) }))) + }, + SampleFormat::F32 => { + let buffer = Buffer { + buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(), + inner: self.inner.clone(), + }; + + Poll::Ok(Some(UnknownTypeBuffer::F32(::Buffer { target: Some(buffer) }))) + }, + } + } + + #[inline] + fn schedule(&mut self, task: &mut Task) { + unsafe { + let channel = self.inner.channel.lock().unwrap(); + + // We start by filling `scheduled`. + *self.inner.scheduled.lock().unwrap() = Some(task.handle().clone()); + + // In this function we turn the `snd_pcm_t` into a collection of file descriptors. + // And we add these descriptors to `event_loop.pending_wait.descriptors`. + let mut pending_wait = self.inner.event_loop.pending_wait.lock().unwrap(); + pending_wait.descriptors.reserve(self.inner.num_descriptors); + + let len = pending_wait.descriptors.len(); + let filled = alsa::snd_pcm_poll_descriptors(*channel, + pending_wait.descriptors.as_mut_ptr() + .offset(len as isize), + self.inner.num_descriptors as libc::c_uint); + debug_assert_eq!(filled, self.inner.num_descriptors as libc::c_int); + pending_wait.descriptors.set_len(len + self.inner.num_descriptors); + + // We also fill `voices`. + pending_wait.voices.push(self.inner.clone()); + + // Now that `pending_wait` received additional descriptors, we signal the event + // so that our event loops can pick it up. + drop(pending_wait); + let buf = 1u64; + let wret = libc::write(self.inner.event_loop.pending_wait_signal, + &buf as *const u64 as *const _, 8); + assert!(wret == 8); + } + } +} + /// Wrapper around `hw_params`. struct HwParams(*mut alsa::snd_pcm_hw_params_t); @@ -208,13 +469,15 @@ impl Drop for HwParams { } impl Voice { - pub fn new(endpoint: &Endpoint, format: &Format) -> Result { + pub fn new(endpoint: &Endpoint, format: &Format, event_loop: &EventLoop) + -> Result<(Voice, SamplesStream), CreationError> + { unsafe { let name = ffi::CString::new(endpoint.0.clone()).expect("unable to clone endpoint"); let mut playback_handle = mem::uninitialized(); match alsa::snd_pcm_open(&mut playback_handle, name.as_ptr(), - alsa::SND_PCM_STREAM_PLAYBACK, alsa::SND_PCM_NONBLOCK) + alsa::SND_PCM_STREAM_PLAYBACK, 0) { -16 /* determined empirically */ => return Err(CreationError::DeviceNotAvailable), e => check_errors(e).expect("Device unavailable") @@ -235,6 +498,13 @@ impl Voice { check_errors(alsa::snd_pcm_hw_params_set_channels(playback_handle, hw_params.0, format.channels.len() as libc::c_uint)).expect("channel count could not be set"); check_errors(alsa::snd_pcm_hw_params(playback_handle, hw_params.0)).expect("hardware params could not be set"); + let mut sw_params = mem::uninitialized(); // TODO: RAII + check_errors(alsa::snd_pcm_sw_params_malloc(&mut sw_params)).unwrap(); + check_errors(alsa::snd_pcm_sw_params_current(playback_handle, sw_params)).unwrap(); + check_errors(alsa::snd_pcm_sw_params_set_avail_min(playback_handle, sw_params, 4096)).unwrap(); + check_errors(alsa::snd_pcm_sw_params_set_start_threshold(playback_handle, sw_params, 0)).unwrap(); + check_errors(alsa::snd_pcm_sw_params(playback_handle, sw_params)).unwrap(); + check_errors(alsa::snd_pcm_prepare(playback_handle)).expect("could not get playback handle"); let (buffer_len, period_len) = { @@ -247,36 +517,26 @@ impl Voice { (buffer, period) }; - Ok(Voice { + let num_descriptors = { + let num_descriptors = alsa::snd_pcm_poll_descriptors_count(playback_handle); + debug_assert!(num_descriptors >= 1); + num_descriptors as usize + }; + + let samples_stream_inner = Arc::new(VoiceInner { + event_loop: event_loop.inner.clone(), channel: Mutex::new(playback_handle), + sample_format: format.data_type, + num_descriptors: num_descriptors, num_channels: format.channels.len() as u16, buffer_len: buffer_len, period_len: period_len, - }) - } - } + scheduled: Mutex::new(None), + }); - pub fn append_data<'a, T>(&'a mut self, max_elements: usize) -> Buffer<'a, T> where T: Clone { - let available = { - let channel = self.channel.lock().expect("could not lock channel"); - let available = unsafe { alsa::snd_pcm_avail(*channel) }; - - if available == -32 { - // buffer underrun - self.buffer_len - } else if available < 0 { - check_errors(available as libc::c_int).expect("buffer is not available"); - unreachable!() - } else { - (available * self.num_channels as alsa::snd_pcm_sframes_t) as usize - } - }; - - let elements = cmp::min(available, max_elements); - - Buffer { - channel: self, - buffer: iter::repeat(unsafe { mem::uninitialized() }).take(elements).collect(), + Ok((Voice, SamplesStream { + inner: samples_stream_inner + })) } } @@ -290,42 +550,9 @@ impl Voice { pub fn pause(&mut self) { unimplemented!() } - - #[inline] - pub fn get_period(&self) -> usize { - self.period_len - } - - pub fn get_pending_samples(&self) -> usize { - let available = { - let channel = self.channel.lock().expect("could not lock channel"); - let available = unsafe { alsa::snd_pcm_avail(*channel) }; - - if available == -32 { - self.buffer_len as alsa::snd_pcm_sframes_t // buffer underrun - } else if available < 0 { - check_errors(available as libc::c_int).expect("could not write to buffer"); - unreachable!() - } else { - available * self.num_channels as alsa::snd_pcm_sframes_t - } - }; - - self.buffer_len - available as usize - } - - pub fn underflowed(&self) -> bool { - let channel = self.channel.lock().expect("channel underflow"); - - let available = unsafe { alsa::snd_pcm_avail(*channel) }; - available == -32 - } } -unsafe impl Send for Voice {} -unsafe impl Sync for Voice {} - -impl Drop for Voice { +impl Drop for VoiceInner { #[inline] fn drop(&mut self) { unsafe { @@ -334,9 +561,9 @@ impl Drop for Voice { } } -impl<'a, T> Buffer<'a, T> { +impl Buffer { #[inline] - pub fn get_buffer<'b>(&'b mut self) -> &'b mut [T] { + pub fn get_buffer(&mut self) -> &mut [T] { &mut self.buffer } @@ -346,9 +573,9 @@ impl<'a, T> Buffer<'a, T> { } pub fn finish(self) { - let to_write = (self.buffer.len() / self.channel.num_channels as usize) + let to_write = (self.buffer.len() / self.inner.num_channels as usize) as alsa::snd_pcm_uframes_t; - let channel = self.channel.channel.lock().expect("Buffer channel lock failed"); + let channel = self.inner.channel.lock().expect("Buffer channel lock failed"); unsafe { loop {