From f822631bc4a7962a7d87c265313bad2dac37fd84 Mon Sep 17 00:00:00 2001 From: thiolliere Date: Sun, 2 Oct 2016 13:18:27 +0200 Subject: [PATCH 1/4] impl play and pause for ALSA snd_pcm_pause could have been used but not all hardware implement it, so I propose not to use it. In this implementation: there are two kind of scheduling: wait for resume signal and wait for pcm to be available if the stream is paused then it return notready and wait for resume the event loop is different as it manages descriptors corresponding to voices according to the nature of the scheduling. there is still a FIXME: in voice.play the is signal is send even if the event loop wasn't waiting for resume. It doesn't seem to create any issue. But it happens when you write voice.pause();voice.play(); --- examples/beep.rs | 11 +++ src/alsa/mod.rs | 172 ++++++++++++++++++++++++++++++++++++----------- 2 files changed, 142 insertions(+), 41 deletions(-) diff --git a/examples/beep.rs b/examples/beep.rs index 51db471..68bfb3d 100644 --- a/examples/beep.rs +++ b/examples/beep.rs @@ -7,6 +7,8 @@ use futures::task::Executor; use futures::task::Run; use std::sync::Arc; +use std::thread; +use std::time::Duration; struct MyExecutor; @@ -57,5 +59,14 @@ fn main() { Ok(()) })).execute(executor); + thread::spawn(move || { + loop { + thread::sleep(Duration::from_millis(500)); + voice.pause(); + thread::sleep(Duration::from_millis(500)); + voice.play(); + } + }); + event_loop.run(); } diff --git a/src/alsa/mod.rs b/src/alsa/mod.rs index 82ddd91..f9cf260 100644 --- a/src/alsa/mod.rs +++ b/src/alsa/mod.rs @@ -14,6 +14,7 @@ use UnknownTypeBuffer; use std::{ffi, cmp, iter, mem, ptr}; use std::vec::IntoIter as VecIntoIter; use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicBool, Ordering}; use futures::Poll; use futures::task::Task; @@ -283,30 +284,62 @@ impl EventLoop { let mut i_voice = 0; let mut i_descriptor = 1; while i_voice < current_wait.voices.len() { - let mut revent = mem::uninitialized(); - - { - let channel = *current_wait.voices[i_voice].channel.lock().unwrap(); - let num_descriptors = current_wait.voices[i_voice].num_descriptors as libc::c_uint; - check_errors(alsa::snd_pcm_poll_descriptors_revents(channel, current_wait.descriptors - .as_mut_ptr().offset(i_descriptor), - num_descriptors, &mut revent)).unwrap(); - } - - if (revent as libc::c_short & libc::POLLOUT) != 0 { - let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap().take(); - if let Some(scheduled) = scheduled { - scheduled.unpark(); + let kind = { + let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap(); + match *scheduled { + Some(ref scheduled) => scheduled.kind, + None => panic!("current wait unscheduled task"), } + }; - for _ in 0 .. current_wait.voices[i_voice].num_descriptors { - current_wait.descriptors.remove(i_descriptor as usize); + // Depending on the kind of scheduling the number of descriptors corresponding + // to the voice and the events associated are different + match kind { + ScheduledKind::WaitPCM => { + let mut revent = mem::uninitialized(); + + { + let channel = *current_wait.voices[i_voice].channel.lock().unwrap(); + let num_descriptors = current_wait.voices[i_voice].num_descriptors as libc::c_uint; + check_errors(alsa::snd_pcm_poll_descriptors_revents(channel, current_wait.descriptors + .as_mut_ptr().offset(i_descriptor), + num_descriptors, &mut revent)).unwrap(); + } + + if (revent as libc::c_short & libc::POLLOUT) != 0 { + let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap().take(); + scheduled.unwrap().task.unpark(); + + for _ in 0 .. current_wait.voices[i_voice].num_descriptors { + current_wait.descriptors.remove(i_descriptor as usize); + } + current_wait.voices.remove(i_voice); + + } else { + i_descriptor += current_wait.voices[i_voice].num_descriptors as isize; + i_voice += 1; + } + }, + ScheduledKind::WaitResume => { + if current_wait.descriptors[i_descriptor as usize].revents != 0 { + // Unpark the task + let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap().take(); + scheduled.unwrap().task.unpark(); + + // Emptying the signal. + let mut out = 0u64; + let ret = libc::read(current_wait.descriptors[i_descriptor as usize].fd, + &mut out as *mut u64 as *mut _, 8); + assert_eq!(ret, 8); + + // Remove from current waiting poll descriptors + current_wait.descriptors.remove(i_descriptor as usize); + current_wait.voices.remove(i_voice); + } else { + i_descriptor += 1; + i_voice += 1; + } } - current_wait.voices.remove(i_voice); - - } else { - i_descriptor += current_wait.voices[i_voice].num_descriptors as isize; - i_voice += 1; } } } @@ -314,7 +347,9 @@ impl EventLoop { } } -pub struct Voice; +pub struct Voice { + inner: Arc, +} pub struct Buffer { inner: Arc, @@ -325,6 +360,17 @@ pub struct SamplesStream { inner: Arc, } +pub struct Scheduled { + task: Task, + kind: ScheduledKind, +} + +#[derive(Clone,Copy)] +pub enum ScheduledKind { + WaitResume, + WaitPCM, +} + struct VoiceInner { // The event loop used to create the voice. event_loop: Arc, @@ -349,7 +395,14 @@ struct VoiceInner { period_len: usize, // If `Some`, something previously called `schedule` on the stream. - scheduled: Mutex>, + scheduled: Mutex>, + + // Wherease the sample stream is paused + is_paused: Arc, + + // A file descriptor opened with `eventfd`. + // It is used to wait for resume signal. + resume_signal: libc::c_int, } unsafe impl Send for VoiceInner {} @@ -357,25 +410,41 @@ unsafe impl Sync for VoiceInner {} impl SamplesStream { #[inline] - fn schedule(&mut self) { + fn schedule(&mut self, kind: ScheduledKind) { unsafe { let channel = self.inner.channel.lock().unwrap(); // We start by filling `scheduled`. - *self.inner.scheduled.lock().unwrap() = Some(task::park()); + *self.inner.scheduled.lock().unwrap() = Some(Scheduled { + task: task::park(), + kind: kind, + }); - // In this function we turn the `snd_pcm_t` into a collection of file descriptors. - // And we add these descriptors to `event_loop.pending_wait.descriptors`. let mut pending_wait = self.inner.event_loop.pending_wait.lock().unwrap(); - pending_wait.descriptors.reserve(self.inner.num_descriptors); + match kind { + ScheduledKind::WaitPCM => { + // In this function we turn the `snd_pcm_t` into a collection of file descriptors. + // And we add these descriptors to `event_loop.pending_wait.descriptors`. + pending_wait.descriptors.reserve(self.inner.num_descriptors); - let len = pending_wait.descriptors.len(); - let filled = alsa::snd_pcm_poll_descriptors(*channel, - pending_wait.descriptors.as_mut_ptr() - .offset(len as isize), - self.inner.num_descriptors as libc::c_uint); - debug_assert_eq!(filled, self.inner.num_descriptors as libc::c_int); - pending_wait.descriptors.set_len(len + self.inner.num_descriptors); + let len = pending_wait.descriptors.len(); + let filled = alsa::snd_pcm_poll_descriptors(*channel, + pending_wait.descriptors.as_mut_ptr() + .offset(len as isize), + self.inner.num_descriptors as libc::c_uint); + debug_assert_eq!(filled, self.inner.num_descriptors as libc::c_int); + pending_wait.descriptors.set_len(len + self.inner.num_descriptors); + }, + ScheduledKind::WaitResume => { + // And we add the descriptor corresponding to the resume signal + // to `event_loop.pending_wait.descriptors`. + pending_wait.descriptors.push(libc::pollfd { + fd: self.inner.resume_signal, + events: libc::POLLIN, + revents: 0, + }); + } + } // We also fill `voices`. pending_wait.voices.push(self.inner.clone()); @@ -396,6 +465,12 @@ impl Stream for SamplesStream { type Error = (); fn poll(&mut self) -> Poll, Self::Error> { + // If paused then we schedule the task and return `NotReady` + if self.inner.is_paused.load(Ordering::Relaxed) { + self.schedule(ScheduledKind::WaitResume); + return Ok(Async::NotReady); + } + // Determine the number of samples that are available to write. let available = { let channel = self.inner.channel.lock().expect("could not lock channel"); @@ -412,9 +487,9 @@ impl Stream for SamplesStream { } }; - // If we don't have one period ready, return `NotReady`. + // If we don't have one period ready, schedule the task and return `NotReady`. if available < self.inner.period_len { - self.schedule(); + self.schedule(ScheduledKind::WaitPCM); return Ok(Async::NotReady); } @@ -526,6 +601,9 @@ impl Voice { num_descriptors as usize }; + // The voice is initialized as paused + let is_paused = Arc::new(AtomicBool::new(true)); + let samples_stream_inner = Arc::new(VoiceInner { event_loop: event_loop.inner.clone(), channel: Mutex::new(playback_handle), @@ -535,9 +613,13 @@ impl Voice { buffer_len: buffer_len, period_len: period_len, scheduled: Mutex::new(None), + is_paused: is_paused.clone(), + resume_signal: libc::eventfd(0, 0), }); - Ok((Voice, SamplesStream { + Ok((Voice { + inner: samples_stream_inner.clone() + }, SamplesStream { inner: samples_stream_inner })) } @@ -545,13 +627,21 @@ impl Voice { #[inline] pub fn play(&mut self) { - // already playing - //unimplemented!() + // If it was paused then we resume and signal + // FIXME: the signal is send even if the event loop wasn't waiting for resume, is that an issue ? + if self.inner.is_paused.swap(false,Ordering::Relaxed) { + unsafe { + let buf = 1u64; + let wret = libc::write(self.inner.resume_signal, + &buf as *const u64 as *const _, 8); + assert!(wret == 8); + } + } } #[inline] pub fn pause(&mut self) { - unimplemented!() + self.inner.is_paused.store(true,Ordering::Relaxed); } } From 1b0c9f2c54113f4ba2c5b050117e31093e4212c5 Mon Sep 17 00:00:00 2001 From: thiolliere Date: Sun, 2 Oct 2016 14:19:58 +0200 Subject: [PATCH 2/4] syntax: space after comma --- src/alsa/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/alsa/mod.rs b/src/alsa/mod.rs index f9cf260..bf9bc5a 100644 --- a/src/alsa/mod.rs +++ b/src/alsa/mod.rs @@ -629,7 +629,7 @@ impl Voice { pub fn play(&mut self) { // If it was paused then we resume and signal // FIXME: the signal is send even if the event loop wasn't waiting for resume, is that an issue ? - if self.inner.is_paused.swap(false,Ordering::Relaxed) { + if self.inner.is_paused.swap(false, Ordering::Relaxed) { unsafe { let buf = 1u64; let wret = libc::write(self.inner.resume_signal, @@ -641,7 +641,7 @@ impl Voice { #[inline] pub fn pause(&mut self) { - self.inner.is_paused.store(true,Ordering::Relaxed); + self.inner.is_paused.store(true, Ordering::Relaxed); } } From 5c86eec4f3da2a2025a41e55c988ad923f8177bf Mon Sep 17 00:00:00 2001 From: thiolliere Date: Sun, 2 Oct 2016 14:21:33 +0200 Subject: [PATCH 3/4] line not necessary --- src/alsa/mod.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/alsa/mod.rs b/src/alsa/mod.rs index bf9bc5a..feea078 100644 --- a/src/alsa/mod.rs +++ b/src/alsa/mod.rs @@ -601,9 +601,6 @@ impl Voice { num_descriptors as usize }; - // The voice is initialized as paused - let is_paused = Arc::new(AtomicBool::new(true)); - let samples_stream_inner = Arc::new(VoiceInner { event_loop: event_loop.inner.clone(), channel: Mutex::new(playback_handle), @@ -613,7 +610,7 @@ impl Voice { buffer_len: buffer_len, period_len: period_len, scheduled: Mutex::new(None), - is_paused: is_paused.clone(), + is_paused: Arc::new(AtomicBool::new(true)), resume_signal: libc::eventfd(0, 0), }); From 25bb025a9acde41f52d90d2853e2f85418c205c5 Mon Sep 17 00:00:00 2001 From: thiolliere Date: Sun, 2 Oct 2016 14:24:30 +0200 Subject: [PATCH 4/4] arc not necessary --- src/alsa/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/alsa/mod.rs b/src/alsa/mod.rs index feea078..7cca663 100644 --- a/src/alsa/mod.rs +++ b/src/alsa/mod.rs @@ -398,7 +398,7 @@ struct VoiceInner { scheduled: Mutex>, // Wherease the sample stream is paused - is_paused: Arc, + is_paused: AtomicBool, // A file descriptor opened with `eventfd`. // It is used to wait for resume signal. @@ -610,7 +610,7 @@ impl Voice { buffer_len: buffer_len, period_len: period_len, scheduled: Mutex::new(None), - is_paused: Arc::new(AtomicBool::new(true)), + is_paused: AtomicBool::new(true), resume_signal: libc::eventfd(0, 0), });