diff --git a/examples/beep.rs b/examples/beep.rs index a3b61be..51db471 100644 --- a/examples/beep.rs +++ b/examples/beep.rs @@ -1,14 +1,27 @@ extern crate cpal; extern crate futures; -use futures::Future; use futures::stream::Stream; +use futures::task; +use futures::task::Executor; +use futures::task::Run; + +use std::sync::Arc; + +struct MyExecutor; + +impl Executor for MyExecutor { + fn execute(&self, r: Run) { + r.run(); + } +} fn main() { let endpoint = cpal::get_default_endpoint().expect("Failed to get default endpoint"); let format = endpoint.get_supported_formats_list().unwrap().next().expect("Failed to get endpoint format"); let event_loop = cpal::EventLoop::new(); + let executor = Arc::new(MyExecutor); let (mut voice, stream) = cpal::Voice::new(&endpoint, &format, &event_loop).expect("Failed to create a voice"); @@ -18,7 +31,7 @@ fn main() { .map(move |t| t.sin()); voice.play(); - stream.for_each(move |buffer| -> Result<_, ()> { + task::spawn(stream.for_each(move |buffer| -> Result<_, ()> { match buffer { cpal::UnknownTypeBuffer::U16(mut buffer) => { for (sample, value) in buffer.chunks_mut(format.channels.len()).zip(&mut data_source) { @@ -42,7 +55,7 @@ fn main() { }; Ok(()) - }).forget(); + })).execute(executor); event_loop.run(); } diff --git a/src/alsa/mod.rs b/src/alsa/mod.rs index 350e64e..82ddd91 100644 --- a/src/alsa/mod.rs +++ b/src/alsa/mod.rs @@ -16,9 +16,10 @@ use std::vec::IntoIter as VecIntoIter; use std::sync::{Arc, Mutex}; use futures::Poll; -use futures::Task; -use futures::TaskHandle; +use futures::task::Task; +use futures::task; use futures::stream::Stream; +use futures::Async; pub type SupportedFormatsIterator = VecIntoIter; @@ -295,7 +296,7 @@ impl EventLoop { if (revent as libc::c_short & libc::POLLOUT) != 0 { let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap().take(); if let Some(scheduled) = scheduled { - scheduled.notify(); + scheduled.unpark(); } for _ in 0 .. current_wait.voices[i_voice].num_descriptors { @@ -342,80 +343,26 @@ struct VoiceInner { num_channels: u16, // Number of samples that can fit in the buffer. - buffer_len: usize, + buffer_len: usize, // Minimum number of samples to put in the buffer. period_len: usize, // If `Some`, something previously called `schedule` on the stream. - scheduled: Mutex>, + scheduled: Mutex>, } unsafe impl Send for VoiceInner {} unsafe impl Sync for VoiceInner {} -impl Stream for SamplesStream { - type Item = UnknownTypeBuffer; - type Error = (); - - fn poll(&mut self, _: &mut Task) -> Poll, Self::Error> { - // Determine the number of samples that are available to write. - let available = { - let channel = self.inner.channel.lock().expect("could not lock channel"); - let available = unsafe { alsa::snd_pcm_avail(*channel) }; // TODO: what about snd_pcm_avail_update? - - if available == -32 { - // buffer underrun - self.inner.buffer_len - } else if available < 0 { - check_errors(available as libc::c_int).expect("buffer is not available"); - unreachable!() - } else { - (available * self.inner.num_channels as alsa::snd_pcm_sframes_t) as usize - } - }; - - // If we don't have one period ready, return `NotReady`. - if available < self.inner.period_len { - return Poll::NotReady; - } - - // We now sure that we're ready to write data. - match self.inner.sample_format { - SampleFormat::I16 => { - let buffer = Buffer { - buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(), - inner: self.inner.clone(), - }; - - Poll::Ok(Some(UnknownTypeBuffer::I16(::Buffer { target: Some(buffer) }))) - }, - SampleFormat::U16 => { - let buffer = Buffer { - buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(), - inner: self.inner.clone(), - }; - - Poll::Ok(Some(UnknownTypeBuffer::U16(::Buffer { target: Some(buffer) }))) - }, - SampleFormat::F32 => { - let buffer = Buffer { - buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(), - inner: self.inner.clone(), - }; - - Poll::Ok(Some(UnknownTypeBuffer::F32(::Buffer { target: Some(buffer) }))) - }, - } - } - +impl SamplesStream { #[inline] - fn schedule(&mut self, task: &mut Task) { + fn schedule(&mut self) { unsafe { let channel = self.inner.channel.lock().unwrap(); // We start by filling `scheduled`. - *self.inner.scheduled.lock().unwrap() = Some(task.handle().clone()); + *self.inner.scheduled.lock().unwrap() = Some(task::park()); // In this function we turn the `snd_pcm_t` into a collection of file descriptors. // And we add these descriptors to `event_loop.pending_wait.descriptors`. @@ -444,6 +391,63 @@ impl Stream for SamplesStream { } } +impl Stream for SamplesStream { + type Item = UnknownTypeBuffer; + type Error = (); + + fn poll(&mut self) -> Poll, Self::Error> { + // Determine the number of samples that are available to write. + let available = { + let channel = self.inner.channel.lock().expect("could not lock channel"); + let available = unsafe { alsa::snd_pcm_avail(*channel) }; // TODO: what about snd_pcm_avail_update? + + if available == -32 { + // buffer underrun + self.inner.buffer_len + } else if available < 0 { + check_errors(available as libc::c_int).expect("buffer is not available"); + unreachable!() + } else { + (available * self.inner.num_channels as alsa::snd_pcm_sframes_t) as usize + } + }; + + // If we don't have one period ready, return `NotReady`. + if available < self.inner.period_len { + self.schedule(); + return Ok(Async::NotReady); + } + + // We now sure that we're ready to write data. + match self.inner.sample_format { + SampleFormat::I16 => { + let buffer = Buffer { + buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(), + inner: self.inner.clone(), + }; + + Ok(Async::Ready((Some(UnknownTypeBuffer::I16(::Buffer { target: Some(buffer) }))))) + }, + SampleFormat::U16 => { + let buffer = Buffer { + buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(), + inner: self.inner.clone(), + }; + + Ok(Async::Ready((Some(UnknownTypeBuffer::U16(::Buffer { target: Some(buffer) }))))) + }, + SampleFormat::F32 => { + let buffer = Buffer { + buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(), + inner: self.inner.clone(), + }; + + Ok(Async::Ready((Some(UnknownTypeBuffer::F32(::Buffer { target: Some(buffer) }))))) + }, + } + } +} + /// Wrapper around `hw_params`. struct HwParams(*mut alsa::snd_pcm_hw_params_t); diff --git a/src/lib.rs b/src/lib.rs index dca352e..22bb1f2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,15 +22,28 @@ The `voice` can be used to control the play/pause of the output, while the `samp be used to register a callback that will be called whenever the backend is ready to get data. See the documentation of `futures-rs` for more info about how to use streams. -```ignore // TODO: unignore -# let mut samples_stream: cpal::SamplesStream = unsafe { std::mem::uninitialized() }; +```no_run +# extern crate futures; +# extern crate cpal; +# use std::sync::Arc; use futures::stream::Stream; +use futures::task; +# struct MyExecutor; +# impl task::Executor for MyExecutor { +# fn execute(&self, r: task::Run) { +# r.run(); +# } +# } +# fn main() { +# let mut samples_stream: cpal::SamplesStream = unsafe { std::mem::uninitialized() }; +# let my_executor = Arc::new(MyExecutor); -samples_stream.for_each(move |buffer| -> Result<_, ()> { +task::spawn(samples_stream.for_each(move |buffer| -> Result<_, ()> { // write data to `buffer` here Ok(()) -}).forget(); +})).execute(my_executor); +# } ``` TODO: add example @@ -72,7 +85,6 @@ use std::ops::{Deref, DerefMut}; use futures::stream::Stream; use futures::Poll; -use futures::Task; mod null; mod samples_formats; @@ -397,13 +409,8 @@ impl Stream for SamplesStream { type Error = (); #[inline] - fn poll(&mut self, task: &mut Task) -> Poll, Self::Error> { - self.0.poll(task) - } - - #[inline] - fn schedule(&mut self, task: &mut Task) { - self.0.schedule(task) + fn poll(&mut self) -> Poll, Self::Error> { + self.0.poll() } } diff --git a/src/null/mod.rs b/src/null/mod.rs index 5e24ea8..5eaca60 100644 --- a/src/null/mod.rs +++ b/src/null/mod.rs @@ -3,8 +3,8 @@ use std::marker::PhantomData; use futures::Poll; -use futures::Task; use futures::stream::Stream; +use futures::Async; use CreationError; use Format; @@ -89,12 +89,8 @@ impl Stream for SamplesStream { type Error = (); #[inline] - fn poll(&mut self, _: &mut Task) -> Poll, Self::Error> { - Poll::NotReady - } - - #[inline] - fn schedule(&mut self, _: &mut Task) { + fn poll(&mut self) -> Poll, Self::Error> { + Ok(Async::NotReady) } } diff --git a/src/wasapi/voice.rs b/src/wasapi/voice.rs index eb3a9ea..f29e3e1 100644 --- a/src/wasapi/voice.rs +++ b/src/wasapi/voice.rs @@ -14,9 +14,10 @@ use std::sync::Arc; use std::sync::Mutex; use futures::Poll; -use futures::Task; -use futures::TaskHandle; +use futures::task::Task; +use futures::task; use futures::stream::Stream; +use futures::Async; use CreationError; use ChannelPosition; @@ -61,7 +62,7 @@ struct EventLoopScheduled { // List of task handles corresponding to `handles`. The second element is used to signal // the voice that it has been signaled. - task_handles: Vec<(TaskHandle, Arc)>, + task_handles: Vec<(Task, Arc)>, } impl EventLoop { @@ -118,7 +119,7 @@ impl EventLoop { scheduled.handles.remove(handle_id); let (task_handle, ready) = scheduled.task_handles.remove(handle_id - 1); ready.store(true, Ordering::Relaxed); - task_handle.notify(); + task_handle.unpark(); } } } @@ -348,70 +349,87 @@ impl Voice { } } +impl SamplesStream { + #[inline] + fn schedule(&mut self) { + let mut pending = self.event_loop.pending_scheduled.lock().unwrap(); + pending.handles.push(self.event); + pending.task_handles.push((task::park(), self.ready.clone())); + drop(pending); + + let result = unsafe { kernel32::SetEvent(self.event_loop.pending_scheduled_event) }; + assert!(result != 0); + } +} + impl Stream for SamplesStream { type Item = UnknownTypeBuffer; type Error = (); - fn poll(&mut self, _: &mut Task) -> Poll, Self::Error> { + fn poll(&mut self) -> Poll, Self::Error> { unsafe { if self.ready.swap(false, Ordering::Relaxed) == false { // Despite its name this function does not block, because we pass `0`. let result = kernel32::WaitForSingleObject(self.event, 0); - // Returning if the event is not ready. + // Park the task and returning if the event is not ready. match result { winapi::WAIT_OBJECT_0 => (), - winapi::WAIT_TIMEOUT => return Poll::NotReady, + winapi::WAIT_TIMEOUT => { + self.schedule(); + return Ok(Async::NotReady); + }, _ => unreachable!() }; } // If we reach here, that means we're ready to accept new samples. - let mut inner = self.inner.lock().unwrap(); + let poll = { + let mut inner = self.inner.lock().unwrap(); - // Obtaining the number of frames that are available to be written. - let frames_available = { - let mut padding = mem::uninitialized(); - let hresult = (*inner.audio_client).GetCurrentPadding(&mut padding); - check_result(hresult).unwrap(); - self.max_frames_in_buffer - padding + // Obtaining the number of frames that are available to be written. + let frames_available = { + let mut padding = mem::uninitialized(); + let hresult = (*inner.audio_client).GetCurrentPadding(&mut padding); + check_result(hresult).unwrap(); + self.max_frames_in_buffer - padding + }; + + if frames_available == 0 { + Ok(Async::NotReady) + } else { + + // Obtaining a pointer to the buffer. + let (buffer_data, buffer_len) = { + let mut buffer: *mut winapi::BYTE = mem::uninitialized(); + let hresult = (*inner.render_client).GetBuffer(frames_available, + &mut buffer as *mut *mut _); + check_result(hresult).unwrap(); // FIXME: can return `AUDCLNT_E_DEVICE_INVALIDATED` + debug_assert!(!buffer.is_null()); + + (buffer as *mut _, + frames_available as usize * self.bytes_per_frame as usize / mem::size_of::()) // FIXME: correct size + }; + + let buffer = Buffer { + voice: self.inner.clone(), + buffer_data: buffer_data, + buffer_len: buffer_len, + frames: frames_available, + }; + + Ok(Async::Ready(Some(UnknownTypeBuffer::F32(::Buffer { target: Some(buffer) })))) // FIXME: not necessarily F32 + } }; - if frames_available == 0 { return Poll::NotReady; } + if let Ok(Async::NotReady) = poll { + self.schedule(); + } - // Obtaining a pointer to the buffer. - let (buffer_data, buffer_len) = { - let mut buffer: *mut winapi::BYTE = mem::uninitialized(); - let hresult = (*inner.render_client).GetBuffer(frames_available, - &mut buffer as *mut *mut _); - check_result(hresult).unwrap(); // FIXME: can return `AUDCLNT_E_DEVICE_INVALIDATED` - debug_assert!(!buffer.is_null()); - - (buffer as *mut _, - frames_available as usize * self.bytes_per_frame as usize / mem::size_of::()) // FIXME: correct size - }; - - let buffer = Buffer { - voice: self.inner.clone(), - buffer_data: buffer_data, - buffer_len: buffer_len, - frames: frames_available, - }; - - Poll::Ok(Some(UnknownTypeBuffer::F32(::Buffer { target: Some(buffer) }))) // FIXME: not necessarily F32 + poll } } - - fn schedule(&mut self, task: &mut Task) { - let mut pending = self.event_loop.pending_scheduled.lock().unwrap(); - pending.handles.push(self.event); - pending.task_handles.push((task.handle().clone(), self.ready.clone())); - drop(pending); - - let result = unsafe { kernel32::SetEvent(self.event_loop.pending_scheduled_event) }; - assert!(result != 0); - } } impl Drop for VoiceInner {