Merge pull request #130 from thiolliere/local_branch

update to futures 0.1.1: alsa wasapi
This commit is contained in:
tomaka 2016-10-01 08:59:39 +02:00 committed by GitHub
commit aa97c76ecf
5 changed files with 167 additions and 129 deletions

View File

@ -1,14 +1,27 @@
extern crate cpal; extern crate cpal;
extern crate futures; extern crate futures;
use futures::Future;
use futures::stream::Stream; use futures::stream::Stream;
use futures::task;
use futures::task::Executor;
use futures::task::Run;
use std::sync::Arc;
struct MyExecutor;
impl Executor for MyExecutor {
fn execute(&self, r: Run) {
r.run();
}
}
fn main() { fn main() {
let endpoint = cpal::get_default_endpoint().expect("Failed to get default endpoint"); let endpoint = cpal::get_default_endpoint().expect("Failed to get default endpoint");
let format = endpoint.get_supported_formats_list().unwrap().next().expect("Failed to get endpoint format"); let format = endpoint.get_supported_formats_list().unwrap().next().expect("Failed to get endpoint format");
let event_loop = cpal::EventLoop::new(); let event_loop = cpal::EventLoop::new();
let executor = Arc::new(MyExecutor);
let (mut voice, stream) = cpal::Voice::new(&endpoint, &format, &event_loop).expect("Failed to create a voice"); let (mut voice, stream) = cpal::Voice::new(&endpoint, &format, &event_loop).expect("Failed to create a voice");
@ -18,7 +31,7 @@ fn main() {
.map(move |t| t.sin()); .map(move |t| t.sin());
voice.play(); voice.play();
stream.for_each(move |buffer| -> Result<_, ()> { task::spawn(stream.for_each(move |buffer| -> Result<_, ()> {
match buffer { match buffer {
cpal::UnknownTypeBuffer::U16(mut buffer) => { cpal::UnknownTypeBuffer::U16(mut buffer) => {
for (sample, value) in buffer.chunks_mut(format.channels.len()).zip(&mut data_source) { for (sample, value) in buffer.chunks_mut(format.channels.len()).zip(&mut data_source) {
@ -42,7 +55,7 @@ fn main() {
}; };
Ok(()) Ok(())
}).forget(); })).execute(executor);
event_loop.run(); event_loop.run();
} }

View File

@ -16,9 +16,10 @@ use std::vec::IntoIter as VecIntoIter;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use futures::Poll; use futures::Poll;
use futures::Task; use futures::task::Task;
use futures::TaskHandle; use futures::task;
use futures::stream::Stream; use futures::stream::Stream;
use futures::Async;
pub type SupportedFormatsIterator = VecIntoIter<Format>; pub type SupportedFormatsIterator = VecIntoIter<Format>;
@ -295,7 +296,7 @@ impl EventLoop {
if (revent as libc::c_short & libc::POLLOUT) != 0 { if (revent as libc::c_short & libc::POLLOUT) != 0 {
let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap().take(); let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap().take();
if let Some(scheduled) = scheduled { if let Some(scheduled) = scheduled {
scheduled.notify(); scheduled.unpark();
} }
for _ in 0 .. current_wait.voices[i_voice].num_descriptors { for _ in 0 .. current_wait.voices[i_voice].num_descriptors {
@ -348,74 +349,20 @@ struct VoiceInner {
period_len: usize, period_len: usize,
// If `Some`, something previously called `schedule` on the stream. // If `Some`, something previously called `schedule` on the stream.
scheduled: Mutex<Option<TaskHandle>>, scheduled: Mutex<Option<Task>>,
} }
unsafe impl Send for VoiceInner {} unsafe impl Send for VoiceInner {}
unsafe impl Sync for VoiceInner {} unsafe impl Sync for VoiceInner {}
impl Stream for SamplesStream { impl SamplesStream {
type Item = UnknownTypeBuffer;
type Error = ();
fn poll(&mut self, _: &mut Task) -> Poll<Option<Self::Item>, Self::Error> {
// Determine the number of samples that are available to write.
let available = {
let channel = self.inner.channel.lock().expect("could not lock channel");
let available = unsafe { alsa::snd_pcm_avail(*channel) }; // TODO: what about snd_pcm_avail_update?
if available == -32 {
// buffer underrun
self.inner.buffer_len
} else if available < 0 {
check_errors(available as libc::c_int).expect("buffer is not available");
unreachable!()
} else {
(available * self.inner.num_channels as alsa::snd_pcm_sframes_t) as usize
}
};
// If we don't have one period ready, return `NotReady`.
if available < self.inner.period_len {
return Poll::NotReady;
}
// We now sure that we're ready to write data.
match self.inner.sample_format {
SampleFormat::I16 => {
let buffer = Buffer {
buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(),
inner: self.inner.clone(),
};
Poll::Ok(Some(UnknownTypeBuffer::I16(::Buffer { target: Some(buffer) })))
},
SampleFormat::U16 => {
let buffer = Buffer {
buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(),
inner: self.inner.clone(),
};
Poll::Ok(Some(UnknownTypeBuffer::U16(::Buffer { target: Some(buffer) })))
},
SampleFormat::F32 => {
let buffer = Buffer {
buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(),
inner: self.inner.clone(),
};
Poll::Ok(Some(UnknownTypeBuffer::F32(::Buffer { target: Some(buffer) })))
},
}
}
#[inline] #[inline]
fn schedule(&mut self, task: &mut Task) { fn schedule(&mut self) {
unsafe { unsafe {
let channel = self.inner.channel.lock().unwrap(); let channel = self.inner.channel.lock().unwrap();
// We start by filling `scheduled`. // We start by filling `scheduled`.
*self.inner.scheduled.lock().unwrap() = Some(task.handle().clone()); *self.inner.scheduled.lock().unwrap() = Some(task::park());
// In this function we turn the `snd_pcm_t` into a collection of file descriptors. // In this function we turn the `snd_pcm_t` into a collection of file descriptors.
// And we add these descriptors to `event_loop.pending_wait.descriptors`. // And we add these descriptors to `event_loop.pending_wait.descriptors`.
@ -444,6 +391,63 @@ impl Stream for SamplesStream {
} }
} }
impl Stream for SamplesStream {
type Item = UnknownTypeBuffer;
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// Determine the number of samples that are available to write.
let available = {
let channel = self.inner.channel.lock().expect("could not lock channel");
let available = unsafe { alsa::snd_pcm_avail(*channel) }; // TODO: what about snd_pcm_avail_update?
if available == -32 {
// buffer underrun
self.inner.buffer_len
} else if available < 0 {
check_errors(available as libc::c_int).expect("buffer is not available");
unreachable!()
} else {
(available * self.inner.num_channels as alsa::snd_pcm_sframes_t) as usize
}
};
// If we don't have one period ready, return `NotReady`.
if available < self.inner.period_len {
self.schedule();
return Ok(Async::NotReady);
}
// We now sure that we're ready to write data.
match self.inner.sample_format {
SampleFormat::I16 => {
let buffer = Buffer {
buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(),
inner: self.inner.clone(),
};
Ok(Async::Ready((Some(UnknownTypeBuffer::I16(::Buffer { target: Some(buffer) })))))
},
SampleFormat::U16 => {
let buffer = Buffer {
buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(),
inner: self.inner.clone(),
};
Ok(Async::Ready((Some(UnknownTypeBuffer::U16(::Buffer { target: Some(buffer) })))))
},
SampleFormat::F32 => {
let buffer = Buffer {
buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(),
inner: self.inner.clone(),
};
Ok(Async::Ready((Some(UnknownTypeBuffer::F32(::Buffer { target: Some(buffer) })))))
},
}
}
}
/// Wrapper around `hw_params`. /// Wrapper around `hw_params`.
struct HwParams(*mut alsa::snd_pcm_hw_params_t); struct HwParams(*mut alsa::snd_pcm_hw_params_t);

View File

@ -22,15 +22,28 @@ The `voice` can be used to control the play/pause of the output, while the `samp
be used to register a callback that will be called whenever the backend is ready to get data. be used to register a callback that will be called whenever the backend is ready to get data.
See the documentation of `futures-rs` for more info about how to use streams. See the documentation of `futures-rs` for more info about how to use streams.
```ignore // TODO: unignore ```no_run
# let mut samples_stream: cpal::SamplesStream = unsafe { std::mem::uninitialized() }; # extern crate futures;
# extern crate cpal;
# use std::sync::Arc;
use futures::stream::Stream; use futures::stream::Stream;
use futures::task;
# struct MyExecutor;
# impl task::Executor for MyExecutor {
# fn execute(&self, r: task::Run) {
# r.run();
# }
# }
# fn main() {
# let mut samples_stream: cpal::SamplesStream = unsafe { std::mem::uninitialized() };
# let my_executor = Arc::new(MyExecutor);
samples_stream.for_each(move |buffer| -> Result<_, ()> { task::spawn(samples_stream.for_each(move |buffer| -> Result<_, ()> {
// write data to `buffer` here // write data to `buffer` here
Ok(()) Ok(())
}).forget(); })).execute(my_executor);
# }
``` ```
TODO: add example TODO: add example
@ -72,7 +85,6 @@ use std::ops::{Deref, DerefMut};
use futures::stream::Stream; use futures::stream::Stream;
use futures::Poll; use futures::Poll;
use futures::Task;
mod null; mod null;
mod samples_formats; mod samples_formats;
@ -397,13 +409,8 @@ impl Stream for SamplesStream {
type Error = (); type Error = ();
#[inline] #[inline]
fn poll(&mut self, task: &mut Task) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.0.poll(task) self.0.poll()
}
#[inline]
fn schedule(&mut self, task: &mut Task) {
self.0.schedule(task)
} }
} }

View File

@ -3,8 +3,8 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use futures::Poll; use futures::Poll;
use futures::Task;
use futures::stream::Stream; use futures::stream::Stream;
use futures::Async;
use CreationError; use CreationError;
use Format; use Format;
@ -89,12 +89,8 @@ impl Stream for SamplesStream {
type Error = (); type Error = ();
#[inline] #[inline]
fn poll(&mut self, _: &mut Task) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
Poll::NotReady Ok(Async::NotReady)
}
#[inline]
fn schedule(&mut self, _: &mut Task) {
} }
} }

View File

@ -14,9 +14,10 @@ use std::sync::Arc;
use std::sync::Mutex; use std::sync::Mutex;
use futures::Poll; use futures::Poll;
use futures::Task; use futures::task::Task;
use futures::TaskHandle; use futures::task;
use futures::stream::Stream; use futures::stream::Stream;
use futures::Async;
use CreationError; use CreationError;
use ChannelPosition; use ChannelPosition;
@ -61,7 +62,7 @@ struct EventLoopScheduled {
// List of task handles corresponding to `handles`. The second element is used to signal // List of task handles corresponding to `handles`. The second element is used to signal
// the voice that it has been signaled. // the voice that it has been signaled.
task_handles: Vec<(TaskHandle, Arc<AtomicBool>)>, task_handles: Vec<(Task, Arc<AtomicBool>)>,
} }
impl EventLoop { impl EventLoop {
@ -118,7 +119,7 @@ impl EventLoop {
scheduled.handles.remove(handle_id); scheduled.handles.remove(handle_id);
let (task_handle, ready) = scheduled.task_handles.remove(handle_id - 1); let (task_handle, ready) = scheduled.task_handles.remove(handle_id - 1);
ready.store(true, Ordering::Relaxed); ready.store(true, Ordering::Relaxed);
task_handle.notify(); task_handle.unpark();
} }
} }
} }
@ -348,26 +349,43 @@ impl Voice {
} }
} }
impl SamplesStream {
#[inline]
fn schedule(&mut self) {
let mut pending = self.event_loop.pending_scheduled.lock().unwrap();
pending.handles.push(self.event);
pending.task_handles.push((task::park(), self.ready.clone()));
drop(pending);
let result = unsafe { kernel32::SetEvent(self.event_loop.pending_scheduled_event) };
assert!(result != 0);
}
}
impl Stream for SamplesStream { impl Stream for SamplesStream {
type Item = UnknownTypeBuffer; type Item = UnknownTypeBuffer;
type Error = (); type Error = ();
fn poll(&mut self, _: &mut Task) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
unsafe { unsafe {
if self.ready.swap(false, Ordering::Relaxed) == false { if self.ready.swap(false, Ordering::Relaxed) == false {
// Despite its name this function does not block, because we pass `0`. // Despite its name this function does not block, because we pass `0`.
let result = kernel32::WaitForSingleObject(self.event, 0); let result = kernel32::WaitForSingleObject(self.event, 0);
// Returning if the event is not ready. // Park the task and returning if the event is not ready.
match result { match result {
winapi::WAIT_OBJECT_0 => (), winapi::WAIT_OBJECT_0 => (),
winapi::WAIT_TIMEOUT => return Poll::NotReady, winapi::WAIT_TIMEOUT => {
self.schedule();
return Ok(Async::NotReady);
},
_ => unreachable!() _ => unreachable!()
}; };
} }
// If we reach here, that means we're ready to accept new samples. // If we reach here, that means we're ready to accept new samples.
let poll = {
let mut inner = self.inner.lock().unwrap(); let mut inner = self.inner.lock().unwrap();
// Obtaining the number of frames that are available to be written. // Obtaining the number of frames that are available to be written.
@ -378,7 +396,9 @@ impl Stream for SamplesStream {
self.max_frames_in_buffer - padding self.max_frames_in_buffer - padding
}; };
if frames_available == 0 { return Poll::NotReady; } if frames_available == 0 {
Ok(Async::NotReady)
} else {
// Obtaining a pointer to the buffer. // Obtaining a pointer to the buffer.
let (buffer_data, buffer_len) = { let (buffer_data, buffer_len) = {
@ -399,18 +419,16 @@ impl Stream for SamplesStream {
frames: frames_available, frames: frames_available,
}; };
Poll::Ok(Some(UnknownTypeBuffer::F32(::Buffer { target: Some(buffer) }))) // FIXME: not necessarily F32 Ok(Async::Ready(Some(UnknownTypeBuffer::F32(::Buffer { target: Some(buffer) })))) // FIXME: not necessarily F32
} }
};
if let Ok(Async::NotReady) = poll {
self.schedule();
} }
fn schedule(&mut self, task: &mut Task) { poll
let mut pending = self.event_loop.pending_scheduled.lock().unwrap(); }
pending.handles.push(self.event);
pending.task_handles.push((task.handle().clone(), self.ready.clone()));
drop(pending);
let result = unsafe { kernel32::SetEvent(self.event_loop.pending_scheduled_event) };
assert!(result != 0);
} }
} }