diff --git a/examples/beep.rs b/examples/beep.rs index 51db471..68bfb3d 100644 --- a/examples/beep.rs +++ b/examples/beep.rs @@ -7,6 +7,8 @@ use futures::task::Executor; use futures::task::Run; use std::sync::Arc; +use std::thread; +use std::time::Duration; struct MyExecutor; @@ -57,5 +59,14 @@ fn main() { Ok(()) })).execute(executor); + thread::spawn(move || { + loop { + thread::sleep(Duration::from_millis(500)); + voice.pause(); + thread::sleep(Duration::from_millis(500)); + voice.play(); + } + }); + event_loop.run(); } diff --git a/src/alsa/mod.rs b/src/alsa/mod.rs index 82ddd91..7cca663 100644 --- a/src/alsa/mod.rs +++ b/src/alsa/mod.rs @@ -14,6 +14,7 @@ use UnknownTypeBuffer; use std::{ffi, cmp, iter, mem, ptr}; use std::vec::IntoIter as VecIntoIter; use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicBool, Ordering}; use futures::Poll; use futures::task::Task; @@ -283,30 +284,62 @@ impl EventLoop { let mut i_voice = 0; let mut i_descriptor = 1; while i_voice < current_wait.voices.len() { - let mut revent = mem::uninitialized(); - - { - let channel = *current_wait.voices[i_voice].channel.lock().unwrap(); - let num_descriptors = current_wait.voices[i_voice].num_descriptors as libc::c_uint; - check_errors(alsa::snd_pcm_poll_descriptors_revents(channel, current_wait.descriptors - .as_mut_ptr().offset(i_descriptor), - num_descriptors, &mut revent)).unwrap(); - } - - if (revent as libc::c_short & libc::POLLOUT) != 0 { - let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap().take(); - if let Some(scheduled) = scheduled { - scheduled.unpark(); + let kind = { + let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap(); + match *scheduled { + Some(ref scheduled) => scheduled.kind, + None => panic!("current wait unscheduled task"), } + }; - for _ in 0 .. current_wait.voices[i_voice].num_descriptors { - current_wait.descriptors.remove(i_descriptor as usize); + // Depending on the kind of scheduling the number of descriptors corresponding + // to the voice and the events associated are different + match kind { + ScheduledKind::WaitPCM => { + let mut revent = mem::uninitialized(); + + { + let channel = *current_wait.voices[i_voice].channel.lock().unwrap(); + let num_descriptors = current_wait.voices[i_voice].num_descriptors as libc::c_uint; + check_errors(alsa::snd_pcm_poll_descriptors_revents(channel, current_wait.descriptors + .as_mut_ptr().offset(i_descriptor), + num_descriptors, &mut revent)).unwrap(); + } + + if (revent as libc::c_short & libc::POLLOUT) != 0 { + let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap().take(); + scheduled.unwrap().task.unpark(); + + for _ in 0 .. current_wait.voices[i_voice].num_descriptors { + current_wait.descriptors.remove(i_descriptor as usize); + } + current_wait.voices.remove(i_voice); + + } else { + i_descriptor += current_wait.voices[i_voice].num_descriptors as isize; + i_voice += 1; + } + }, + ScheduledKind::WaitResume => { + if current_wait.descriptors[i_descriptor as usize].revents != 0 { + // Unpark the task + let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap().take(); + scheduled.unwrap().task.unpark(); + + // Emptying the signal. + let mut out = 0u64; + let ret = libc::read(current_wait.descriptors[i_descriptor as usize].fd, + &mut out as *mut u64 as *mut _, 8); + assert_eq!(ret, 8); + + // Remove from current waiting poll descriptors + current_wait.descriptors.remove(i_descriptor as usize); + current_wait.voices.remove(i_voice); + } else { + i_descriptor += 1; + i_voice += 1; + } } - current_wait.voices.remove(i_voice); - - } else { - i_descriptor += current_wait.voices[i_voice].num_descriptors as isize; - i_voice += 1; } } } @@ -314,7 +347,9 @@ impl EventLoop { } } -pub struct Voice; +pub struct Voice { + inner: Arc, +} pub struct Buffer { inner: Arc, @@ -325,6 +360,17 @@ pub struct SamplesStream { inner: Arc, } +pub struct Scheduled { + task: Task, + kind: ScheduledKind, +} + +#[derive(Clone,Copy)] +pub enum ScheduledKind { + WaitResume, + WaitPCM, +} + struct VoiceInner { // The event loop used to create the voice. event_loop: Arc, @@ -349,7 +395,14 @@ struct VoiceInner { period_len: usize, // If `Some`, something previously called `schedule` on the stream. - scheduled: Mutex>, + scheduled: Mutex>, + + // Wherease the sample stream is paused + is_paused: AtomicBool, + + // A file descriptor opened with `eventfd`. + // It is used to wait for resume signal. + resume_signal: libc::c_int, } unsafe impl Send for VoiceInner {} @@ -357,25 +410,41 @@ unsafe impl Sync for VoiceInner {} impl SamplesStream { #[inline] - fn schedule(&mut self) { + fn schedule(&mut self, kind: ScheduledKind) { unsafe { let channel = self.inner.channel.lock().unwrap(); // We start by filling `scheduled`. - *self.inner.scheduled.lock().unwrap() = Some(task::park()); + *self.inner.scheduled.lock().unwrap() = Some(Scheduled { + task: task::park(), + kind: kind, + }); - // In this function we turn the `snd_pcm_t` into a collection of file descriptors. - // And we add these descriptors to `event_loop.pending_wait.descriptors`. let mut pending_wait = self.inner.event_loop.pending_wait.lock().unwrap(); - pending_wait.descriptors.reserve(self.inner.num_descriptors); + match kind { + ScheduledKind::WaitPCM => { + // In this function we turn the `snd_pcm_t` into a collection of file descriptors. + // And we add these descriptors to `event_loop.pending_wait.descriptors`. + pending_wait.descriptors.reserve(self.inner.num_descriptors); - let len = pending_wait.descriptors.len(); - let filled = alsa::snd_pcm_poll_descriptors(*channel, - pending_wait.descriptors.as_mut_ptr() - .offset(len as isize), - self.inner.num_descriptors as libc::c_uint); - debug_assert_eq!(filled, self.inner.num_descriptors as libc::c_int); - pending_wait.descriptors.set_len(len + self.inner.num_descriptors); + let len = pending_wait.descriptors.len(); + let filled = alsa::snd_pcm_poll_descriptors(*channel, + pending_wait.descriptors.as_mut_ptr() + .offset(len as isize), + self.inner.num_descriptors as libc::c_uint); + debug_assert_eq!(filled, self.inner.num_descriptors as libc::c_int); + pending_wait.descriptors.set_len(len + self.inner.num_descriptors); + }, + ScheduledKind::WaitResume => { + // And we add the descriptor corresponding to the resume signal + // to `event_loop.pending_wait.descriptors`. + pending_wait.descriptors.push(libc::pollfd { + fd: self.inner.resume_signal, + events: libc::POLLIN, + revents: 0, + }); + } + } // We also fill `voices`. pending_wait.voices.push(self.inner.clone()); @@ -396,6 +465,12 @@ impl Stream for SamplesStream { type Error = (); fn poll(&mut self) -> Poll, Self::Error> { + // If paused then we schedule the task and return `NotReady` + if self.inner.is_paused.load(Ordering::Relaxed) { + self.schedule(ScheduledKind::WaitResume); + return Ok(Async::NotReady); + } + // Determine the number of samples that are available to write. let available = { let channel = self.inner.channel.lock().expect("could not lock channel"); @@ -412,9 +487,9 @@ impl Stream for SamplesStream { } }; - // If we don't have one period ready, return `NotReady`. + // If we don't have one period ready, schedule the task and return `NotReady`. if available < self.inner.period_len { - self.schedule(); + self.schedule(ScheduledKind::WaitPCM); return Ok(Async::NotReady); } @@ -535,9 +610,13 @@ impl Voice { buffer_len: buffer_len, period_len: period_len, scheduled: Mutex::new(None), + is_paused: AtomicBool::new(true), + resume_signal: libc::eventfd(0, 0), }); - Ok((Voice, SamplesStream { + Ok((Voice { + inner: samples_stream_inner.clone() + }, SamplesStream { inner: samples_stream_inner })) } @@ -545,13 +624,21 @@ impl Voice { #[inline] pub fn play(&mut self) { - // already playing - //unimplemented!() + // If it was paused then we resume and signal + // FIXME: the signal is send even if the event loop wasn't waiting for resume, is that an issue ? + if self.inner.is_paused.swap(false, Ordering::Relaxed) { + unsafe { + let buf = 1u64; + let wret = libc::write(self.inner.resume_signal, + &buf as *const u64 as *const _, 8); + assert!(wret == 8); + } + } } #[inline] pub fn pause(&mut self) { - unimplemented!() + self.inner.is_paused.store(true, Ordering::Relaxed); } }