From be8310da5122bae7bb969c2e1e6b7e046fcfdf8b Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 2 Aug 2016 16:13:59 +0200 Subject: [PATCH] Draft for switching to futures --- Cargo.toml | 2 + examples/beep.rs | 27 +++- src/lib.rs | 122 +++++++-------- src/wasapi/mod.rs | 3 +- src/wasapi/voice.rs | 352 +++++++++++++++++++++++++++++++------------- 5 files changed, 322 insertions(+), 184 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8f08cb9..027e03b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,10 +10,12 @@ license = "Apache-2.0" keywords = ["audio", "sound"] [dependencies] +futures = { git = "https://github.com/alexcrichton/futures-rs" } libc = "0.2" lazy_static = "0.2" winapi = "0.2.8" ole32-sys = "0.2" +kernel32-sys = "0.2" [target.arm-unknown-linux-gnueabihf.dependencies.alsa-sys] version = "0" diff --git a/examples/beep.rs b/examples/beep.rs index 3c9272c..a3b61be 100644 --- a/examples/beep.rs +++ b/examples/beep.rs @@ -1,16 +1,25 @@ extern crate cpal; +extern crate futures; + +use futures::Future; +use futures::stream::Stream; fn main() { let endpoint = cpal::get_default_endpoint().expect("Failed to get default endpoint"); let format = endpoint.get_supported_formats_list().unwrap().next().expect("Failed to get endpoint format"); - let mut channel = cpal::Voice::new(&endpoint, &format).expect("Failed to create a channel"); + + let event_loop = cpal::EventLoop::new(); + + let (mut voice, stream) = cpal::Voice::new(&endpoint, &format, &event_loop).expect("Failed to create a voice"); // Produce a sinusoid of maximum amplitude. - let mut data_source = (0u64..).map(|t| t as f32 * 440.0 * 2.0 * 3.141592 / format.samples_rate.0 as f32) // 440 Hz - .map(|t| t.sin()); + let samples_rate = format.samples_rate.0 as f32; + let mut data_source = (0u64..).map(move |t| t as f32 * 440.0 * 2.0 * 3.141592 / samples_rate) // 440 Hz + .map(move |t| t.sin()); - loop { - match channel.append_data(32768) { + voice.play(); + stream.for_each(move |buffer| -> Result<_, ()> { + match buffer { cpal::UnknownTypeBuffer::U16(mut buffer) => { for (sample, value) in buffer.chunks_mut(format.channels.len()).zip(&mut data_source) { let value = ((value * 0.5 + 0.5) * std::u16::MAX as f32) as u16; @@ -30,8 +39,10 @@ fn main() { for out in sample.iter_mut() { *out = value; } } }, - } + }; - channel.play(); - } + Ok(()) + }).forget(); + + event_loop.run(); } diff --git a/src/lib.rs b/src/lib.rs index 6909038..f92e4e9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,6 +37,8 @@ reaches the end of the data, it will stop playing. You must continuously fill th calling `append_data` repeatedly if you don't want the audio to stop playing. */ + +extern crate futures; #[macro_use] extern crate lazy_static; extern crate libc; @@ -50,6 +52,10 @@ use std::fmt; use std::error::Error; use std::ops::{Deref, DerefMut}; +use futures::stream::Stream; +use futures::Poll; +use futures::Task; + mod null; mod samples_formats; @@ -169,29 +175,43 @@ impl Iterator for SupportedFormatsIterator { } } +pub struct EventLoop(cpal_impl::EventLoop); + +impl EventLoop { + #[inline] + pub fn new() -> EventLoop { + EventLoop(cpal_impl::EventLoop::new()) + } + + #[inline] + pub fn run(&self) { + self.0.run() + } +} + /// Represents a buffer that must be filled with audio data. /// /// You should destroy this object as soon as possible. Data is only committed when it /// is destroyed. #[must_use] -pub struct Buffer<'a, T: 'a> where T: Sample { +pub struct Buffer where T: Sample { // also contains something, taken by `Drop` - target: Option>, + target: Option>, } /// This is the struct that is provided to you by cpal when you want to write samples to a buffer. /// /// Since the type of data is only known at runtime, you have to fill the right buffer. -pub enum UnknownTypeBuffer<'a> { +pub enum UnknownTypeBuffer { /// Samples whose format is `u16`. - U16(Buffer<'a, u16>), + U16(Buffer), /// Samples whose format is `i16`. - I16(Buffer<'a, i16>), + I16(Buffer), /// Samples whose format is `f32`. - F32(Buffer<'a, f32>), + F32(Buffer), } -impl<'a> UnknownTypeBuffer<'a> { +impl UnknownTypeBuffer { /// Returns the length of the buffer in number of samples. #[inline] pub fn len(&self) -> usize { @@ -282,13 +302,19 @@ pub struct Voice { impl Voice { /// Builds a new channel. #[inline] - pub fn new(endpoint: &Endpoint, format: &Format) -> Result { - let channel = try!(cpal_impl::Voice::new(&endpoint.0, format)); + pub fn new(endpoint: &Endpoint, format: &Format, event_loop: &EventLoop) + -> Result<(Voice, SamplesStream), CreationError> + { + let (voice, stream) = try!(cpal_impl::Voice::new(&endpoint.0, format, &event_loop.0)); - Ok(Voice { - voice: channel, + let voice = Voice { + voice: voice, format: format.clone(), - }) + }; + + let stream = SamplesStream(stream); + + Ok((voice, stream)) } /// Returns the format used by the voice. @@ -324,51 +350,6 @@ impl Voice { self.format().data_type } - /// Returns the minimum number of samples that should be put in a buffer before it is - /// processable by the audio output. - /// - /// If you put less than this value in the buffer, the buffer will not be processed and you - /// risk an underrun. - #[inline] - pub fn get_period(&self) -> usize { - self.voice.get_period() - } - - /// Adds some PCM data to the voice's buffer. - /// - /// This function indirectly returns a `Buffer` object that must be filled with the audio data. - /// The size of the buffer being returned depends on the current state of the backend - /// and can't be known in advance. However it is never greater than `max_samples`. - /// - /// You must fill the buffer *entirely*, so do not set `max_samples` to a value greater - /// than the amount of data available to you. - /// - /// Channels are interleaved. For example if you have two channels, you must write - /// the first sample of the first channel, then the first sample of the second channel, - /// then the second sample of the first channel, then the second sample of the second - /// channel, etc. - /// - /// ## Panic - /// - /// Panics if `max_samples` is 0. - /// - #[inline] - pub fn append_data(&mut self, max_samples: usize) -> UnknownTypeBuffer { - assert!(max_samples != 0); - - match self.get_samples_format() { - SampleFormat::U16 => UnknownTypeBuffer::U16(Buffer { - target: Some(self.voice.append_data(max_samples)) - }), - SampleFormat::I16 => UnknownTypeBuffer::I16(Buffer { - target: Some(self.voice.append_data(max_samples)) - }), - SampleFormat::F32 => UnknownTypeBuffer::F32(Buffer { - target: Some(self.voice.append_data(max_samples)) - }), - } - } - /// Sends a command to the audio device that it should start playing. /// /// Has no effect is the voice was already playing. @@ -389,25 +370,26 @@ impl Voice { pub fn pause(&mut self) { self.voice.pause() } +} + +pub struct SamplesStream(cpal_impl::SamplesStream); + +impl Stream for SamplesStream { + type Item = UnknownTypeBuffer; + type Error = (); - /// Returns the number of samples in the buffer that are currently being processed by the - /// audio playback backend. - /// - /// This function is useful to determine how much time it will take to finish playing the - /// current sound. #[inline] - pub fn get_pending_samples(&self) -> usize { - self.voice.get_pending_samples() + fn poll(&mut self, task: &mut Task) -> Poll, Self::Error> { + self.0.poll(task) } - /// Returns true if the voice has finished reading all the data you sent to it. #[inline] - pub fn underflowed(&self) -> bool { - self.voice.underflowed() + fn schedule(&mut self, task: &mut Task) { + self.0.schedule(task) } } -impl<'a, T> Deref for Buffer<'a, T> where T: Sample { +impl Deref for Buffer where T: Sample { type Target = [T]; #[inline] @@ -416,14 +398,14 @@ impl<'a, T> Deref for Buffer<'a, T> where T: Sample { } } -impl<'a, T> DerefMut for Buffer<'a, T> where T: Sample { +impl DerefMut for Buffer where T: Sample { #[inline] fn deref_mut(&mut self) -> &mut [T] { self.target.as_mut().unwrap().get_buffer() } } -impl<'a, T> Drop for Buffer<'a, T> where T: Sample { +impl Drop for Buffer where T: Sample { #[inline] fn drop(&mut self) { self.target.take().unwrap().finish(); diff --git a/src/wasapi/mod.rs b/src/wasapi/mod.rs index 5d6267c..a951b5c 100644 --- a/src/wasapi/mod.rs +++ b/src/wasapi/mod.rs @@ -1,5 +1,6 @@ extern crate winapi; extern crate ole32; +extern crate kernel32; use std::io::Error as IoError; use std::os::windows::ffi::OsStringExt; @@ -17,7 +18,7 @@ use SampleFormat; pub use std::option::IntoIter as OptionIntoIter; pub use self::enumerate::{EndpointsIterator, get_default_endpoint}; -pub use self::voice::{Voice, Buffer}; +pub use self::voice::{Voice, Buffer, EventLoop, SamplesStream}; pub type SupportedFormatsIterator = OptionIntoIter; diff --git a/src/wasapi/voice.rs b/src/wasapi/voice.rs index 1cf0fdf..3d49469 100644 --- a/src/wasapi/voice.rs +++ b/src/wasapi/voice.rs @@ -1,4 +1,5 @@ use super::com; +use super::kernel32; use super::ole32; use super::winapi; use super::Endpoint; @@ -9,25 +10,141 @@ use std::slice; use std::mem; use std::ptr; use std::marker::PhantomData; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::sync::Mutex; + +use futures::Poll; +use futures::Task; +use futures::TaskHandle; +use futures::stream::Stream; use CreationError; use ChannelPosition; use Format; use SampleFormat; +use UnknownTypeBuffer; + +pub struct EventLoop { + inner: Arc, +} + +unsafe impl Send for EventLoop {} +unsafe impl Sync for EventLoop {} + +struct EventLoopInner { + // This event is signalled after elements have been added to `pending_scheduled`. + pending_scheduled_event: winapi::HANDLE, + scheduled: Mutex, + pending_scheduled: Mutex, +} + +struct EventLoopScheduled { + // List of handles that correspond to voices. + // They are linked to `task_handles`, but we store them separately in order to easily call + // `WaitForMultipleObjectsEx` on the array without having to perform any conversion. + handles: Vec, + + // List of task handles corresponding to `handles`. + task_handles: Vec<(TaskHandle, Arc)>, +} + +impl EventLoop { + pub fn new() -> EventLoop { + let pending_scheduled_event = unsafe { + kernel32::CreateEventA(ptr::null_mut(), 0, 0, ptr::null()) + }; + + EventLoop { + inner: Arc::new(EventLoopInner { + pending_scheduled_event: pending_scheduled_event, + scheduled: Mutex::new(EventLoopScheduled { + handles: vec![pending_scheduled_event], + task_handles: vec![], + }), + pending_scheduled: Mutex::new(EventLoopScheduled { + handles: vec![], + task_handles: vec![], + }) + }) + } + } + + pub fn run(&self) { + unsafe { + let mut scheduled = self.inner.scheduled.lock().unwrap(); + + loop { + // Creating a voice checks for the MAXIMUM_WAIT_OBJECTS limit. + debug_assert!(scheduled.handles.len() <= winapi::MAXIMUM_WAIT_OBJECTS as usize); + + // Wait for any of the handles to be signalled, which means that the corresponding + // sound needs a buffer. + let result = kernel32::WaitForMultipleObjectsEx(scheduled.handles.len() as u32, + scheduled.handles.as_ptr(), + winapi::FALSE, winapi::INFINITE, /* TODO: allow setting a timeout */ + winapi::FALSE /* irrelevant parameter here */); + + // Notifying the corresponding task handler. + assert!(result >= winapi::WAIT_OBJECT_0); + let handle_id = (result - winapi::WAIT_OBJECT_0) as usize; + + if handle_id == 0 { + let mut pending = self.inner.pending_scheduled.lock().unwrap(); + scheduled.handles.append(&mut pending.handles); + scheduled.task_handles.append(&mut pending.task_handles); + + } else { + scheduled.handles.remove(handle_id); + let (task_handle, ready) = scheduled.task_handles.remove(handle_id - 1); + ready.store(true, Ordering::Relaxed); + task_handle.notify(); + } + } + } + } +} + +impl Drop for EventLoop { + #[inline] + fn drop(&mut self) { + unsafe { + kernel32::CloseHandle(self.inner.pending_scheduled_event); + } + } +} pub struct Voice { - audio_client: *mut winapi::IAudioClient, - render_client: *mut winapi::IAudioRenderClient, + inner: Arc>, + playing: bool, +} + +pub struct SamplesStream { + event_loop: Arc, + inner: Arc>, + // The event that is signalled whenever a buffer is ready to be submitted to the voice. + event: winapi::HANDLE, // TODO: not deleted max_frames_in_buffer: winapi::UINT32, bytes_per_frame: winapi::WORD, - playing: bool, + ready: Arc, +} + +unsafe impl Send for SamplesStream {} +unsafe impl Sync for SamplesStream {} + +struct VoiceInner { + audio_client: *mut winapi::IAudioClient, + render_client: *mut winapi::IAudioRenderClient, } unsafe impl Send for Voice {} unsafe impl Sync for Voice {} impl Voice { - pub fn new(end_point: &Endpoint, format: &Format) -> Result { + pub fn new(end_point: &Endpoint, format: &Format, event_loop: &EventLoop) + -> Result<(Voice, SamplesStream), CreationError> + { unsafe { // making sure that COM is initialized // it's not actually sure that this is required, but when in doubt do it @@ -76,8 +193,9 @@ impl Voice { }; // finally initializing the audio client - let hresult = (*audio_client).Initialize(share_mode, 0, 10000000, 0, - &format_attempt.Format, ptr::null()); + let hresult = (*audio_client).Initialize(share_mode, + winapi::AUDCLNT_STREAMFLAGS_EVENTCALLBACK, + 0, 0, &format_attempt.Format, ptr::null()); match check_result(hresult) { Err(ref e) if e.raw_os_error() == Some(winapi::AUDCLNT_E_DEVICE_INVALIDATED) => { @@ -94,6 +212,25 @@ impl Voice { format_attempt.Format }; + // Creating the event that will be signalled whenever we need to submit some samples. + let event = { + let event = kernel32::CreateEventA(ptr::null_mut(), 0, 0, ptr::null()); + if event == ptr::null_mut() { + (*audio_client).Release(); + panic!("Failed to create event"); + } + + match check_result((*audio_client).SetEventHandle(event)) { + Err(e) => { + (*audio_client).Release(); + panic!("Failed to call SetEventHandle") + }, + Ok(_) => () + }; + + event + }; + // obtaining the size of the samples buffer in number of frames let max_frames_in_buffer = { let mut max_frames_in_buffer = mem::uninitialized(); @@ -115,7 +252,7 @@ impl Voice { max_frames_in_buffer }; - // building a `IAudioRenderClient` that will be used to fill the samples buffer + // Building a `IAudioRenderClient` that will be used to fill the samples buffer. let render_client = { let mut render_client: *mut winapi::IAudioRenderClient = mem::uninitialized(); let hresult = (*audio_client).GetService(&winapi::IID_IAudioRenderClient, @@ -139,79 +276,37 @@ impl Voice { &mut *render_client }; - // everything went fine - Ok(Voice { + // Everything went fine. + let inner = Arc::new(Mutex::new(VoiceInner { audio_client: audio_client, render_client: render_client, + })); + + let voice = Voice { + inner: inner.clone(), + playing: false, + }; + + let samples_stream = SamplesStream { + event_loop: event_loop.inner.clone(), + inner: inner, + event: event, max_frames_in_buffer: max_frames_in_buffer, bytes_per_frame: format.nBlockAlign, - playing: false, - }) - } - } - - pub fn append_data<'a, T>(&'a mut self, max_elements: usize) -> Buffer<'a, T> { - unsafe { - // obtaining the number of frames that are available to be written - let frames_available = { - let mut padding = mem::uninitialized(); - let hresult = (*self.audio_client).GetCurrentPadding(&mut padding); - check_result(hresult).unwrap(); - self.max_frames_in_buffer - padding + ready: Arc::new(AtomicBool::new(false)), }; - // making sure `frames_available` is inferior to `max_elements` - let frames_available = cmp::min(frames_available, - max_elements as u32 * mem::size_of::() as u32 / - self.bytes_per_frame as u32); - - // the WASAPI has some weird behaviors when the buffer size is zero, so we handle this - // ourselves - if frames_available == 0 { - return Buffer::Empty; - } - - // obtaining a pointer to the buffer - let (buffer_data, buffer_len) = { - let mut buffer: *mut winapi::BYTE = mem::uninitialized(); - let hresult = (*self.render_client).GetBuffer(frames_available, - &mut buffer as *mut *mut _); - check_result(hresult).unwrap(); // FIXME: can return `AUDCLNT_E_DEVICE_INVALIDATED` - debug_assert!(!buffer.is_null()); - - (buffer as *mut T, - frames_available as usize * self.bytes_per_frame as usize / mem::size_of::()) - }; - - Buffer::Buffer { - render_client: self.render_client, - buffer_data: buffer_data, - buffer_len: buffer_len, - frames: frames_available, - marker: PhantomData, - } - } - } - - #[inline] - pub fn get_period(&self) -> usize { - 0 - } - - pub fn get_pending_samples(&self) -> usize { - unsafe { - let mut padding = mem::uninitialized(); - let hresult = (*self.audio_client).GetCurrentPadding(&mut padding); - check_result(hresult).unwrap(); - padding as usize + Ok((voice, samples_stream)) } } #[inline] pub fn play(&mut self) { if !self.playing { + let mut inner = self.inner.lock().unwrap(); + unsafe { - let hresult = (*self.audio_client).Start(); + let hresult = (*inner.audio_client).Start(); check_result(hresult).unwrap(); } } @@ -222,27 +317,82 @@ impl Voice { #[inline] pub fn pause(&mut self) { if self.playing { + let mut inner = self.inner.lock().unwrap(); + unsafe { - let hresult = (*self.audio_client).Stop(); + let hresult = (*inner.audio_client).Stop(); check_result(hresult).unwrap(); } } self.playing = false; } +} - pub fn underflowed(&self) -> bool { +impl Stream for SamplesStream { + type Item = UnknownTypeBuffer; + type Error = (); + + fn poll(&mut self, task: &mut Task) -> Poll, Self::Error> { unsafe { - let mut padding = mem::uninitialized(); - let hresult = (*self.audio_client).GetCurrentPadding(&mut padding); - check_result(hresult).unwrap(); - - padding == 0 + if self.ready.swap(false, Ordering::Relaxed) == false { + let result = kernel32::WaitForSingleObject(self.event, 0); + + // Returning if timeout. + match result { + winapi::WAIT_OBJECT_0 => (), + winapi::WAIT_TIMEOUT => return Poll::NotReady, + _ => unreachable!() + }; + } + + let mut inner = self.inner.lock().unwrap(); + + // Obtaining the number of frames that are available to be written. + let frames_available = { + let mut padding = mem::uninitialized(); + let hresult = (*inner.audio_client).GetCurrentPadding(&mut padding); + check_result(hresult).unwrap(); + self.max_frames_in_buffer - padding + }; + + assert!(frames_available >= 0); + + // Obtaining a pointer to the buffer. + let (buffer_data, buffer_len) = { + let mut buffer: *mut winapi::BYTE = mem::uninitialized(); + let hresult = (*inner.render_client).GetBuffer(frames_available, + &mut buffer as *mut *mut _); + check_result(hresult).unwrap(); // FIXME: can return `AUDCLNT_E_DEVICE_INVALIDATED` + debug_assert!(!buffer.is_null()); + + (buffer as *mut _, + frames_available as usize * self.bytes_per_frame as usize / mem::size_of::()) // FIXME: correct size + }; + + let buffer = Buffer { + voice: self.inner.clone(), + buffer_data: buffer_data, + buffer_len: buffer_len, + frames: frames_available, + }; + + Poll::Ok(Some(UnknownTypeBuffer::F32(::Buffer { target: Some(buffer) }))) // FIXME: not necessarily F32 } } + + fn schedule(&mut self, task: &mut Task) { + let mut pending = self.event_loop.pending_scheduled.lock().unwrap(); + pending.handles.push(self.event); + pending.task_handles.push((task.handle().clone(), self.ready.clone())); + drop(pending); + + let result = unsafe { kernel32::SetEvent(self.event_loop.pending_scheduled_event) }; + assert!(result != 0); + } } -impl Drop for Voice { +impl Drop for VoiceInner { #[inline] fn drop(&mut self) { unsafe { @@ -252,48 +402,40 @@ impl Drop for Voice { } } -pub enum Buffer<'a, T: 'a> { - Empty, - Buffer { - render_client: *mut winapi::IAudioRenderClient, - buffer_data: *mut T, - buffer_len: usize, - frames: winapi::UINT32, - marker: PhantomData<&'a mut T>, - }, +pub struct Buffer { + voice: Arc>, + + buffer_data: *mut T, + buffer_len: usize, + frames: winapi::UINT32, } -impl<'a, T> Buffer<'a, T> { +unsafe impl Send for Buffer {} + +impl Buffer { #[inline] - pub fn get_buffer<'b>(&'b mut self) -> &'b mut [T] { - match self { - &mut Buffer::Empty => &mut [], - &mut Buffer::Buffer { buffer_data, buffer_len, .. } => unsafe { - slice::from_raw_parts_mut(buffer_data, buffer_len) - }, + pub fn get_buffer(&mut self) -> &mut [T] { + unsafe { + slice::from_raw_parts_mut(self.buffer_data, self.buffer_len) } } #[inline] pub fn len(&self) -> usize { - match self { - &Buffer::Empty => 0, - &Buffer::Buffer { buffer_len, .. } => buffer_len, - } + self.buffer_len } #[inline] pub fn finish(self) { - if let Buffer::Buffer { render_client, frames, .. } = self { - unsafe { - let hresult = (*render_client).ReleaseBuffer(frames as u32, 0); - match check_result(hresult) { - // ignoring the error that is produced if the device has been disconnected - Err(ref e) - if e.raw_os_error() == Some(winapi::AUDCLNT_E_DEVICE_INVALIDATED) => (), - e => e.unwrap(), - }; - } + unsafe { + let mut inner = self.voice.lock().unwrap(); + let hresult = (*inner.render_client).ReleaseBuffer(self.frames as u32, 0); + match check_result(hresult) { + // ignoring the error that is produced if the device has been disconnected + Err(ref e) + if e.raw_os_error() == Some(winapi::AUDCLNT_E_DEVICE_INVALIDATED) => (), + e => e.unwrap(), + }; } } }