Make it work on Linux
This commit is contained in:
parent
be8310da51
commit
7c587853ad
373
src/alsa/mod.rs
373
src/alsa/mod.rs
|
@ -9,10 +9,16 @@ use Format;
|
||||||
use FormatsEnumerationError;
|
use FormatsEnumerationError;
|
||||||
use SampleFormat;
|
use SampleFormat;
|
||||||
use SamplesRate;
|
use SamplesRate;
|
||||||
|
use UnknownTypeBuffer;
|
||||||
|
|
||||||
use std::{ffi, cmp, iter, mem, ptr};
|
use std::{ffi, cmp, iter, mem, ptr};
|
||||||
use std::vec::IntoIter as VecIntoIter;
|
use std::vec::IntoIter as VecIntoIter;
|
||||||
use std::sync::Mutex;
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
|
use futures::Poll;
|
||||||
|
use futures::Task;
|
||||||
|
use futures::TaskHandle;
|
||||||
|
use futures::stream::Stream;
|
||||||
|
|
||||||
pub type SupportedFormatsIterator = VecIntoIter<Format>;
|
pub type SupportedFormatsIterator = VecIntoIter<Format>;
|
||||||
|
|
||||||
|
@ -174,18 +180,273 @@ impl Endpoint {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Voice {
|
pub struct EventLoop {
|
||||||
channel: Mutex<*mut alsa::snd_pcm_t>,
|
inner: Arc<EventLoopInner>,
|
||||||
num_channels: u16,
|
|
||||||
buffer_len: usize, // number of samples that can fit in the buffer
|
|
||||||
period_len: usize, // minimum number of samples to put in the buffer
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Buffer<'a, T> {
|
struct EventLoopInner {
|
||||||
channel: &'a mut Voice,
|
// Descriptors that we are currently waiting upon. This member is always locked while `run()`
|
||||||
|
// is executed, ie. most of the time.
|
||||||
|
//
|
||||||
|
// Note that for `current_wait`, the first element of `descriptors` is always
|
||||||
|
// `pending_wait_signal`. Therefore the length of `descriptors` is always one more than
|
||||||
|
// `voices`.
|
||||||
|
current_wait: Mutex<PollDescriptors>,
|
||||||
|
|
||||||
|
// Since we can't add elements to `current_wait` (as it's locked), we add them to
|
||||||
|
// `pending_wait`. Once that's done, we signal `pending_wait_signal` so that the `run()`
|
||||||
|
// function can pause and add the content of `pending_wait` to `current_wait`.
|
||||||
|
pending_wait: Mutex<PollDescriptors>,
|
||||||
|
|
||||||
|
// A file descriptor opened with `eventfd`. Always the first element
|
||||||
|
// of `current_wait.descriptors`. Should be notified when an element is added
|
||||||
|
// to `pending_wait` so that the current wait can stop and take the pending wait into
|
||||||
|
// account.
|
||||||
|
pending_wait_signal: libc::c_int,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct PollDescriptors {
|
||||||
|
// Descriptors to wait for.
|
||||||
|
descriptors: Vec<libc::pollfd>,
|
||||||
|
// List of voices that are written in `descriptors`.
|
||||||
|
voices: Vec<Arc<VoiceInner>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe impl Send for EventLoopInner {}
|
||||||
|
unsafe impl Sync for EventLoopInner {}
|
||||||
|
|
||||||
|
impl Drop for EventLoopInner {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
unsafe {
|
||||||
|
libc::close(self.pending_wait_signal);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EventLoop {
|
||||||
|
#[inline]
|
||||||
|
pub fn new() -> EventLoop {
|
||||||
|
let pending_wait_signal = unsafe { libc::eventfd(0, 0) };
|
||||||
|
|
||||||
|
EventLoop {
|
||||||
|
inner: Arc::new(EventLoopInner {
|
||||||
|
current_wait: Mutex::new(PollDescriptors {
|
||||||
|
descriptors: vec![libc::pollfd {
|
||||||
|
fd: pending_wait_signal,
|
||||||
|
events: libc::POLLIN,
|
||||||
|
revents: 0,
|
||||||
|
}],
|
||||||
|
voices: Vec::new(),
|
||||||
|
}),
|
||||||
|
pending_wait: Mutex::new(PollDescriptors {
|
||||||
|
descriptors: Vec::new(),
|
||||||
|
voices: Vec::new(),
|
||||||
|
}),
|
||||||
|
pending_wait_signal: pending_wait_signal,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn run(&self) {
|
||||||
|
unsafe {
|
||||||
|
let mut current_wait = self.inner.current_wait.lock().unwrap();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let ret = libc::poll(current_wait.descriptors.as_mut_ptr(),
|
||||||
|
current_wait.descriptors.len() as libc::nfds_t,
|
||||||
|
-1 /* infinite */);
|
||||||
|
assert!(ret >= 0, "poll() failed");
|
||||||
|
|
||||||
|
if ret == 0 {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the `pending_wait_signal` was signaled, add the pending waits to
|
||||||
|
// the current waits.
|
||||||
|
if current_wait.descriptors[0].revents != 0 {
|
||||||
|
current_wait.descriptors[0].revents = 0;
|
||||||
|
|
||||||
|
let mut pending = self.inner.pending_wait.lock().unwrap();
|
||||||
|
current_wait.descriptors.append(&mut pending.descriptors);
|
||||||
|
current_wait.voices.append(&mut pending.voices);
|
||||||
|
|
||||||
|
// Emptying the signal.
|
||||||
|
let mut out = 0u64;
|
||||||
|
let ret = libc::read(self.inner.pending_wait_signal,
|
||||||
|
&mut out as *mut u64 as *mut _, 8);
|
||||||
|
assert_eq!(ret, 8);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check each individual descriptor for events.
|
||||||
|
let mut i_voice = 0;
|
||||||
|
let mut i_descriptor = 1;
|
||||||
|
while i_voice < current_wait.voices.len() {
|
||||||
|
let mut revent = mem::uninitialized();
|
||||||
|
|
||||||
|
{
|
||||||
|
let channel = *current_wait.voices[i_voice].channel.lock().unwrap();
|
||||||
|
let num_descriptors = current_wait.voices[i_voice].num_descriptors as libc::c_uint;
|
||||||
|
check_errors(alsa::snd_pcm_poll_descriptors_revents(channel, current_wait.descriptors
|
||||||
|
.as_mut_ptr().offset(i_descriptor),
|
||||||
|
num_descriptors, &mut revent)).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (revent as libc::c_short & libc::POLLOUT) != 0 {
|
||||||
|
let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap().take();
|
||||||
|
if let Some(scheduled) = scheduled {
|
||||||
|
scheduled.notify();
|
||||||
|
}
|
||||||
|
|
||||||
|
for _ in 0 .. current_wait.voices[i_voice].num_descriptors {
|
||||||
|
current_wait.descriptors.remove(i_descriptor as usize);
|
||||||
|
}
|
||||||
|
current_wait.voices.remove(i_voice);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
i_descriptor += current_wait.voices[i_voice].num_descriptors as isize;
|
||||||
|
i_voice += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Voice;
|
||||||
|
|
||||||
|
pub struct Buffer<T> {
|
||||||
|
inner: Arc<VoiceInner>,
|
||||||
buffer: Vec<T>,
|
buffer: Vec<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct SamplesStream {
|
||||||
|
inner: Arc<VoiceInner>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct VoiceInner {
|
||||||
|
// The event loop used to create the voice.
|
||||||
|
event_loop: Arc<EventLoopInner>,
|
||||||
|
|
||||||
|
// The ALSA channel.
|
||||||
|
channel: Mutex<*mut alsa::snd_pcm_t>,
|
||||||
|
|
||||||
|
// When converting between file descriptors and `snd_pcm_t`, this is the number of
|
||||||
|
// file descriptors that this `snd_pcm_t` uses.
|
||||||
|
num_descriptors: usize,
|
||||||
|
|
||||||
|
// Format of the samples.
|
||||||
|
sample_format: SampleFormat,
|
||||||
|
|
||||||
|
// Number of channels, ie. number of samples per frame.
|
||||||
|
num_channels: u16,
|
||||||
|
|
||||||
|
// Number of samples that can fit in the buffer.
|
||||||
|
buffer_len: usize,
|
||||||
|
|
||||||
|
// Minimum number of samples to put in the buffer.
|
||||||
|
period_len: usize,
|
||||||
|
|
||||||
|
// If `Some`, something previously called `schedule` on the stream.
|
||||||
|
scheduled: Mutex<Option<TaskHandle>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe impl Send for VoiceInner {}
|
||||||
|
unsafe impl Sync for VoiceInner {}
|
||||||
|
|
||||||
|
impl Stream for SamplesStream {
|
||||||
|
type Item = UnknownTypeBuffer;
|
||||||
|
type Error = ();
|
||||||
|
|
||||||
|
fn poll(&mut self, _: &mut Task) -> Poll<Option<Self::Item>, Self::Error> {
|
||||||
|
// Determine the number of samples that are available to write.
|
||||||
|
let available = {
|
||||||
|
let channel = self.inner.channel.lock().expect("could not lock channel");
|
||||||
|
let available = unsafe { alsa::snd_pcm_avail(*channel) }; // TODO: what about snd_pcm_avail_update?
|
||||||
|
|
||||||
|
if available == -32 {
|
||||||
|
// buffer underrun
|
||||||
|
self.inner.buffer_len
|
||||||
|
} else if available < 0 {
|
||||||
|
check_errors(available as libc::c_int).expect("buffer is not available");
|
||||||
|
unreachable!()
|
||||||
|
} else {
|
||||||
|
(available * self.inner.num_channels as alsa::snd_pcm_sframes_t) as usize
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// If we don't have one period ready, return `NotReady`.
|
||||||
|
if available < self.inner.period_len {
|
||||||
|
return Poll::NotReady;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add an upper bound to the available space.
|
||||||
|
let available = if available > 8192 { 8192 } else { available };
|
||||||
|
|
||||||
|
// We now sure that we're ready to write data.
|
||||||
|
match self.inner.sample_format {
|
||||||
|
SampleFormat::I16 => {
|
||||||
|
let buffer = Buffer {
|
||||||
|
buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(),
|
||||||
|
inner: self.inner.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
Poll::Ok(Some(UnknownTypeBuffer::I16(::Buffer { target: Some(buffer) })))
|
||||||
|
},
|
||||||
|
SampleFormat::U16 => {
|
||||||
|
let buffer = Buffer {
|
||||||
|
buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(),
|
||||||
|
inner: self.inner.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
Poll::Ok(Some(UnknownTypeBuffer::U16(::Buffer { target: Some(buffer) })))
|
||||||
|
},
|
||||||
|
SampleFormat::F32 => {
|
||||||
|
let buffer = Buffer {
|
||||||
|
buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(),
|
||||||
|
inner: self.inner.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
Poll::Ok(Some(UnknownTypeBuffer::F32(::Buffer { target: Some(buffer) })))
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn schedule(&mut self, task: &mut Task) {
|
||||||
|
unsafe {
|
||||||
|
let channel = self.inner.channel.lock().unwrap();
|
||||||
|
|
||||||
|
// We start by filling `scheduled`.
|
||||||
|
*self.inner.scheduled.lock().unwrap() = Some(task.handle().clone());
|
||||||
|
|
||||||
|
// In this function we turn the `snd_pcm_t` into a collection of file descriptors.
|
||||||
|
// And we add these descriptors to `event_loop.pending_wait.descriptors`.
|
||||||
|
let mut pending_wait = self.inner.event_loop.pending_wait.lock().unwrap();
|
||||||
|
pending_wait.descriptors.reserve(self.inner.num_descriptors);
|
||||||
|
|
||||||
|
let len = pending_wait.descriptors.len();
|
||||||
|
let filled = alsa::snd_pcm_poll_descriptors(*channel,
|
||||||
|
pending_wait.descriptors.as_mut_ptr()
|
||||||
|
.offset(len as isize),
|
||||||
|
self.inner.num_descriptors as libc::c_uint);
|
||||||
|
debug_assert_eq!(filled, self.inner.num_descriptors as libc::c_int);
|
||||||
|
pending_wait.descriptors.set_len(len + self.inner.num_descriptors);
|
||||||
|
|
||||||
|
// We also fill `voices`.
|
||||||
|
pending_wait.voices.push(self.inner.clone());
|
||||||
|
|
||||||
|
// Now that `pending_wait` received additional descriptors, we signal the event
|
||||||
|
// so that our event loops can pick it up.
|
||||||
|
drop(pending_wait);
|
||||||
|
let buf = 1u64;
|
||||||
|
let wret = libc::write(self.inner.event_loop.pending_wait_signal,
|
||||||
|
&buf as *const u64 as *const _, 8);
|
||||||
|
assert!(wret == 8);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Wrapper around `hw_params`.
|
/// Wrapper around `hw_params`.
|
||||||
struct HwParams(*mut alsa::snd_pcm_hw_params_t);
|
struct HwParams(*mut alsa::snd_pcm_hw_params_t);
|
||||||
|
|
||||||
|
@ -208,13 +469,15 @@ impl Drop for HwParams {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Voice {
|
impl Voice {
|
||||||
pub fn new(endpoint: &Endpoint, format: &Format) -> Result<Voice, CreationError> {
|
pub fn new(endpoint: &Endpoint, format: &Format, event_loop: &EventLoop)
|
||||||
|
-> Result<(Voice, SamplesStream), CreationError>
|
||||||
|
{
|
||||||
unsafe {
|
unsafe {
|
||||||
let name = ffi::CString::new(endpoint.0.clone()).expect("unable to clone endpoint");
|
let name = ffi::CString::new(endpoint.0.clone()).expect("unable to clone endpoint");
|
||||||
|
|
||||||
let mut playback_handle = mem::uninitialized();
|
let mut playback_handle = mem::uninitialized();
|
||||||
match alsa::snd_pcm_open(&mut playback_handle, name.as_ptr(),
|
match alsa::snd_pcm_open(&mut playback_handle, name.as_ptr(),
|
||||||
alsa::SND_PCM_STREAM_PLAYBACK, alsa::SND_PCM_NONBLOCK)
|
alsa::SND_PCM_STREAM_PLAYBACK, 0)
|
||||||
{
|
{
|
||||||
-16 /* determined empirically */ => return Err(CreationError::DeviceNotAvailable),
|
-16 /* determined empirically */ => return Err(CreationError::DeviceNotAvailable),
|
||||||
e => check_errors(e).expect("Device unavailable")
|
e => check_errors(e).expect("Device unavailable")
|
||||||
|
@ -235,6 +498,13 @@ impl Voice {
|
||||||
check_errors(alsa::snd_pcm_hw_params_set_channels(playback_handle, hw_params.0, format.channels.len() as libc::c_uint)).expect("channel count could not be set");
|
check_errors(alsa::snd_pcm_hw_params_set_channels(playback_handle, hw_params.0, format.channels.len() as libc::c_uint)).expect("channel count could not be set");
|
||||||
check_errors(alsa::snd_pcm_hw_params(playback_handle, hw_params.0)).expect("hardware params could not be set");
|
check_errors(alsa::snd_pcm_hw_params(playback_handle, hw_params.0)).expect("hardware params could not be set");
|
||||||
|
|
||||||
|
let mut sw_params = mem::uninitialized(); // TODO: RAII
|
||||||
|
check_errors(alsa::snd_pcm_sw_params_malloc(&mut sw_params)).unwrap();
|
||||||
|
check_errors(alsa::snd_pcm_sw_params_current(playback_handle, sw_params)).unwrap();
|
||||||
|
check_errors(alsa::snd_pcm_sw_params_set_avail_min(playback_handle, sw_params, 4096)).unwrap();
|
||||||
|
check_errors(alsa::snd_pcm_sw_params_set_start_threshold(playback_handle, sw_params, 0)).unwrap();
|
||||||
|
check_errors(alsa::snd_pcm_sw_params(playback_handle, sw_params)).unwrap();
|
||||||
|
|
||||||
check_errors(alsa::snd_pcm_prepare(playback_handle)).expect("could not get playback handle");
|
check_errors(alsa::snd_pcm_prepare(playback_handle)).expect("could not get playback handle");
|
||||||
|
|
||||||
let (buffer_len, period_len) = {
|
let (buffer_len, period_len) = {
|
||||||
|
@ -247,36 +517,26 @@ impl Voice {
|
||||||
(buffer, period)
|
(buffer, period)
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(Voice {
|
let num_descriptors = {
|
||||||
|
let num_descriptors = alsa::snd_pcm_poll_descriptors_count(playback_handle);
|
||||||
|
debug_assert!(num_descriptors >= 1);
|
||||||
|
num_descriptors as usize
|
||||||
|
};
|
||||||
|
|
||||||
|
let samples_stream_inner = Arc::new(VoiceInner {
|
||||||
|
event_loop: event_loop.inner.clone(),
|
||||||
channel: Mutex::new(playback_handle),
|
channel: Mutex::new(playback_handle),
|
||||||
|
sample_format: format.data_type,
|
||||||
|
num_descriptors: num_descriptors,
|
||||||
num_channels: format.channels.len() as u16,
|
num_channels: format.channels.len() as u16,
|
||||||
buffer_len: buffer_len,
|
buffer_len: buffer_len,
|
||||||
period_len: period_len,
|
period_len: period_len,
|
||||||
})
|
scheduled: Mutex::new(None),
|
||||||
}
|
});
|
||||||
}
|
|
||||||
|
|
||||||
pub fn append_data<'a, T>(&'a mut self, max_elements: usize) -> Buffer<'a, T> where T: Clone {
|
Ok((Voice, SamplesStream {
|
||||||
let available = {
|
inner: samples_stream_inner
|
||||||
let channel = self.channel.lock().expect("could not lock channel");
|
}))
|
||||||
let available = unsafe { alsa::snd_pcm_avail(*channel) };
|
|
||||||
|
|
||||||
if available == -32 {
|
|
||||||
// buffer underrun
|
|
||||||
self.buffer_len
|
|
||||||
} else if available < 0 {
|
|
||||||
check_errors(available as libc::c_int).expect("buffer is not available");
|
|
||||||
unreachable!()
|
|
||||||
} else {
|
|
||||||
(available * self.num_channels as alsa::snd_pcm_sframes_t) as usize
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let elements = cmp::min(available, max_elements);
|
|
||||||
|
|
||||||
Buffer {
|
|
||||||
channel: self,
|
|
||||||
buffer: iter::repeat(unsafe { mem::uninitialized() }).take(elements).collect(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -290,42 +550,9 @@ impl Voice {
|
||||||
pub fn pause(&mut self) {
|
pub fn pause(&mut self) {
|
||||||
unimplemented!()
|
unimplemented!()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
|
||||||
pub fn get_period(&self) -> usize {
|
|
||||||
self.period_len
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_pending_samples(&self) -> usize {
|
impl Drop for VoiceInner {
|
||||||
let available = {
|
|
||||||
let channel = self.channel.lock().expect("could not lock channel");
|
|
||||||
let available = unsafe { alsa::snd_pcm_avail(*channel) };
|
|
||||||
|
|
||||||
if available == -32 {
|
|
||||||
self.buffer_len as alsa::snd_pcm_sframes_t // buffer underrun
|
|
||||||
} else if available < 0 {
|
|
||||||
check_errors(available as libc::c_int).expect("could not write to buffer");
|
|
||||||
unreachable!()
|
|
||||||
} else {
|
|
||||||
available * self.num_channels as alsa::snd_pcm_sframes_t
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
self.buffer_len - available as usize
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn underflowed(&self) -> bool {
|
|
||||||
let channel = self.channel.lock().expect("channel underflow");
|
|
||||||
|
|
||||||
let available = unsafe { alsa::snd_pcm_avail(*channel) };
|
|
||||||
available == -32
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
unsafe impl Send for Voice {}
|
|
||||||
unsafe impl Sync for Voice {}
|
|
||||||
|
|
||||||
impl Drop for Voice {
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
unsafe {
|
unsafe {
|
||||||
|
@ -334,9 +561,9 @@ impl Drop for Voice {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, T> Buffer<'a, T> {
|
impl<T> Buffer<T> {
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn get_buffer<'b>(&'b mut self) -> &'b mut [T] {
|
pub fn get_buffer(&mut self) -> &mut [T] {
|
||||||
&mut self.buffer
|
&mut self.buffer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -346,9 +573,9 @@ impl<'a, T> Buffer<'a, T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn finish(self) {
|
pub fn finish(self) {
|
||||||
let to_write = (self.buffer.len() / self.channel.num_channels as usize)
|
let to_write = (self.buffer.len() / self.inner.num_channels as usize)
|
||||||
as alsa::snd_pcm_uframes_t;
|
as alsa::snd_pcm_uframes_t;
|
||||||
let channel = self.channel.channel.lock().expect("Buffer channel lock failed");
|
let channel = self.inner.channel.lock().expect("Buffer channel lock failed");
|
||||||
|
|
||||||
unsafe {
|
unsafe {
|
||||||
loop {
|
loop {
|
||||||
|
|
Loading…
Reference in New Issue