Merge pull request #135 from thiolliere/alsa_play_pause

impl play and pause for ALSA
This commit is contained in:
tomaka 2016-10-02 16:38:25 +02:00 committed by GitHub
commit bb65200745
2 changed files with 139 additions and 41 deletions

View File

@ -7,6 +7,8 @@ use futures::task::Executor;
use futures::task::Run; use futures::task::Run;
use std::sync::Arc; use std::sync::Arc;
use std::thread;
use std::time::Duration;
struct MyExecutor; struct MyExecutor;
@ -57,5 +59,14 @@ fn main() {
Ok(()) Ok(())
})).execute(executor); })).execute(executor);
thread::spawn(move || {
loop {
thread::sleep(Duration::from_millis(500));
voice.pause();
thread::sleep(Duration::from_millis(500));
voice.play();
}
});
event_loop.run(); event_loop.run();
} }

View File

@ -14,6 +14,7 @@ use UnknownTypeBuffer;
use std::{ffi, cmp, iter, mem, ptr}; use std::{ffi, cmp, iter, mem, ptr};
use std::vec::IntoIter as VecIntoIter; use std::vec::IntoIter as VecIntoIter;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use futures::Poll; use futures::Poll;
use futures::task::Task; use futures::task::Task;
@ -283,6 +284,18 @@ impl EventLoop {
let mut i_voice = 0; let mut i_voice = 0;
let mut i_descriptor = 1; let mut i_descriptor = 1;
while i_voice < current_wait.voices.len() { while i_voice < current_wait.voices.len() {
let kind = {
let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap();
match *scheduled {
Some(ref scheduled) => scheduled.kind,
None => panic!("current wait unscheduled task"),
}
};
// Depending on the kind of scheduling the number of descriptors corresponding
// to the voice and the events associated are different
match kind {
ScheduledKind::WaitPCM => {
let mut revent = mem::uninitialized(); let mut revent = mem::uninitialized();
{ {
@ -295,9 +308,7 @@ impl EventLoop {
if (revent as libc::c_short & libc::POLLOUT) != 0 { if (revent as libc::c_short & libc::POLLOUT) != 0 {
let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap().take(); let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap().take();
if let Some(scheduled) = scheduled { scheduled.unwrap().task.unpark();
scheduled.unpark();
}
for _ in 0 .. current_wait.voices[i_voice].num_descriptors { for _ in 0 .. current_wait.voices[i_voice].num_descriptors {
current_wait.descriptors.remove(i_descriptor as usize); current_wait.descriptors.remove(i_descriptor as usize);
@ -308,13 +319,37 @@ impl EventLoop {
i_descriptor += current_wait.voices[i_voice].num_descriptors as isize; i_descriptor += current_wait.voices[i_voice].num_descriptors as isize;
i_voice += 1; i_voice += 1;
} }
},
ScheduledKind::WaitResume => {
if current_wait.descriptors[i_descriptor as usize].revents != 0 {
// Unpark the task
let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap().take();
scheduled.unwrap().task.unpark();
// Emptying the signal.
let mut out = 0u64;
let ret = libc::read(current_wait.descriptors[i_descriptor as usize].fd,
&mut out as *mut u64 as *mut _, 8);
assert_eq!(ret, 8);
// Remove from current waiting poll descriptors
current_wait.descriptors.remove(i_descriptor as usize);
current_wait.voices.remove(i_voice);
} else {
i_descriptor += 1;
i_voice += 1;
}
}
}
} }
} }
} }
} }
} }
pub struct Voice; pub struct Voice {
inner: Arc<VoiceInner>,
}
pub struct Buffer<T> { pub struct Buffer<T> {
inner: Arc<VoiceInner>, inner: Arc<VoiceInner>,
@ -325,6 +360,17 @@ pub struct SamplesStream {
inner: Arc<VoiceInner>, inner: Arc<VoiceInner>,
} }
pub struct Scheduled {
task: Task,
kind: ScheduledKind,
}
#[derive(Clone,Copy)]
pub enum ScheduledKind {
WaitResume,
WaitPCM,
}
struct VoiceInner { struct VoiceInner {
// The event loop used to create the voice. // The event loop used to create the voice.
event_loop: Arc<EventLoopInner>, event_loop: Arc<EventLoopInner>,
@ -349,7 +395,14 @@ struct VoiceInner {
period_len: usize, period_len: usize,
// If `Some`, something previously called `schedule` on the stream. // If `Some`, something previously called `schedule` on the stream.
scheduled: Mutex<Option<Task>>, scheduled: Mutex<Option<Scheduled>>,
// Wherease the sample stream is paused
is_paused: AtomicBool,
// A file descriptor opened with `eventfd`.
// It is used to wait for resume signal.
resume_signal: libc::c_int,
} }
unsafe impl Send for VoiceInner {} unsafe impl Send for VoiceInner {}
@ -357,16 +410,21 @@ unsafe impl Sync for VoiceInner {}
impl SamplesStream { impl SamplesStream {
#[inline] #[inline]
fn schedule(&mut self) { fn schedule(&mut self, kind: ScheduledKind) {
unsafe { unsafe {
let channel = self.inner.channel.lock().unwrap(); let channel = self.inner.channel.lock().unwrap();
// We start by filling `scheduled`. // We start by filling `scheduled`.
*self.inner.scheduled.lock().unwrap() = Some(task::park()); *self.inner.scheduled.lock().unwrap() = Some(Scheduled {
task: task::park(),
kind: kind,
});
let mut pending_wait = self.inner.event_loop.pending_wait.lock().unwrap();
match kind {
ScheduledKind::WaitPCM => {
// In this function we turn the `snd_pcm_t` into a collection of file descriptors. // In this function we turn the `snd_pcm_t` into a collection of file descriptors.
// And we add these descriptors to `event_loop.pending_wait.descriptors`. // And we add these descriptors to `event_loop.pending_wait.descriptors`.
let mut pending_wait = self.inner.event_loop.pending_wait.lock().unwrap();
pending_wait.descriptors.reserve(self.inner.num_descriptors); pending_wait.descriptors.reserve(self.inner.num_descriptors);
let len = pending_wait.descriptors.len(); let len = pending_wait.descriptors.len();
@ -376,6 +434,17 @@ impl SamplesStream {
self.inner.num_descriptors as libc::c_uint); self.inner.num_descriptors as libc::c_uint);
debug_assert_eq!(filled, self.inner.num_descriptors as libc::c_int); debug_assert_eq!(filled, self.inner.num_descriptors as libc::c_int);
pending_wait.descriptors.set_len(len + self.inner.num_descriptors); pending_wait.descriptors.set_len(len + self.inner.num_descriptors);
},
ScheduledKind::WaitResume => {
// And we add the descriptor corresponding to the resume signal
// to `event_loop.pending_wait.descriptors`.
pending_wait.descriptors.push(libc::pollfd {
fd: self.inner.resume_signal,
events: libc::POLLIN,
revents: 0,
});
}
}
// We also fill `voices`. // We also fill `voices`.
pending_wait.voices.push(self.inner.clone()); pending_wait.voices.push(self.inner.clone());
@ -396,6 +465,12 @@ impl Stream for SamplesStream {
type Error = (); type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// If paused then we schedule the task and return `NotReady`
if self.inner.is_paused.load(Ordering::Relaxed) {
self.schedule(ScheduledKind::WaitResume);
return Ok(Async::NotReady);
}
// Determine the number of samples that are available to write. // Determine the number of samples that are available to write.
let available = { let available = {
let channel = self.inner.channel.lock().expect("could not lock channel"); let channel = self.inner.channel.lock().expect("could not lock channel");
@ -412,9 +487,9 @@ impl Stream for SamplesStream {
} }
}; };
// If we don't have one period ready, return `NotReady`. // If we don't have one period ready, schedule the task and return `NotReady`.
if available < self.inner.period_len { if available < self.inner.period_len {
self.schedule(); self.schedule(ScheduledKind::WaitPCM);
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
@ -535,9 +610,13 @@ impl Voice {
buffer_len: buffer_len, buffer_len: buffer_len,
period_len: period_len, period_len: period_len,
scheduled: Mutex::new(None), scheduled: Mutex::new(None),
is_paused: AtomicBool::new(true),
resume_signal: libc::eventfd(0, 0),
}); });
Ok((Voice, SamplesStream { Ok((Voice {
inner: samples_stream_inner.clone()
}, SamplesStream {
inner: samples_stream_inner inner: samples_stream_inner
})) }))
} }
@ -545,13 +624,21 @@ impl Voice {
#[inline] #[inline]
pub fn play(&mut self) { pub fn play(&mut self) {
// already playing // If it was paused then we resume and signal
//unimplemented!() // FIXME: the signal is send even if the event loop wasn't waiting for resume, is that an issue ?
if self.inner.is_paused.swap(false, Ordering::Relaxed) {
unsafe {
let buf = 1u64;
let wret = libc::write(self.inner.resume_signal,
&buf as *const u64 as *const _, 8);
assert!(wret == 8);
}
}
} }
#[inline] #[inline]
pub fn pause(&mut self) { pub fn pause(&mut self) {
unimplemented!() self.inner.is_paused.store(true, Ordering::Relaxed);
} }
} }