From be8310da5122bae7bb969c2e1e6b7e046fcfdf8b Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 2 Aug 2016 16:13:59 +0200 Subject: [PATCH 1/9] Draft for switching to futures --- Cargo.toml | 2 + examples/beep.rs | 27 +++- src/lib.rs | 122 +++++++-------- src/wasapi/mod.rs | 3 +- src/wasapi/voice.rs | 352 +++++++++++++++++++++++++++++++------------- 5 files changed, 322 insertions(+), 184 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8f08cb9..027e03b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,10 +10,12 @@ license = "Apache-2.0" keywords = ["audio", "sound"] [dependencies] +futures = { git = "https://github.com/alexcrichton/futures-rs" } libc = "0.2" lazy_static = "0.2" winapi = "0.2.8" ole32-sys = "0.2" +kernel32-sys = "0.2" [target.arm-unknown-linux-gnueabihf.dependencies.alsa-sys] version = "0" diff --git a/examples/beep.rs b/examples/beep.rs index 3c9272c..a3b61be 100644 --- a/examples/beep.rs +++ b/examples/beep.rs @@ -1,16 +1,25 @@ extern crate cpal; +extern crate futures; + +use futures::Future; +use futures::stream::Stream; fn main() { let endpoint = cpal::get_default_endpoint().expect("Failed to get default endpoint"); let format = endpoint.get_supported_formats_list().unwrap().next().expect("Failed to get endpoint format"); - let mut channel = cpal::Voice::new(&endpoint, &format).expect("Failed to create a channel"); + + let event_loop = cpal::EventLoop::new(); + + let (mut voice, stream) = cpal::Voice::new(&endpoint, &format, &event_loop).expect("Failed to create a voice"); // Produce a sinusoid of maximum amplitude. - let mut data_source = (0u64..).map(|t| t as f32 * 440.0 * 2.0 * 3.141592 / format.samples_rate.0 as f32) // 440 Hz - .map(|t| t.sin()); + let samples_rate = format.samples_rate.0 as f32; + let mut data_source = (0u64..).map(move |t| t as f32 * 440.0 * 2.0 * 3.141592 / samples_rate) // 440 Hz + .map(move |t| t.sin()); - loop { - match channel.append_data(32768) { + voice.play(); + stream.for_each(move |buffer| -> Result<_, ()> { + match buffer { cpal::UnknownTypeBuffer::U16(mut buffer) => { for (sample, value) in buffer.chunks_mut(format.channels.len()).zip(&mut data_source) { let value = ((value * 0.5 + 0.5) * std::u16::MAX as f32) as u16; @@ -30,8 +39,10 @@ fn main() { for out in sample.iter_mut() { *out = value; } } }, - } + }; - channel.play(); - } + Ok(()) + }).forget(); + + event_loop.run(); } diff --git a/src/lib.rs b/src/lib.rs index 6909038..f92e4e9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,6 +37,8 @@ reaches the end of the data, it will stop playing. You must continuously fill th calling `append_data` repeatedly if you don't want the audio to stop playing. */ + +extern crate futures; #[macro_use] extern crate lazy_static; extern crate libc; @@ -50,6 +52,10 @@ use std::fmt; use std::error::Error; use std::ops::{Deref, DerefMut}; +use futures::stream::Stream; +use futures::Poll; +use futures::Task; + mod null; mod samples_formats; @@ -169,29 +175,43 @@ impl Iterator for SupportedFormatsIterator { } } +pub struct EventLoop(cpal_impl::EventLoop); + +impl EventLoop { + #[inline] + pub fn new() -> EventLoop { + EventLoop(cpal_impl::EventLoop::new()) + } + + #[inline] + pub fn run(&self) { + self.0.run() + } +} + /// Represents a buffer that must be filled with audio data. /// /// You should destroy this object as soon as possible. Data is only committed when it /// is destroyed. #[must_use] -pub struct Buffer<'a, T: 'a> where T: Sample { +pub struct Buffer where T: Sample { // also contains something, taken by `Drop` - target: Option>, + target: Option>, } /// This is the struct that is provided to you by cpal when you want to write samples to a buffer. /// /// Since the type of data is only known at runtime, you have to fill the right buffer. -pub enum UnknownTypeBuffer<'a> { +pub enum UnknownTypeBuffer { /// Samples whose format is `u16`. - U16(Buffer<'a, u16>), + U16(Buffer), /// Samples whose format is `i16`. - I16(Buffer<'a, i16>), + I16(Buffer), /// Samples whose format is `f32`. - F32(Buffer<'a, f32>), + F32(Buffer), } -impl<'a> UnknownTypeBuffer<'a> { +impl UnknownTypeBuffer { /// Returns the length of the buffer in number of samples. #[inline] pub fn len(&self) -> usize { @@ -282,13 +302,19 @@ pub struct Voice { impl Voice { /// Builds a new channel. #[inline] - pub fn new(endpoint: &Endpoint, format: &Format) -> Result { - let channel = try!(cpal_impl::Voice::new(&endpoint.0, format)); + pub fn new(endpoint: &Endpoint, format: &Format, event_loop: &EventLoop) + -> Result<(Voice, SamplesStream), CreationError> + { + let (voice, stream) = try!(cpal_impl::Voice::new(&endpoint.0, format, &event_loop.0)); - Ok(Voice { - voice: channel, + let voice = Voice { + voice: voice, format: format.clone(), - }) + }; + + let stream = SamplesStream(stream); + + Ok((voice, stream)) } /// Returns the format used by the voice. @@ -324,51 +350,6 @@ impl Voice { self.format().data_type } - /// Returns the minimum number of samples that should be put in a buffer before it is - /// processable by the audio output. - /// - /// If you put less than this value in the buffer, the buffer will not be processed and you - /// risk an underrun. - #[inline] - pub fn get_period(&self) -> usize { - self.voice.get_period() - } - - /// Adds some PCM data to the voice's buffer. - /// - /// This function indirectly returns a `Buffer` object that must be filled with the audio data. - /// The size of the buffer being returned depends on the current state of the backend - /// and can't be known in advance. However it is never greater than `max_samples`. - /// - /// You must fill the buffer *entirely*, so do not set `max_samples` to a value greater - /// than the amount of data available to you. - /// - /// Channels are interleaved. For example if you have two channels, you must write - /// the first sample of the first channel, then the first sample of the second channel, - /// then the second sample of the first channel, then the second sample of the second - /// channel, etc. - /// - /// ## Panic - /// - /// Panics if `max_samples` is 0. - /// - #[inline] - pub fn append_data(&mut self, max_samples: usize) -> UnknownTypeBuffer { - assert!(max_samples != 0); - - match self.get_samples_format() { - SampleFormat::U16 => UnknownTypeBuffer::U16(Buffer { - target: Some(self.voice.append_data(max_samples)) - }), - SampleFormat::I16 => UnknownTypeBuffer::I16(Buffer { - target: Some(self.voice.append_data(max_samples)) - }), - SampleFormat::F32 => UnknownTypeBuffer::F32(Buffer { - target: Some(self.voice.append_data(max_samples)) - }), - } - } - /// Sends a command to the audio device that it should start playing. /// /// Has no effect is the voice was already playing. @@ -389,25 +370,26 @@ impl Voice { pub fn pause(&mut self) { self.voice.pause() } +} + +pub struct SamplesStream(cpal_impl::SamplesStream); + +impl Stream for SamplesStream { + type Item = UnknownTypeBuffer; + type Error = (); - /// Returns the number of samples in the buffer that are currently being processed by the - /// audio playback backend. - /// - /// This function is useful to determine how much time it will take to finish playing the - /// current sound. #[inline] - pub fn get_pending_samples(&self) -> usize { - self.voice.get_pending_samples() + fn poll(&mut self, task: &mut Task) -> Poll, Self::Error> { + self.0.poll(task) } - /// Returns true if the voice has finished reading all the data you sent to it. #[inline] - pub fn underflowed(&self) -> bool { - self.voice.underflowed() + fn schedule(&mut self, task: &mut Task) { + self.0.schedule(task) } } -impl<'a, T> Deref for Buffer<'a, T> where T: Sample { +impl Deref for Buffer where T: Sample { type Target = [T]; #[inline] @@ -416,14 +398,14 @@ impl<'a, T> Deref for Buffer<'a, T> where T: Sample { } } -impl<'a, T> DerefMut for Buffer<'a, T> where T: Sample { +impl DerefMut for Buffer where T: Sample { #[inline] fn deref_mut(&mut self) -> &mut [T] { self.target.as_mut().unwrap().get_buffer() } } -impl<'a, T> Drop for Buffer<'a, T> where T: Sample { +impl Drop for Buffer where T: Sample { #[inline] fn drop(&mut self) { self.target.take().unwrap().finish(); diff --git a/src/wasapi/mod.rs b/src/wasapi/mod.rs index 5d6267c..a951b5c 100644 --- a/src/wasapi/mod.rs +++ b/src/wasapi/mod.rs @@ -1,5 +1,6 @@ extern crate winapi; extern crate ole32; +extern crate kernel32; use std::io::Error as IoError; use std::os::windows::ffi::OsStringExt; @@ -17,7 +18,7 @@ use SampleFormat; pub use std::option::IntoIter as OptionIntoIter; pub use self::enumerate::{EndpointsIterator, get_default_endpoint}; -pub use self::voice::{Voice, Buffer}; +pub use self::voice::{Voice, Buffer, EventLoop, SamplesStream}; pub type SupportedFormatsIterator = OptionIntoIter; diff --git a/src/wasapi/voice.rs b/src/wasapi/voice.rs index 1cf0fdf..3d49469 100644 --- a/src/wasapi/voice.rs +++ b/src/wasapi/voice.rs @@ -1,4 +1,5 @@ use super::com; +use super::kernel32; use super::ole32; use super::winapi; use super::Endpoint; @@ -9,25 +10,141 @@ use std::slice; use std::mem; use std::ptr; use std::marker::PhantomData; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::sync::Mutex; + +use futures::Poll; +use futures::Task; +use futures::TaskHandle; +use futures::stream::Stream; use CreationError; use ChannelPosition; use Format; use SampleFormat; +use UnknownTypeBuffer; + +pub struct EventLoop { + inner: Arc, +} + +unsafe impl Send for EventLoop {} +unsafe impl Sync for EventLoop {} + +struct EventLoopInner { + // This event is signalled after elements have been added to `pending_scheduled`. + pending_scheduled_event: winapi::HANDLE, + scheduled: Mutex, + pending_scheduled: Mutex, +} + +struct EventLoopScheduled { + // List of handles that correspond to voices. + // They are linked to `task_handles`, but we store them separately in order to easily call + // `WaitForMultipleObjectsEx` on the array without having to perform any conversion. + handles: Vec, + + // List of task handles corresponding to `handles`. + task_handles: Vec<(TaskHandle, Arc)>, +} + +impl EventLoop { + pub fn new() -> EventLoop { + let pending_scheduled_event = unsafe { + kernel32::CreateEventA(ptr::null_mut(), 0, 0, ptr::null()) + }; + + EventLoop { + inner: Arc::new(EventLoopInner { + pending_scheduled_event: pending_scheduled_event, + scheduled: Mutex::new(EventLoopScheduled { + handles: vec![pending_scheduled_event], + task_handles: vec![], + }), + pending_scheduled: Mutex::new(EventLoopScheduled { + handles: vec![], + task_handles: vec![], + }) + }) + } + } + + pub fn run(&self) { + unsafe { + let mut scheduled = self.inner.scheduled.lock().unwrap(); + + loop { + // Creating a voice checks for the MAXIMUM_WAIT_OBJECTS limit. + debug_assert!(scheduled.handles.len() <= winapi::MAXIMUM_WAIT_OBJECTS as usize); + + // Wait for any of the handles to be signalled, which means that the corresponding + // sound needs a buffer. + let result = kernel32::WaitForMultipleObjectsEx(scheduled.handles.len() as u32, + scheduled.handles.as_ptr(), + winapi::FALSE, winapi::INFINITE, /* TODO: allow setting a timeout */ + winapi::FALSE /* irrelevant parameter here */); + + // Notifying the corresponding task handler. + assert!(result >= winapi::WAIT_OBJECT_0); + let handle_id = (result - winapi::WAIT_OBJECT_0) as usize; + + if handle_id == 0 { + let mut pending = self.inner.pending_scheduled.lock().unwrap(); + scheduled.handles.append(&mut pending.handles); + scheduled.task_handles.append(&mut pending.task_handles); + + } else { + scheduled.handles.remove(handle_id); + let (task_handle, ready) = scheduled.task_handles.remove(handle_id - 1); + ready.store(true, Ordering::Relaxed); + task_handle.notify(); + } + } + } + } +} + +impl Drop for EventLoop { + #[inline] + fn drop(&mut self) { + unsafe { + kernel32::CloseHandle(self.inner.pending_scheduled_event); + } + } +} pub struct Voice { - audio_client: *mut winapi::IAudioClient, - render_client: *mut winapi::IAudioRenderClient, + inner: Arc>, + playing: bool, +} + +pub struct SamplesStream { + event_loop: Arc, + inner: Arc>, + // The event that is signalled whenever a buffer is ready to be submitted to the voice. + event: winapi::HANDLE, // TODO: not deleted max_frames_in_buffer: winapi::UINT32, bytes_per_frame: winapi::WORD, - playing: bool, + ready: Arc, +} + +unsafe impl Send for SamplesStream {} +unsafe impl Sync for SamplesStream {} + +struct VoiceInner { + audio_client: *mut winapi::IAudioClient, + render_client: *mut winapi::IAudioRenderClient, } unsafe impl Send for Voice {} unsafe impl Sync for Voice {} impl Voice { - pub fn new(end_point: &Endpoint, format: &Format) -> Result { + pub fn new(end_point: &Endpoint, format: &Format, event_loop: &EventLoop) + -> Result<(Voice, SamplesStream), CreationError> + { unsafe { // making sure that COM is initialized // it's not actually sure that this is required, but when in doubt do it @@ -76,8 +193,9 @@ impl Voice { }; // finally initializing the audio client - let hresult = (*audio_client).Initialize(share_mode, 0, 10000000, 0, - &format_attempt.Format, ptr::null()); + let hresult = (*audio_client).Initialize(share_mode, + winapi::AUDCLNT_STREAMFLAGS_EVENTCALLBACK, + 0, 0, &format_attempt.Format, ptr::null()); match check_result(hresult) { Err(ref e) if e.raw_os_error() == Some(winapi::AUDCLNT_E_DEVICE_INVALIDATED) => { @@ -94,6 +212,25 @@ impl Voice { format_attempt.Format }; + // Creating the event that will be signalled whenever we need to submit some samples. + let event = { + let event = kernel32::CreateEventA(ptr::null_mut(), 0, 0, ptr::null()); + if event == ptr::null_mut() { + (*audio_client).Release(); + panic!("Failed to create event"); + } + + match check_result((*audio_client).SetEventHandle(event)) { + Err(e) => { + (*audio_client).Release(); + panic!("Failed to call SetEventHandle") + }, + Ok(_) => () + }; + + event + }; + // obtaining the size of the samples buffer in number of frames let max_frames_in_buffer = { let mut max_frames_in_buffer = mem::uninitialized(); @@ -115,7 +252,7 @@ impl Voice { max_frames_in_buffer }; - // building a `IAudioRenderClient` that will be used to fill the samples buffer + // Building a `IAudioRenderClient` that will be used to fill the samples buffer. let render_client = { let mut render_client: *mut winapi::IAudioRenderClient = mem::uninitialized(); let hresult = (*audio_client).GetService(&winapi::IID_IAudioRenderClient, @@ -139,79 +276,37 @@ impl Voice { &mut *render_client }; - // everything went fine - Ok(Voice { + // Everything went fine. + let inner = Arc::new(Mutex::new(VoiceInner { audio_client: audio_client, render_client: render_client, + })); + + let voice = Voice { + inner: inner.clone(), + playing: false, + }; + + let samples_stream = SamplesStream { + event_loop: event_loop.inner.clone(), + inner: inner, + event: event, max_frames_in_buffer: max_frames_in_buffer, bytes_per_frame: format.nBlockAlign, - playing: false, - }) - } - } - - pub fn append_data<'a, T>(&'a mut self, max_elements: usize) -> Buffer<'a, T> { - unsafe { - // obtaining the number of frames that are available to be written - let frames_available = { - let mut padding = mem::uninitialized(); - let hresult = (*self.audio_client).GetCurrentPadding(&mut padding); - check_result(hresult).unwrap(); - self.max_frames_in_buffer - padding + ready: Arc::new(AtomicBool::new(false)), }; - // making sure `frames_available` is inferior to `max_elements` - let frames_available = cmp::min(frames_available, - max_elements as u32 * mem::size_of::() as u32 / - self.bytes_per_frame as u32); - - // the WASAPI has some weird behaviors when the buffer size is zero, so we handle this - // ourselves - if frames_available == 0 { - return Buffer::Empty; - } - - // obtaining a pointer to the buffer - let (buffer_data, buffer_len) = { - let mut buffer: *mut winapi::BYTE = mem::uninitialized(); - let hresult = (*self.render_client).GetBuffer(frames_available, - &mut buffer as *mut *mut _); - check_result(hresult).unwrap(); // FIXME: can return `AUDCLNT_E_DEVICE_INVALIDATED` - debug_assert!(!buffer.is_null()); - - (buffer as *mut T, - frames_available as usize * self.bytes_per_frame as usize / mem::size_of::()) - }; - - Buffer::Buffer { - render_client: self.render_client, - buffer_data: buffer_data, - buffer_len: buffer_len, - frames: frames_available, - marker: PhantomData, - } - } - } - - #[inline] - pub fn get_period(&self) -> usize { - 0 - } - - pub fn get_pending_samples(&self) -> usize { - unsafe { - let mut padding = mem::uninitialized(); - let hresult = (*self.audio_client).GetCurrentPadding(&mut padding); - check_result(hresult).unwrap(); - padding as usize + Ok((voice, samples_stream)) } } #[inline] pub fn play(&mut self) { if !self.playing { + let mut inner = self.inner.lock().unwrap(); + unsafe { - let hresult = (*self.audio_client).Start(); + let hresult = (*inner.audio_client).Start(); check_result(hresult).unwrap(); } } @@ -222,27 +317,82 @@ impl Voice { #[inline] pub fn pause(&mut self) { if self.playing { + let mut inner = self.inner.lock().unwrap(); + unsafe { - let hresult = (*self.audio_client).Stop(); + let hresult = (*inner.audio_client).Stop(); check_result(hresult).unwrap(); } } self.playing = false; } +} - pub fn underflowed(&self) -> bool { +impl Stream for SamplesStream { + type Item = UnknownTypeBuffer; + type Error = (); + + fn poll(&mut self, task: &mut Task) -> Poll, Self::Error> { unsafe { - let mut padding = mem::uninitialized(); - let hresult = (*self.audio_client).GetCurrentPadding(&mut padding); - check_result(hresult).unwrap(); - - padding == 0 + if self.ready.swap(false, Ordering::Relaxed) == false { + let result = kernel32::WaitForSingleObject(self.event, 0); + + // Returning if timeout. + match result { + winapi::WAIT_OBJECT_0 => (), + winapi::WAIT_TIMEOUT => return Poll::NotReady, + _ => unreachable!() + }; + } + + let mut inner = self.inner.lock().unwrap(); + + // Obtaining the number of frames that are available to be written. + let frames_available = { + let mut padding = mem::uninitialized(); + let hresult = (*inner.audio_client).GetCurrentPadding(&mut padding); + check_result(hresult).unwrap(); + self.max_frames_in_buffer - padding + }; + + assert!(frames_available >= 0); + + // Obtaining a pointer to the buffer. + let (buffer_data, buffer_len) = { + let mut buffer: *mut winapi::BYTE = mem::uninitialized(); + let hresult = (*inner.render_client).GetBuffer(frames_available, + &mut buffer as *mut *mut _); + check_result(hresult).unwrap(); // FIXME: can return `AUDCLNT_E_DEVICE_INVALIDATED` + debug_assert!(!buffer.is_null()); + + (buffer as *mut _, + frames_available as usize * self.bytes_per_frame as usize / mem::size_of::()) // FIXME: correct size + }; + + let buffer = Buffer { + voice: self.inner.clone(), + buffer_data: buffer_data, + buffer_len: buffer_len, + frames: frames_available, + }; + + Poll::Ok(Some(UnknownTypeBuffer::F32(::Buffer { target: Some(buffer) }))) // FIXME: not necessarily F32 } } + + fn schedule(&mut self, task: &mut Task) { + let mut pending = self.event_loop.pending_scheduled.lock().unwrap(); + pending.handles.push(self.event); + pending.task_handles.push((task.handle().clone(), self.ready.clone())); + drop(pending); + + let result = unsafe { kernel32::SetEvent(self.event_loop.pending_scheduled_event) }; + assert!(result != 0); + } } -impl Drop for Voice { +impl Drop for VoiceInner { #[inline] fn drop(&mut self) { unsafe { @@ -252,48 +402,40 @@ impl Drop for Voice { } } -pub enum Buffer<'a, T: 'a> { - Empty, - Buffer { - render_client: *mut winapi::IAudioRenderClient, - buffer_data: *mut T, - buffer_len: usize, - frames: winapi::UINT32, - marker: PhantomData<&'a mut T>, - }, +pub struct Buffer { + voice: Arc>, + + buffer_data: *mut T, + buffer_len: usize, + frames: winapi::UINT32, } -impl<'a, T> Buffer<'a, T> { +unsafe impl Send for Buffer {} + +impl Buffer { #[inline] - pub fn get_buffer<'b>(&'b mut self) -> &'b mut [T] { - match self { - &mut Buffer::Empty => &mut [], - &mut Buffer::Buffer { buffer_data, buffer_len, .. } => unsafe { - slice::from_raw_parts_mut(buffer_data, buffer_len) - }, + pub fn get_buffer(&mut self) -> &mut [T] { + unsafe { + slice::from_raw_parts_mut(self.buffer_data, self.buffer_len) } } #[inline] pub fn len(&self) -> usize { - match self { - &Buffer::Empty => 0, - &Buffer::Buffer { buffer_len, .. } => buffer_len, - } + self.buffer_len } #[inline] pub fn finish(self) { - if let Buffer::Buffer { render_client, frames, .. } = self { - unsafe { - let hresult = (*render_client).ReleaseBuffer(frames as u32, 0); - match check_result(hresult) { - // ignoring the error that is produced if the device has been disconnected - Err(ref e) - if e.raw_os_error() == Some(winapi::AUDCLNT_E_DEVICE_INVALIDATED) => (), - e => e.unwrap(), - }; - } + unsafe { + let mut inner = self.voice.lock().unwrap(); + let hresult = (*inner.render_client).ReleaseBuffer(self.frames as u32, 0); + match check_result(hresult) { + // ignoring the error that is produced if the device has been disconnected + Err(ref e) + if e.raw_os_error() == Some(winapi::AUDCLNT_E_DEVICE_INVALIDATED) => (), + e => e.unwrap(), + }; } } } From 7c587853ad87f20cb3d8a3411f27500eba273183 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 2 Aug 2016 22:28:37 +0200 Subject: [PATCH 2/9] Make it work on Linux --- src/alsa/mod.rs | 373 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 300 insertions(+), 73 deletions(-) diff --git a/src/alsa/mod.rs b/src/alsa/mod.rs index fb8ab17..f6b8cb9 100644 --- a/src/alsa/mod.rs +++ b/src/alsa/mod.rs @@ -9,10 +9,16 @@ use Format; use FormatsEnumerationError; use SampleFormat; use SamplesRate; +use UnknownTypeBuffer; use std::{ffi, cmp, iter, mem, ptr}; use std::vec::IntoIter as VecIntoIter; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; + +use futures::Poll; +use futures::Task; +use futures::TaskHandle; +use futures::stream::Stream; pub type SupportedFormatsIterator = VecIntoIter; @@ -174,18 +180,273 @@ impl Endpoint { } } -pub struct Voice { - channel: Mutex<*mut alsa::snd_pcm_t>, - num_channels: u16, - buffer_len: usize, // number of samples that can fit in the buffer - period_len: usize, // minimum number of samples to put in the buffer +pub struct EventLoop { + inner: Arc, } -pub struct Buffer<'a, T> { - channel: &'a mut Voice, +struct EventLoopInner { + // Descriptors that we are currently waiting upon. This member is always locked while `run()` + // is executed, ie. most of the time. + // + // Note that for `current_wait`, the first element of `descriptors` is always + // `pending_wait_signal`. Therefore the length of `descriptors` is always one more than + // `voices`. + current_wait: Mutex, + + // Since we can't add elements to `current_wait` (as it's locked), we add them to + // `pending_wait`. Once that's done, we signal `pending_wait_signal` so that the `run()` + // function can pause and add the content of `pending_wait` to `current_wait`. + pending_wait: Mutex, + + // A file descriptor opened with `eventfd`. Always the first element + // of `current_wait.descriptors`. Should be notified when an element is added + // to `pending_wait` so that the current wait can stop and take the pending wait into + // account. + pending_wait_signal: libc::c_int, +} + +struct PollDescriptors { + // Descriptors to wait for. + descriptors: Vec, + // List of voices that are written in `descriptors`. + voices: Vec>, +} + +unsafe impl Send for EventLoopInner {} +unsafe impl Sync for EventLoopInner {} + +impl Drop for EventLoopInner { + fn drop(&mut self) { + unsafe { + libc::close(self.pending_wait_signal); + } + } +} + +impl EventLoop { + #[inline] + pub fn new() -> EventLoop { + let pending_wait_signal = unsafe { libc::eventfd(0, 0) }; + + EventLoop { + inner: Arc::new(EventLoopInner { + current_wait: Mutex::new(PollDescriptors { + descriptors: vec![libc::pollfd { + fd: pending_wait_signal, + events: libc::POLLIN, + revents: 0, + }], + voices: Vec::new(), + }), + pending_wait: Mutex::new(PollDescriptors { + descriptors: Vec::new(), + voices: Vec::new(), + }), + pending_wait_signal: pending_wait_signal, + }) + } + } + + #[inline] + pub fn run(&self) { + unsafe { + let mut current_wait = self.inner.current_wait.lock().unwrap(); + + loop { + let ret = libc::poll(current_wait.descriptors.as_mut_ptr(), + current_wait.descriptors.len() as libc::nfds_t, + -1 /* infinite */); + assert!(ret >= 0, "poll() failed"); + + if ret == 0 { + continue; + } + + // If the `pending_wait_signal` was signaled, add the pending waits to + // the current waits. + if current_wait.descriptors[0].revents != 0 { + current_wait.descriptors[0].revents = 0; + + let mut pending = self.inner.pending_wait.lock().unwrap(); + current_wait.descriptors.append(&mut pending.descriptors); + current_wait.voices.append(&mut pending.voices); + + // Emptying the signal. + let mut out = 0u64; + let ret = libc::read(self.inner.pending_wait_signal, + &mut out as *mut u64 as *mut _, 8); + assert_eq!(ret, 8); + } + + // Check each individual descriptor for events. + let mut i_voice = 0; + let mut i_descriptor = 1; + while i_voice < current_wait.voices.len() { + let mut revent = mem::uninitialized(); + + { + let channel = *current_wait.voices[i_voice].channel.lock().unwrap(); + let num_descriptors = current_wait.voices[i_voice].num_descriptors as libc::c_uint; + check_errors(alsa::snd_pcm_poll_descriptors_revents(channel, current_wait.descriptors + .as_mut_ptr().offset(i_descriptor), + num_descriptors, &mut revent)).unwrap(); + } + + if (revent as libc::c_short & libc::POLLOUT) != 0 { + let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap().take(); + if let Some(scheduled) = scheduled { + scheduled.notify(); + } + + for _ in 0 .. current_wait.voices[i_voice].num_descriptors { + current_wait.descriptors.remove(i_descriptor as usize); + } + current_wait.voices.remove(i_voice); + + } else { + i_descriptor += current_wait.voices[i_voice].num_descriptors as isize; + i_voice += 1; + } + } + } + } + } +} + +pub struct Voice; + +pub struct Buffer { + inner: Arc, buffer: Vec, } +pub struct SamplesStream { + inner: Arc, +} + +struct VoiceInner { + // The event loop used to create the voice. + event_loop: Arc, + + // The ALSA channel. + channel: Mutex<*mut alsa::snd_pcm_t>, + + // When converting between file descriptors and `snd_pcm_t`, this is the number of + // file descriptors that this `snd_pcm_t` uses. + num_descriptors: usize, + + // Format of the samples. + sample_format: SampleFormat, + + // Number of channels, ie. number of samples per frame. + num_channels: u16, + + // Number of samples that can fit in the buffer. + buffer_len: usize, + + // Minimum number of samples to put in the buffer. + period_len: usize, + + // If `Some`, something previously called `schedule` on the stream. + scheduled: Mutex>, +} + +unsafe impl Send for VoiceInner {} +unsafe impl Sync for VoiceInner {} + +impl Stream for SamplesStream { + type Item = UnknownTypeBuffer; + type Error = (); + + fn poll(&mut self, _: &mut Task) -> Poll, Self::Error> { + // Determine the number of samples that are available to write. + let available = { + let channel = self.inner.channel.lock().expect("could not lock channel"); + let available = unsafe { alsa::snd_pcm_avail(*channel) }; // TODO: what about snd_pcm_avail_update? + + if available == -32 { + // buffer underrun + self.inner.buffer_len + } else if available < 0 { + check_errors(available as libc::c_int).expect("buffer is not available"); + unreachable!() + } else { + (available * self.inner.num_channels as alsa::snd_pcm_sframes_t) as usize + } + }; + + // If we don't have one period ready, return `NotReady`. + if available < self.inner.period_len { + return Poll::NotReady; + } + + // Add an upper bound to the available space. + let available = if available > 8192 { 8192 } else { available }; + + // We now sure that we're ready to write data. + match self.inner.sample_format { + SampleFormat::I16 => { + let buffer = Buffer { + buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(), + inner: self.inner.clone(), + }; + + Poll::Ok(Some(UnknownTypeBuffer::I16(::Buffer { target: Some(buffer) }))) + }, + SampleFormat::U16 => { + let buffer = Buffer { + buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(), + inner: self.inner.clone(), + }; + + Poll::Ok(Some(UnknownTypeBuffer::U16(::Buffer { target: Some(buffer) }))) + }, + SampleFormat::F32 => { + let buffer = Buffer { + buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(), + inner: self.inner.clone(), + }; + + Poll::Ok(Some(UnknownTypeBuffer::F32(::Buffer { target: Some(buffer) }))) + }, + } + } + + #[inline] + fn schedule(&mut self, task: &mut Task) { + unsafe { + let channel = self.inner.channel.lock().unwrap(); + + // We start by filling `scheduled`. + *self.inner.scheduled.lock().unwrap() = Some(task.handle().clone()); + + // In this function we turn the `snd_pcm_t` into a collection of file descriptors. + // And we add these descriptors to `event_loop.pending_wait.descriptors`. + let mut pending_wait = self.inner.event_loop.pending_wait.lock().unwrap(); + pending_wait.descriptors.reserve(self.inner.num_descriptors); + + let len = pending_wait.descriptors.len(); + let filled = alsa::snd_pcm_poll_descriptors(*channel, + pending_wait.descriptors.as_mut_ptr() + .offset(len as isize), + self.inner.num_descriptors as libc::c_uint); + debug_assert_eq!(filled, self.inner.num_descriptors as libc::c_int); + pending_wait.descriptors.set_len(len + self.inner.num_descriptors); + + // We also fill `voices`. + pending_wait.voices.push(self.inner.clone()); + + // Now that `pending_wait` received additional descriptors, we signal the event + // so that our event loops can pick it up. + drop(pending_wait); + let buf = 1u64; + let wret = libc::write(self.inner.event_loop.pending_wait_signal, + &buf as *const u64 as *const _, 8); + assert!(wret == 8); + } + } +} + /// Wrapper around `hw_params`. struct HwParams(*mut alsa::snd_pcm_hw_params_t); @@ -208,13 +469,15 @@ impl Drop for HwParams { } impl Voice { - pub fn new(endpoint: &Endpoint, format: &Format) -> Result { + pub fn new(endpoint: &Endpoint, format: &Format, event_loop: &EventLoop) + -> Result<(Voice, SamplesStream), CreationError> + { unsafe { let name = ffi::CString::new(endpoint.0.clone()).expect("unable to clone endpoint"); let mut playback_handle = mem::uninitialized(); match alsa::snd_pcm_open(&mut playback_handle, name.as_ptr(), - alsa::SND_PCM_STREAM_PLAYBACK, alsa::SND_PCM_NONBLOCK) + alsa::SND_PCM_STREAM_PLAYBACK, 0) { -16 /* determined empirically */ => return Err(CreationError::DeviceNotAvailable), e => check_errors(e).expect("Device unavailable") @@ -235,6 +498,13 @@ impl Voice { check_errors(alsa::snd_pcm_hw_params_set_channels(playback_handle, hw_params.0, format.channels.len() as libc::c_uint)).expect("channel count could not be set"); check_errors(alsa::snd_pcm_hw_params(playback_handle, hw_params.0)).expect("hardware params could not be set"); + let mut sw_params = mem::uninitialized(); // TODO: RAII + check_errors(alsa::snd_pcm_sw_params_malloc(&mut sw_params)).unwrap(); + check_errors(alsa::snd_pcm_sw_params_current(playback_handle, sw_params)).unwrap(); + check_errors(alsa::snd_pcm_sw_params_set_avail_min(playback_handle, sw_params, 4096)).unwrap(); + check_errors(alsa::snd_pcm_sw_params_set_start_threshold(playback_handle, sw_params, 0)).unwrap(); + check_errors(alsa::snd_pcm_sw_params(playback_handle, sw_params)).unwrap(); + check_errors(alsa::snd_pcm_prepare(playback_handle)).expect("could not get playback handle"); let (buffer_len, period_len) = { @@ -247,36 +517,26 @@ impl Voice { (buffer, period) }; - Ok(Voice { + let num_descriptors = { + let num_descriptors = alsa::snd_pcm_poll_descriptors_count(playback_handle); + debug_assert!(num_descriptors >= 1); + num_descriptors as usize + }; + + let samples_stream_inner = Arc::new(VoiceInner { + event_loop: event_loop.inner.clone(), channel: Mutex::new(playback_handle), + sample_format: format.data_type, + num_descriptors: num_descriptors, num_channels: format.channels.len() as u16, buffer_len: buffer_len, period_len: period_len, - }) - } - } + scheduled: Mutex::new(None), + }); - pub fn append_data<'a, T>(&'a mut self, max_elements: usize) -> Buffer<'a, T> where T: Clone { - let available = { - let channel = self.channel.lock().expect("could not lock channel"); - let available = unsafe { alsa::snd_pcm_avail(*channel) }; - - if available == -32 { - // buffer underrun - self.buffer_len - } else if available < 0 { - check_errors(available as libc::c_int).expect("buffer is not available"); - unreachable!() - } else { - (available * self.num_channels as alsa::snd_pcm_sframes_t) as usize - } - }; - - let elements = cmp::min(available, max_elements); - - Buffer { - channel: self, - buffer: iter::repeat(unsafe { mem::uninitialized() }).take(elements).collect(), + Ok((Voice, SamplesStream { + inner: samples_stream_inner + })) } } @@ -290,42 +550,9 @@ impl Voice { pub fn pause(&mut self) { unimplemented!() } - - #[inline] - pub fn get_period(&self) -> usize { - self.period_len - } - - pub fn get_pending_samples(&self) -> usize { - let available = { - let channel = self.channel.lock().expect("could not lock channel"); - let available = unsafe { alsa::snd_pcm_avail(*channel) }; - - if available == -32 { - self.buffer_len as alsa::snd_pcm_sframes_t // buffer underrun - } else if available < 0 { - check_errors(available as libc::c_int).expect("could not write to buffer"); - unreachable!() - } else { - available * self.num_channels as alsa::snd_pcm_sframes_t - } - }; - - self.buffer_len - available as usize - } - - pub fn underflowed(&self) -> bool { - let channel = self.channel.lock().expect("channel underflow"); - - let available = unsafe { alsa::snd_pcm_avail(*channel) }; - available == -32 - } } -unsafe impl Send for Voice {} -unsafe impl Sync for Voice {} - -impl Drop for Voice { +impl Drop for VoiceInner { #[inline] fn drop(&mut self) { unsafe { @@ -334,9 +561,9 @@ impl Drop for Voice { } } -impl<'a, T> Buffer<'a, T> { +impl Buffer { #[inline] - pub fn get_buffer<'b>(&'b mut self) -> &'b mut [T] { + pub fn get_buffer(&mut self) -> &mut [T] { &mut self.buffer } @@ -346,9 +573,9 @@ impl<'a, T> Buffer<'a, T> { } pub fn finish(self) { - let to_write = (self.buffer.len() / self.channel.num_channels as usize) + let to_write = (self.buffer.len() / self.inner.num_channels as usize) as alsa::snd_pcm_uframes_t; - let channel = self.channel.channel.lock().expect("Buffer channel lock failed"); + let channel = self.inner.channel.lock().expect("Buffer channel lock failed"); unsafe { loop { From 0c915cac8f63ee4bba554e73f32a6bafa6ac4654 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 3 Aug 2016 10:31:02 +0200 Subject: [PATCH 3/9] Use a max buffer size in order to avoid problems --- src/alsa/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/alsa/mod.rs b/src/alsa/mod.rs index f6b8cb9..350e64e 100644 --- a/src/alsa/mod.rs +++ b/src/alsa/mod.rs @@ -380,9 +380,6 @@ impl Stream for SamplesStream { return Poll::NotReady; } - // Add an upper bound to the available space. - let available = if available > 8192 { 8192 } else { available }; - // We now sure that we're ready to write data. match self.inner.sample_format { SampleFormat::I16 => { @@ -496,6 +493,8 @@ impl Voice { check_errors(alsa::snd_pcm_hw_params_set_format(playback_handle, hw_params.0, data_type)).expect("format could not be set"); check_errors(alsa::snd_pcm_hw_params_set_rate(playback_handle, hw_params.0, format.samples_rate.0 as libc::c_uint, 0)).expect("sample rate could not be set"); check_errors(alsa::snd_pcm_hw_params_set_channels(playback_handle, hw_params.0, format.channels.len() as libc::c_uint)).expect("channel count could not be set"); + let mut max_buffer_size = format.samples_rate.0 as alsa::snd_pcm_uframes_t / format.channels.len() as alsa::snd_pcm_uframes_t / 5; // 200ms of buffer + check_errors(alsa::snd_pcm_hw_params_set_buffer_size_max(playback_handle, hw_params.0, &mut max_buffer_size)).unwrap(); check_errors(alsa::snd_pcm_hw_params(playback_handle, hw_params.0)).expect("hardware params could not be set"); let mut sw_params = mem::uninitialized(); // TODO: RAII From c2f89d8b2dfe3fb8d2526d67f576836b89e88105 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 3 Aug 2016 12:55:47 +0200 Subject: [PATCH 4/9] Update documentation --- src/lib.rs | 42 ++++++++++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index f92e4e9..dca352e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ /*! # How to use cpal -In order to play a sound, first you need to create a `Voice`. +In order to play a sound, first you need to create an `EventLoop` and a `Voice`. ```no_run // getting the default sound output of the system (can return `None` if nothing is supported) @@ -13,28 +13,46 @@ let endpoint = cpal::get_default_endpoint().unwrap(); // getting a format for the PCM let format = endpoint.get_supported_formats_list().unwrap().next().unwrap(); -let mut voice = cpal::Voice::new(&endpoint, &format).unwrap(); +let event_loop = cpal::EventLoop::new(); + +let (voice, mut samples_stream) = cpal::Voice::new(&endpoint, &format, &event_loop).unwrap(); ``` -Then you must send raw samples to it by calling `append_data`. You must take the number of channels -and samples rate into account when writing the data. +The `voice` can be used to control the play/pause of the output, while the `samples_stream` can +be used to register a callback that will be called whenever the backend is ready to get data. +See the documentation of `futures-rs` for more info about how to use streams. + +```ignore // TODO: unignore +# let mut samples_stream: cpal::SamplesStream = unsafe { std::mem::uninitialized() }; +use futures::stream::Stream; + +samples_stream.for_each(move |buffer| -> Result<_, ()> { + // write data to `buffer` here + + Ok(()) +}).forget(); +``` TODO: add example -**Important**: the `append_data` function can return a buffer shorter than what you requested. -This is the case if the device doesn't have enough space available. **It happens very often**, -this is not some obscure situation that can be ignored. - -After you have submitted data for the first time, call `play`: +After you have registered a callback, call `play`: ```no_run # let mut voice: cpal::Voice = unsafe { std::mem::uninitialized() }; voice.play(); ``` -The audio device of the user will read the buffer that you sent, and play it. If the audio device -reaches the end of the data, it will stop playing. You must continuously fill the buffer by -calling `append_data` repeatedly if you don't want the audio to stop playing. +And finally, run the event loop: + +```no_run +# let mut event_loop: cpal::EventLoop = unsafe { std::mem::uninitialized() }; +event_loop.run(); +``` + +Calling `run()` will block the thread forever, so it's usually best done in a separate thread. + +While `run()` is running, the audio device of the user will call the callbacks you registered +from time to time. */ From 6060582aa0ddeaefd3149ee790ff79f2fd6c6cdd Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 3 Aug 2016 13:05:51 +0200 Subject: [PATCH 5/9] Update the null implementation --- src/null/mod.rs | 48 +++++++++++++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/src/null/mod.rs b/src/null/mod.rs index c8f6d37..5e24ea8 100644 --- a/src/null/mod.rs +++ b/src/null/mod.rs @@ -2,9 +2,22 @@ use std::marker::PhantomData; +use futures::Poll; +use futures::Task; +use futures::stream::Stream; + use CreationError; use Format; use FormatsEnumerationError; +use UnknownTypeBuffer; + +pub struct EventLoop; +impl EventLoop { + #[inline] + pub fn new() -> EventLoop { EventLoop } + #[inline] + pub fn run(&self) { loop { /* TODO: don't spin */ } } +} #[derive(Default)] pub struct EndpointsIterator; @@ -52,18 +65,16 @@ impl Iterator for SupportedFormatsIterator { } pub struct Voice; +pub struct SamplesStream; impl Voice { #[inline] - pub fn new(_: &Endpoint, _: &Format) -> Result { + pub fn new(_: &Endpoint, _: &Format, _: &EventLoop) + -> Result<(Voice, SamplesStream), CreationError> + { Err(CreationError::DeviceNotAvailable) } - #[inline] - pub fn append_data<'a, T>(&'a mut self, _: usize) -> Buffer<'a, T> { - unreachable!() - } - #[inline] pub fn play(&mut self) { } @@ -71,30 +82,29 @@ impl Voice { #[inline] pub fn pause(&mut self) { } +} + +impl Stream for SamplesStream { + type Item = UnknownTypeBuffer; + type Error = (); #[inline] - pub fn get_period(&self) -> usize { - 0 + fn poll(&mut self, _: &mut Task) -> Poll, Self::Error> { + Poll::NotReady } #[inline] - pub fn get_pending_samples(&self) -> usize { - unreachable!() - } - - #[inline] - pub fn underflowed(&self) -> bool { - false + fn schedule(&mut self, _: &mut Task) { } } -pub struct Buffer<'a, T: 'a> { - marker: PhantomData<&'a T>, +pub struct Buffer { + marker: PhantomData, } -impl<'a, T> Buffer<'a, T> { +impl Buffer { #[inline] - pub fn get_buffer<'b>(&'b mut self) -> &'b mut [T] { + pub fn get_buffer(&mut self) -> &mut [T] { unreachable!() } From cc26897acd527d30968907408fca1ae567c5ee23 Mon Sep 17 00:00:00 2001 From: mitchmindtree Date: Fri, 12 Aug 2016 17:49:13 +1000 Subject: [PATCH 6/9] Update coreaudio backend to new futures-rs oriented design. This depends on the changes introduced in #121. Update to coreaudio 0.6. --- Cargo.toml | 2 +- src/coreaudio/mod.rs | 298 +++++++++++++++++++------------------------ 2 files changed, 130 insertions(+), 170 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 027e03b..e81b4b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,7 @@ version = "0" path = "alsa-sys" [target.x86_64-apple-darwin.dependencies] -coreaudio-rs = "~0.5.0" +coreaudio-rs = "0.6" [dev-dependencies] vorbis = "0" diff --git a/src/coreaudio/mod.rs b/src/coreaudio/mod.rs index eeaca15..9f1ffc0 100644 --- a/src/coreaudio/mod.rs +++ b/src/coreaudio/mod.rs @@ -1,20 +1,21 @@ extern crate coreaudio; extern crate libc; -use std::sync::mpsc::{channel, Sender, Receiver}; -use std::sync::{Arc, Mutex}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::cell::RefCell; -use std::mem; -use std::cmp; -use std::marker::PhantomData; - use CreationError; use Format; use FormatsEnumerationError; +use Sample; use SampleFormat; use SamplesRate; use ChannelPosition; +use UnknownTypeBuffer; + +use futures::{Poll, Task, TaskHandle}; +use futures::stream::Stream; +use std::sync::{Arc, Mutex}; + +use self::coreaudio::audio_unit::AudioUnit; +use self::coreaudio::audio_unit::render_callback::{self, data}; mod enumerate; @@ -22,9 +23,6 @@ pub use self::enumerate::{EndpointsIterator, SupportedFormatsIterator, get_default_endpoint}; -use self::coreaudio::audio_unit::{AudioUnit, IOType}; -use self::coreaudio::audio_unit::render_callback::{self, data}; - #[derive(Clone, PartialEq, Eq)] pub struct Endpoint; @@ -44,36 +42,41 @@ impl Endpoint { } } -pub struct Buffer<'a, T: 'a> { - samples_sender: Sender<(Vec, NumChannels)>, - samples: Vec, - num_channels: NumChannels, - marker: PhantomData<&'a T>, - pending_samples: Arc +pub struct EventLoop; +impl EventLoop { + #[inline] + pub fn new() -> EventLoop { EventLoop } + #[inline] + pub fn run(&self) { loop {} } } -impl<'a, T> Buffer<'a, T> { +pub struct Buffer { + args: render_callback::Args>, + buffer: Vec, +} + +impl Buffer where T: Sample { #[inline] - pub fn get_buffer<'b>(&'b mut self) -> &'b mut [T] { - &mut self.samples[..] + pub fn get_buffer(&mut self) -> &mut [T] { + &mut self.buffer[..] } #[inline] pub fn len(&self) -> usize { - self.samples.len() + self.buffer.len() } #[inline] pub fn finish(self) { - let Buffer { samples_sender, samples, num_channels, pending_samples, .. } = self; // TODO: At the moment this assumes the Vec is a Vec. // Need to add T: Sample and use Sample::to_vec_f32. - let num_samples = samples.len(); - let samples = unsafe { mem::transmute(samples) }; - pending_samples.fetch_add(num_samples, Ordering::SeqCst); - match samples_sender.send((samples, num_channels)) { - Err(_) => panic!("Failed to send samples to audio unit callback."), - Ok(()) => (), + let Buffer { mut args, buffer } = self; + + let num_channels = args.data.channels().count(); + for (i, frame) in buffer.chunks(num_channels).enumerate() { + for (channel, sample) in args.data.channels_mut().zip(frame.iter()) { + channel[i] = *sample; + } } } } @@ -81,91 +84,116 @@ impl<'a, T> Buffer<'a, T> { type NumChannels = usize; type NumFrames = usize; +pub struct Voice; + #[allow(dead_code)] // the audio_unit will be dropped if we don't hold it. -pub struct Voice { +pub struct SamplesStream { audio_unit: AudioUnit, - ready_receiver: Receiver<(NumChannels, NumFrames)>, - samples_sender: Sender<(Vec, NumChannels)>, - underflow: Arc>>, - last_ready: Arc>>>, - pending_samples: Arc + inner: Arc>, } -unsafe impl Sync for Voice {} -unsafe impl Send for Voice {} + +struct SamplesStreamInner { + scheduled_task: Option, + current_callback: Option>>, +} + +impl Stream for SamplesStream { + type Item = UnknownTypeBuffer; + type Error = (); + + fn poll(&mut self, _: &mut Task) -> Poll, Self::Error> { + let mut inner = self.inner.lock().unwrap(); + + // There are two possibilites: either we're answering a callback of coreaudio and we return + // a buffer, or we're not answering a callback and we return that we're not ready. + + let current_callback = match inner.current_callback.take() { + Some(c) => c, + None => return Poll::NotReady + }; + + let buffer_len = current_callback.num_frames * current_callback.data.channels().count(); + + let buffer = Buffer { + args: current_callback, + buffer: vec![0.0; buffer_len], + }; + + Poll::Ok(Some(UnknownTypeBuffer::F32(::Buffer { target: Some(buffer) }))) + } + + fn schedule(&mut self, task: &mut Task) { + self.inner.lock().unwrap().scheduled_task = Some(task.handle().clone()); + } +} impl Voice { - pub fn new(_: &Endpoint, _: &Format) -> Result { - // A channel for signalling that the audio unit is ready for data. - let (ready_sender, ready_receiver) = channel(); - // A channel for sending the audio callback a pointer to the sample data. - let (samples_sender, samples_receiver) = channel(); + pub fn new(_: &Endpoint, _: &Format, _: &EventLoop) + -> Result<(Voice, SamplesStream), CreationError> + { + let inner = Arc::new(Mutex::new(SamplesStreamInner { + scheduled_task: None, + current_callback: None, + })); - let underflow = Arc::new(Mutex::new(RefCell::new(false))); - let uf_clone = underflow.clone(); - - let pending_samples: Arc = Arc::new(AtomicUsize::new(0)); - - let pending_samples_c = pending_samples.clone(); - - let audio_unit_result = AudioUnit::new(IOType::HalOutput); - - if let Ok(mut audio_unit) = audio_unit_result { - // TODO: iOS uses integer and fixed-point data - if let Ok(()) = audio_unit.set_render_callback(move |args: render_callback::Args>| { - let render_callback::Args { num_frames, mut data, .. } = args; - let num_channels = data.channels().count(); - if let Err(_) = ready_sender.send((num_channels, num_frames)) { - return Err(()); - } - loop { - if let Ok((samples, num_channels)) = samples_receiver.try_recv() { - let samples: Vec = samples; - if let Ok(uf) = uf_clone.lock() { - *(uf.borrow_mut()) = num_frames > samples.len() / num_channels; - } else { return Err(()) } - - pending_samples_c.fetch_sub(samples.len(), Ordering::SeqCst); - - for (i, frame) in samples.chunks(num_channels).enumerate() { - for (channel, sample) in data.channels_mut().zip(frame.iter()) { - channel[i] = *sample; - } - } - - break; - }; - } - Ok(()) - - }) { - if let Ok(()) = audio_unit.start() { - return Ok(Voice { - audio_unit: audio_unit, - ready_receiver: ready_receiver, - samples_sender: samples_sender, - underflow: underflow, - last_ready: Arc::new(Mutex::new(RefCell::new(None))), - pending_samples: pending_samples - }) - } + fn convert_error(err: coreaudio::Error) -> CreationError { + match err { + coreaudio::Error::RenderCallbackBufferFormatDoesNotMatchAudioUnitStreamFormat | + coreaudio::Error::NoKnownSubtype | + coreaudio::Error::AudioUnit(coreaudio::error::AudioUnitError::FormatNotSupported) | + coreaudio::Error::AudioCodec(_) | + coreaudio::Error::AudioFormat(_) => CreationError::FormatNotSupported, + _ => CreationError::DeviceNotAvailable, } } - Err(CreationError::DeviceNotAvailable) - } + let au_type = coreaudio::audio_unit::IOType::DefaultOutput; + let mut audio_unit = try!(AudioUnit::new(au_type).map_err(convert_error)); - pub fn append_data<'a, T>(&'a mut self, max_elements: usize) -> Buffer<'a, T> where T: Clone { - // Block until the audio callback is ready for more data. - let (channels, frames) = self.block_until_ready(); - let buffer_size = cmp::min(channels * frames, max_elements); - Buffer { - samples_sender: self.samples_sender.clone(), - samples: vec![unsafe { mem::uninitialized() }; buffer_size], - num_channels: channels as usize, - marker: PhantomData, - pending_samples: self.pending_samples.clone() + // TODO: iOS uses integer and fixed-point data + + { + let inner = inner.clone(); + let result = audio_unit.set_render_callback(move |args| { + // This callback is entered whenever the coreaudio engine needs to be fed data. + + // Store the callback argument in the `SamplesStreamInner` and return the task + // that we're supposed to notify. + let scheduled = { + let mut inner = inner.lock().unwrap(); + + assert!(inner.current_callback.is_none()); + inner.current_callback = Some(args); + + inner.scheduled_task.take() + }; + + // It is important that `inner` is unlocked here. + if let Some(scheduled) = scheduled { + // Calling `notify()` should immediately call `poll()` on the `SamplesStream`, + // which will use the data we stored in `current_callback`. + scheduled.notify(); + } + + // TODO: what should happen if the callback wasn't processed? in other word, what + // if the user didn't register any handler or did a stupid thing in the + // handler (like mem::forgetting the buffer)? + + Ok(()) + }); + + try!(result.map_err(convert_error)); } + + try!(audio_unit.start().map_err(convert_error)); + + let samples_stream = SamplesStream { + audio_unit: audio_unit, + inner: inner, + }; + + Ok((Voice, samples_stream)) } #[inline] @@ -177,72 +205,4 @@ impl Voice { pub fn pause(&mut self) { unimplemented!() } - - #[inline] - pub fn get_period(&self) -> usize { - if let Some(ready) = self.update_last_ready() { - (ready.0 * ready.1) as usize - } else { - 0 - } - } - - #[inline] - pub fn get_pending_samples(&self) -> usize { - self.pending_samples.load(Ordering::Relaxed) - } - - /// Attempts to store the most recent ready message into the internal - /// ref cell, then return the last ready message. If the last ready hasn't - /// been reset with `clear_last_ready`, then it will not be set and the - /// current value will be returned. Else, the ready_receiver will be - /// try_recv'd and if it is ready, the last ready will be set and returned. - /// Finally, if the ready_receiver had no data at try_recv, None will be - /// returned. - #[inline] - fn update_last_ready(&self) -> Option<(NumChannels, NumFrames)> { - let refcell = self.last_ready.lock().unwrap(); - let data = refcell.borrow(); - if let Some(s) = *data { - // - return Some(s); - } else { - drop(data); - let mut data = refcell.borrow_mut(); - if let Ok(ready) = self.ready_receiver.try_recv() { - // the audiounit is ready so we can set last_ready - *data = Some(ready); - return *data; - } - } - None - } - - /// Block until ready to send data. This checks last_ready first. In any - /// case, last_ready will be set to None when this function returns. - fn block_until_ready(&self) -> (NumChannels, NumFrames) { - let refcell = self.last_ready.lock().unwrap(); - let data = refcell.borrow(); - if let Some(s) = *data { - drop(data); - let mut data = refcell.borrow_mut(); - *data = None; - return s; - } else { - match self.ready_receiver.recv() { - Ok(ready) => { - return ready; - }, - Err(e) => panic!("Couldn't receive a ready message: \ - {:?}", e) - } - } - } - - #[inline] - pub fn underflowed(&self) -> bool { - let uf = self.underflow.lock().unwrap(); - let v = uf.borrow(); - *v - } } From 71e94bb6d16789be71f54daeea447cb64e8db858 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 12 Aug 2016 17:51:15 +0200 Subject: [PATCH 7/9] Update the Cargo.toml and the README --- Cargo.toml | 12 ++++-------- README.md | 9 ++++++--- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e81b4b1..44e1144 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,16 +1,15 @@ [package] - name = "cpal" -version = "0.2.12" -authors = ["Pierre Krieger "] -description = "Cross-platform audio playing library in pure Rust." +version = "0.3.0" +authors = ["The CPAL contributors", "Pierre Krieger "] +description = "Low-level cross-platform audio playing library in pure Rust." repository = "https://github.com/tomaka/cpal" documentation = "http://tomaka.github.io/cpal/" license = "Apache-2.0" keywords = ["audio", "sound"] [dependencies] -futures = { git = "https://github.com/alexcrichton/futures-rs" } +futures = "0.1.0" libc = "0.2" lazy_static = "0.2" winapi = "0.2.8" @@ -35,6 +34,3 @@ path = "alsa-sys" [target.x86_64-apple-darwin.dependencies] coreaudio-rs = "0.6" - -[dev-dependencies] -vorbis = "0" diff --git a/README.md b/README.md index c252ca5..7510ae8 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,13 @@ # CPAL - Cross-platform audio library -Audio player in pure Rust. Works only on win32 (WASAPI) and linux (ALSA) for the moment. - [Documentation](http://tomaka.github.io/cpal/) ```toml [dependencies] -cpal = "0.1.0" +cpal = "0.3.0" ``` + +Low-level library for audio playback in pure Rust. + +This library allows you to open a channel with the audio device of the user's machine, and +send PCM data to it. From b1add0b12b95e37ca09218016c9724a79af21fda Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 12 Aug 2016 17:57:06 +0200 Subject: [PATCH 8/9] Fix most warnings --- examples/enumerate.rs | 2 -- src/wasapi/mod.rs | 1 - src/wasapi/voice.rs | 8 ++------ 3 files changed, 2 insertions(+), 9 deletions(-) diff --git a/examples/enumerate.rs b/examples/enumerate.rs index 570d380..1d467ab 100644 --- a/examples/enumerate.rs +++ b/examples/enumerate.rs @@ -1,7 +1,5 @@ extern crate cpal; -use cpal::*; - fn main() { let endpoints = cpal::get_endpoints_list(); diff --git a/src/wasapi/mod.rs b/src/wasapi/mod.rs index a951b5c..2dd85f4 100644 --- a/src/wasapi/mod.rs +++ b/src/wasapi/mod.rs @@ -37,7 +37,6 @@ fn check_result(result: winapi::HRESULT) -> Result<(), IoError> { /// Wrapper because of that stupid decision to remove `Send` and `Sync` from raw pointers. #[derive(Copy, Clone)] -#[allow(raw_pointer_derive)] struct IAudioClientWrapper(*mut winapi::IAudioClient); unsafe impl Send for IAudioClientWrapper {} unsafe impl Sync for IAudioClientWrapper {} diff --git a/src/wasapi/voice.rs b/src/wasapi/voice.rs index 3d49469..ef8a795 100644 --- a/src/wasapi/voice.rs +++ b/src/wasapi/voice.rs @@ -5,11 +5,9 @@ use super::winapi; use super::Endpoint; use super::check_result; -use std::cmp; use std::slice; use std::mem; use std::ptr; -use std::marker::PhantomData; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -221,7 +219,7 @@ impl Voice { } match check_result((*audio_client).SetEventHandle(event)) { - Err(e) => { + Err(_) => { (*audio_client).Release(); panic!("Failed to call SetEventHandle") }, @@ -333,7 +331,7 @@ impl Stream for SamplesStream { type Item = UnknownTypeBuffer; type Error = (); - fn poll(&mut self, task: &mut Task) -> Poll, Self::Error> { + fn poll(&mut self, _: &mut Task) -> Poll, Self::Error> { unsafe { if self.ready.swap(false, Ordering::Relaxed) == false { let result = kernel32::WaitForSingleObject(self.event, 0); @@ -356,8 +354,6 @@ impl Stream for SamplesStream { self.max_frames_in_buffer - padding }; - assert!(frames_available >= 0); - // Obtaining a pointer to the buffer. let (buffer_data, buffer_len) = { let mut buffer: *mut winapi::BYTE = mem::uninitialized(); From bf051dd16f2f8fd160e398a4ff5e0c3e70b3aae4 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 12 Aug 2016 18:06:17 +0200 Subject: [PATCH 9/9] Add some documentation to the winapi implementation --- src/wasapi/voice.rs | 40 ++++++++++++++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/src/wasapi/voice.rs b/src/wasapi/voice.rs index ef8a795..6be243b 100644 --- a/src/wasapi/voice.rs +++ b/src/wasapi/voice.rs @@ -32,10 +32,25 @@ unsafe impl Send for EventLoop {} unsafe impl Sync for EventLoop {} struct EventLoopInner { - // This event is signalled after elements have been added to `pending_scheduled`. - pending_scheduled_event: winapi::HANDLE, + // List of handles that are currently being polled or that are going to be polled. This mutex + // is locked for as long as the event loop is running. + // + // In the `EventLoopScheduled`, the first handle in the list of handles is always + // `pending_scheduled_event`. This means that the length of `handles` is always 1 + the length + // of `task_handles`. + // FIXME: no way to remove elements from that list? scheduled: Mutex, + + // Since the above mutex is locked most of the time, we add new handles to this list instead. + // After a new element is added to this list, you should notify `pending_scheduled_event` + // so that they get transferred to `scheduled`. + // + // The length of `handles` and `task_handles` should always be equal. pending_scheduled: Mutex, + + // This event is signalled after elements have been added to `pending_scheduled` in order to + // notify that they should be picked up. + pending_scheduled_event: winapi::HANDLE, } struct EventLoopScheduled { @@ -44,7 +59,8 @@ struct EventLoopScheduled { // `WaitForMultipleObjectsEx` on the array without having to perform any conversion. handles: Vec, - // List of task handles corresponding to `handles`. + // List of task handles corresponding to `handles`. The second element is used to signal + // the voice that it has been signaled. task_handles: Vec<(TaskHandle, Arc)>, } @@ -74,7 +90,10 @@ impl EventLoop { let mut scheduled = self.inner.scheduled.lock().unwrap(); loop { + debug_assert!(scheduled.handles.len() == 1 + scheduled.task_handles.len()); + // Creating a voice checks for the MAXIMUM_WAIT_OBJECTS limit. + // FIXME: this is not the case ^ debug_assert!(scheduled.handles.len() <= winapi::MAXIMUM_WAIT_OBJECTS as usize); // Wait for any of the handles to be signalled, which means that the corresponding @@ -89,6 +108,8 @@ impl EventLoop { let handle_id = (result - winapi::WAIT_OBJECT_0) as usize; if handle_id == 0 { + // The `pending_scheduled_event` handle has been notified, which means that we + // should pick up the content of `pending_scheduled`. let mut pending = self.inner.pending_scheduled.lock().unwrap(); scheduled.handles.append(&mut pending.handles); scheduled.task_handles.append(&mut pending.task_handles); @@ -144,18 +165,18 @@ impl Voice { -> Result<(Voice, SamplesStream), CreationError> { unsafe { - // making sure that COM is initialized - // it's not actually sure that this is required, but when in doubt do it + // Making sure that COM is initialized. + // It's not actually sure that this is required, but when in doubt do it. com::com_initialized(); - // obtaining a `IAudioClient` + // Obtaining a `IAudioClient`. let audio_client = match end_point.build_audioclient() { Err(ref e) if e.raw_os_error() == Some(winapi::AUDCLNT_E_DEVICE_INVALIDATED) => return Err(CreationError::DeviceNotAvailable), e => e.unwrap(), }; - // computing the format and initializing the device + // Computing the format and initializing the device. let format = { let format_attempt = try!(format_to_waveformatextensible(format)); let share_mode = winapi::AUDCLNT_SHAREMODE_SHARED; @@ -334,9 +355,10 @@ impl Stream for SamplesStream { fn poll(&mut self, _: &mut Task) -> Poll, Self::Error> { unsafe { if self.ready.swap(false, Ordering::Relaxed) == false { + // Despite its name this function does not block, because we pass `0`. let result = kernel32::WaitForSingleObject(self.event, 0); - // Returning if timeout. + // Returning if the event is not ready. match result { winapi::WAIT_OBJECT_0 => (), winapi::WAIT_TIMEOUT => return Poll::NotReady, @@ -344,6 +366,8 @@ impl Stream for SamplesStream { }; } + // If we reach here, that means we're ready to accept new samples. + let mut inner = self.inner.lock().unwrap(); // Obtaining the number of frames that are available to be written.