impl play and pause for ALSA

snd_pcm_pause could have been used but not all hardware implement it, so
I propose not to use it.

In this implementation:

there are two kind of scheduling: wait for resume signal and wait for
pcm to be available

if the stream is paused then it return notready and wait for resume

the event loop is different as it manages descriptors corresponding to
voices according to the nature of the scheduling.

there is still a FIXME: in voice.play the is signal is send even if
the event loop wasn't waiting for resume.
It doesn't seem to create any issue. But it happens when you write
voice.pause();voice.play();
This commit is contained in:
thiolliere 2016-10-02 13:18:27 +02:00
parent f68509982a
commit f822631bc4
2 changed files with 142 additions and 41 deletions

View File

@ -7,6 +7,8 @@ use futures::task::Executor;
use futures::task::Run;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
struct MyExecutor;
@ -57,5 +59,14 @@ fn main() {
Ok(())
})).execute(executor);
thread::spawn(move || {
loop {
thread::sleep(Duration::from_millis(500));
voice.pause();
thread::sleep(Duration::from_millis(500));
voice.play();
}
});
event_loop.run();
}

View File

@ -14,6 +14,7 @@ use UnknownTypeBuffer;
use std::{ffi, cmp, iter, mem, ptr};
use std::vec::IntoIter as VecIntoIter;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use futures::Poll;
use futures::task::Task;
@ -283,30 +284,62 @@ impl EventLoop {
let mut i_voice = 0;
let mut i_descriptor = 1;
while i_voice < current_wait.voices.len() {
let mut revent = mem::uninitialized();
{
let channel = *current_wait.voices[i_voice].channel.lock().unwrap();
let num_descriptors = current_wait.voices[i_voice].num_descriptors as libc::c_uint;
check_errors(alsa::snd_pcm_poll_descriptors_revents(channel, current_wait.descriptors
.as_mut_ptr().offset(i_descriptor),
num_descriptors, &mut revent)).unwrap();
}
if (revent as libc::c_short & libc::POLLOUT) != 0 {
let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap().take();
if let Some(scheduled) = scheduled {
scheduled.unpark();
let kind = {
let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap();
match *scheduled {
Some(ref scheduled) => scheduled.kind,
None => panic!("current wait unscheduled task"),
}
};
for _ in 0 .. current_wait.voices[i_voice].num_descriptors {
current_wait.descriptors.remove(i_descriptor as usize);
// Depending on the kind of scheduling the number of descriptors corresponding
// to the voice and the events associated are different
match kind {
ScheduledKind::WaitPCM => {
let mut revent = mem::uninitialized();
{
let channel = *current_wait.voices[i_voice].channel.lock().unwrap();
let num_descriptors = current_wait.voices[i_voice].num_descriptors as libc::c_uint;
check_errors(alsa::snd_pcm_poll_descriptors_revents(channel, current_wait.descriptors
.as_mut_ptr().offset(i_descriptor),
num_descriptors, &mut revent)).unwrap();
}
if (revent as libc::c_short & libc::POLLOUT) != 0 {
let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap().take();
scheduled.unwrap().task.unpark();
for _ in 0 .. current_wait.voices[i_voice].num_descriptors {
current_wait.descriptors.remove(i_descriptor as usize);
}
current_wait.voices.remove(i_voice);
} else {
i_descriptor += current_wait.voices[i_voice].num_descriptors as isize;
i_voice += 1;
}
},
ScheduledKind::WaitResume => {
if current_wait.descriptors[i_descriptor as usize].revents != 0 {
// Unpark the task
let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap().take();
scheduled.unwrap().task.unpark();
// Emptying the signal.
let mut out = 0u64;
let ret = libc::read(current_wait.descriptors[i_descriptor as usize].fd,
&mut out as *mut u64 as *mut _, 8);
assert_eq!(ret, 8);
// Remove from current waiting poll descriptors
current_wait.descriptors.remove(i_descriptor as usize);
current_wait.voices.remove(i_voice);
} else {
i_descriptor += 1;
i_voice += 1;
}
}
current_wait.voices.remove(i_voice);
} else {
i_descriptor += current_wait.voices[i_voice].num_descriptors as isize;
i_voice += 1;
}
}
}
@ -314,7 +347,9 @@ impl EventLoop {
}
}
pub struct Voice;
pub struct Voice {
inner: Arc<VoiceInner>,
}
pub struct Buffer<T> {
inner: Arc<VoiceInner>,
@ -325,6 +360,17 @@ pub struct SamplesStream {
inner: Arc<VoiceInner>,
}
pub struct Scheduled {
task: Task,
kind: ScheduledKind,
}
#[derive(Clone,Copy)]
pub enum ScheduledKind {
WaitResume,
WaitPCM,
}
struct VoiceInner {
// The event loop used to create the voice.
event_loop: Arc<EventLoopInner>,
@ -349,7 +395,14 @@ struct VoiceInner {
period_len: usize,
// If `Some`, something previously called `schedule` on the stream.
scheduled: Mutex<Option<Task>>,
scheduled: Mutex<Option<Scheduled>>,
// Wherease the sample stream is paused
is_paused: Arc<AtomicBool>,
// A file descriptor opened with `eventfd`.
// It is used to wait for resume signal.
resume_signal: libc::c_int,
}
unsafe impl Send for VoiceInner {}
@ -357,25 +410,41 @@ unsafe impl Sync for VoiceInner {}
impl SamplesStream {
#[inline]
fn schedule(&mut self) {
fn schedule(&mut self, kind: ScheduledKind) {
unsafe {
let channel = self.inner.channel.lock().unwrap();
// We start by filling `scheduled`.
*self.inner.scheduled.lock().unwrap() = Some(task::park());
*self.inner.scheduled.lock().unwrap() = Some(Scheduled {
task: task::park(),
kind: kind,
});
// In this function we turn the `snd_pcm_t` into a collection of file descriptors.
// And we add these descriptors to `event_loop.pending_wait.descriptors`.
let mut pending_wait = self.inner.event_loop.pending_wait.lock().unwrap();
pending_wait.descriptors.reserve(self.inner.num_descriptors);
match kind {
ScheduledKind::WaitPCM => {
// In this function we turn the `snd_pcm_t` into a collection of file descriptors.
// And we add these descriptors to `event_loop.pending_wait.descriptors`.
pending_wait.descriptors.reserve(self.inner.num_descriptors);
let len = pending_wait.descriptors.len();
let filled = alsa::snd_pcm_poll_descriptors(*channel,
pending_wait.descriptors.as_mut_ptr()
.offset(len as isize),
self.inner.num_descriptors as libc::c_uint);
debug_assert_eq!(filled, self.inner.num_descriptors as libc::c_int);
pending_wait.descriptors.set_len(len + self.inner.num_descriptors);
let len = pending_wait.descriptors.len();
let filled = alsa::snd_pcm_poll_descriptors(*channel,
pending_wait.descriptors.as_mut_ptr()
.offset(len as isize),
self.inner.num_descriptors as libc::c_uint);
debug_assert_eq!(filled, self.inner.num_descriptors as libc::c_int);
pending_wait.descriptors.set_len(len + self.inner.num_descriptors);
},
ScheduledKind::WaitResume => {
// And we add the descriptor corresponding to the resume signal
// to `event_loop.pending_wait.descriptors`.
pending_wait.descriptors.push(libc::pollfd {
fd: self.inner.resume_signal,
events: libc::POLLIN,
revents: 0,
});
}
}
// We also fill `voices`.
pending_wait.voices.push(self.inner.clone());
@ -396,6 +465,12 @@ impl Stream for SamplesStream {
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// If paused then we schedule the task and return `NotReady`
if self.inner.is_paused.load(Ordering::Relaxed) {
self.schedule(ScheduledKind::WaitResume);
return Ok(Async::NotReady);
}
// Determine the number of samples that are available to write.
let available = {
let channel = self.inner.channel.lock().expect("could not lock channel");
@ -412,9 +487,9 @@ impl Stream for SamplesStream {
}
};
// If we don't have one period ready, return `NotReady`.
// If we don't have one period ready, schedule the task and return `NotReady`.
if available < self.inner.period_len {
self.schedule();
self.schedule(ScheduledKind::WaitPCM);
return Ok(Async::NotReady);
}
@ -526,6 +601,9 @@ impl Voice {
num_descriptors as usize
};
// The voice is initialized as paused
let is_paused = Arc::new(AtomicBool::new(true));
let samples_stream_inner = Arc::new(VoiceInner {
event_loop: event_loop.inner.clone(),
channel: Mutex::new(playback_handle),
@ -535,9 +613,13 @@ impl Voice {
buffer_len: buffer_len,
period_len: period_len,
scheduled: Mutex::new(None),
is_paused: is_paused.clone(),
resume_signal: libc::eventfd(0, 0),
});
Ok((Voice, SamplesStream {
Ok((Voice {
inner: samples_stream_inner.clone()
}, SamplesStream {
inner: samples_stream_inner
}))
}
@ -545,13 +627,21 @@ impl Voice {
#[inline]
pub fn play(&mut self) {
// already playing
//unimplemented!()
// If it was paused then we resume and signal
// FIXME: the signal is send even if the event loop wasn't waiting for resume, is that an issue ?
if self.inner.is_paused.swap(false,Ordering::Relaxed) {
unsafe {
let buf = 1u64;
let wret = libc::write(self.inner.resume_signal,
&buf as *const u64 as *const _, 8);
assert!(wret == 8);
}
}
}
#[inline]
pub fn pause(&mut self) {
unimplemented!()
self.inner.is_paused.store(true,Ordering::Relaxed);
}
}