update to futures 0.1.1: alsa wasapi
This commit is contained in:
parent
ad360d2a32
commit
e031025abe
|
@ -1,14 +1,27 @@
|
||||||
extern crate cpal;
|
extern crate cpal;
|
||||||
extern crate futures;
|
extern crate futures;
|
||||||
|
|
||||||
use futures::Future;
|
|
||||||
use futures::stream::Stream;
|
use futures::stream::Stream;
|
||||||
|
use futures::task;
|
||||||
|
use futures::task::Executor;
|
||||||
|
use futures::task::Run;
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
struct MyExecutor;
|
||||||
|
|
||||||
|
impl Executor for MyExecutor {
|
||||||
|
fn execute(&self, r: Run) {
|
||||||
|
r.run();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let endpoint = cpal::get_default_endpoint().expect("Failed to get default endpoint");
|
let endpoint = cpal::get_default_endpoint().expect("Failed to get default endpoint");
|
||||||
let format = endpoint.get_supported_formats_list().unwrap().next().expect("Failed to get endpoint format");
|
let format = endpoint.get_supported_formats_list().unwrap().next().expect("Failed to get endpoint format");
|
||||||
|
|
||||||
let event_loop = cpal::EventLoop::new();
|
let event_loop = cpal::EventLoop::new();
|
||||||
|
let executor = Arc::new(MyExecutor);
|
||||||
|
|
||||||
let (mut voice, stream) = cpal::Voice::new(&endpoint, &format, &event_loop).expect("Failed to create a voice");
|
let (mut voice, stream) = cpal::Voice::new(&endpoint, &format, &event_loop).expect("Failed to create a voice");
|
||||||
|
|
||||||
|
@ -18,7 +31,7 @@ fn main() {
|
||||||
.map(move |t| t.sin());
|
.map(move |t| t.sin());
|
||||||
|
|
||||||
voice.play();
|
voice.play();
|
||||||
stream.for_each(move |buffer| -> Result<_, ()> {
|
task::spawn(stream.for_each(move |buffer| -> Result<_, ()> {
|
||||||
match buffer {
|
match buffer {
|
||||||
cpal::UnknownTypeBuffer::U16(mut buffer) => {
|
cpal::UnknownTypeBuffer::U16(mut buffer) => {
|
||||||
for (sample, value) in buffer.chunks_mut(format.channels.len()).zip(&mut data_source) {
|
for (sample, value) in buffer.chunks_mut(format.channels.len()).zip(&mut data_source) {
|
||||||
|
@ -42,7 +55,7 @@ fn main() {
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}).forget();
|
})).execute(executor);
|
||||||
|
|
||||||
event_loop.run();
|
event_loop.run();
|
||||||
}
|
}
|
||||||
|
|
126
src/alsa/mod.rs
126
src/alsa/mod.rs
|
@ -16,9 +16,10 @@ use std::vec::IntoIter as VecIntoIter;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use futures::Poll;
|
use futures::Poll;
|
||||||
use futures::Task;
|
use futures::task::Task;
|
||||||
use futures::TaskHandle;
|
use futures::task;
|
||||||
use futures::stream::Stream;
|
use futures::stream::Stream;
|
||||||
|
use futures::Async;
|
||||||
|
|
||||||
pub type SupportedFormatsIterator = VecIntoIter<Format>;
|
pub type SupportedFormatsIterator = VecIntoIter<Format>;
|
||||||
|
|
||||||
|
@ -295,7 +296,7 @@ impl EventLoop {
|
||||||
if (revent as libc::c_short & libc::POLLOUT) != 0 {
|
if (revent as libc::c_short & libc::POLLOUT) != 0 {
|
||||||
let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap().take();
|
let scheduled = current_wait.voices[i_voice].scheduled.lock().unwrap().take();
|
||||||
if let Some(scheduled) = scheduled {
|
if let Some(scheduled) = scheduled {
|
||||||
scheduled.notify();
|
scheduled.unpark();
|
||||||
}
|
}
|
||||||
|
|
||||||
for _ in 0 .. current_wait.voices[i_voice].num_descriptors {
|
for _ in 0 .. current_wait.voices[i_voice].num_descriptors {
|
||||||
|
@ -348,74 +349,20 @@ struct VoiceInner {
|
||||||
period_len: usize,
|
period_len: usize,
|
||||||
|
|
||||||
// If `Some`, something previously called `schedule` on the stream.
|
// If `Some`, something previously called `schedule` on the stream.
|
||||||
scheduled: Mutex<Option<TaskHandle>>,
|
scheduled: Mutex<Option<Task>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe impl Send for VoiceInner {}
|
unsafe impl Send for VoiceInner {}
|
||||||
unsafe impl Sync for VoiceInner {}
|
unsafe impl Sync for VoiceInner {}
|
||||||
|
|
||||||
impl Stream for SamplesStream {
|
impl SamplesStream {
|
||||||
type Item = UnknownTypeBuffer;
|
|
||||||
type Error = ();
|
|
||||||
|
|
||||||
fn poll(&mut self, _: &mut Task) -> Poll<Option<Self::Item>, Self::Error> {
|
|
||||||
// Determine the number of samples that are available to write.
|
|
||||||
let available = {
|
|
||||||
let channel = self.inner.channel.lock().expect("could not lock channel");
|
|
||||||
let available = unsafe { alsa::snd_pcm_avail(*channel) }; // TODO: what about snd_pcm_avail_update?
|
|
||||||
|
|
||||||
if available == -32 {
|
|
||||||
// buffer underrun
|
|
||||||
self.inner.buffer_len
|
|
||||||
} else if available < 0 {
|
|
||||||
check_errors(available as libc::c_int).expect("buffer is not available");
|
|
||||||
unreachable!()
|
|
||||||
} else {
|
|
||||||
(available * self.inner.num_channels as alsa::snd_pcm_sframes_t) as usize
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// If we don't have one period ready, return `NotReady`.
|
|
||||||
if available < self.inner.period_len {
|
|
||||||
return Poll::NotReady;
|
|
||||||
}
|
|
||||||
|
|
||||||
// We now sure that we're ready to write data.
|
|
||||||
match self.inner.sample_format {
|
|
||||||
SampleFormat::I16 => {
|
|
||||||
let buffer = Buffer {
|
|
||||||
buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(),
|
|
||||||
inner: self.inner.clone(),
|
|
||||||
};
|
|
||||||
|
|
||||||
Poll::Ok(Some(UnknownTypeBuffer::I16(::Buffer { target: Some(buffer) })))
|
|
||||||
},
|
|
||||||
SampleFormat::U16 => {
|
|
||||||
let buffer = Buffer {
|
|
||||||
buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(),
|
|
||||||
inner: self.inner.clone(),
|
|
||||||
};
|
|
||||||
|
|
||||||
Poll::Ok(Some(UnknownTypeBuffer::U16(::Buffer { target: Some(buffer) })))
|
|
||||||
},
|
|
||||||
SampleFormat::F32 => {
|
|
||||||
let buffer = Buffer {
|
|
||||||
buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(),
|
|
||||||
inner: self.inner.clone(),
|
|
||||||
};
|
|
||||||
|
|
||||||
Poll::Ok(Some(UnknownTypeBuffer::F32(::Buffer { target: Some(buffer) })))
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn schedule(&mut self, task: &mut Task) {
|
fn schedule(&mut self) {
|
||||||
unsafe {
|
unsafe {
|
||||||
let channel = self.inner.channel.lock().unwrap();
|
let channel = self.inner.channel.lock().unwrap();
|
||||||
|
|
||||||
// We start by filling `scheduled`.
|
// We start by filling `scheduled`.
|
||||||
*self.inner.scheduled.lock().unwrap() = Some(task.handle().clone());
|
*self.inner.scheduled.lock().unwrap() = Some(task::park());
|
||||||
|
|
||||||
// In this function we turn the `snd_pcm_t` into a collection of file descriptors.
|
// In this function we turn the `snd_pcm_t` into a collection of file descriptors.
|
||||||
// And we add these descriptors to `event_loop.pending_wait.descriptors`.
|
// And we add these descriptors to `event_loop.pending_wait.descriptors`.
|
||||||
|
@ -444,6 +391,63 @@ impl Stream for SamplesStream {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Stream for SamplesStream {
|
||||||
|
type Item = UnknownTypeBuffer;
|
||||||
|
type Error = ();
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||||
|
// Determine the number of samples that are available to write.
|
||||||
|
let available = {
|
||||||
|
let channel = self.inner.channel.lock().expect("could not lock channel");
|
||||||
|
let available = unsafe { alsa::snd_pcm_avail(*channel) }; // TODO: what about snd_pcm_avail_update?
|
||||||
|
|
||||||
|
if available == -32 {
|
||||||
|
// buffer underrun
|
||||||
|
self.inner.buffer_len
|
||||||
|
} else if available < 0 {
|
||||||
|
check_errors(available as libc::c_int).expect("buffer is not available");
|
||||||
|
unreachable!()
|
||||||
|
} else {
|
||||||
|
(available * self.inner.num_channels as alsa::snd_pcm_sframes_t) as usize
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// If we don't have one period ready, return `NotReady`.
|
||||||
|
if available < self.inner.period_len {
|
||||||
|
self.schedule();
|
||||||
|
return Ok(Async::NotReady);
|
||||||
|
}
|
||||||
|
|
||||||
|
// We now sure that we're ready to write data.
|
||||||
|
match self.inner.sample_format {
|
||||||
|
SampleFormat::I16 => {
|
||||||
|
let buffer = Buffer {
|
||||||
|
buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(),
|
||||||
|
inner: self.inner.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Async::Ready((Some(UnknownTypeBuffer::I16(::Buffer { target: Some(buffer) })))))
|
||||||
|
},
|
||||||
|
SampleFormat::U16 => {
|
||||||
|
let buffer = Buffer {
|
||||||
|
buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(),
|
||||||
|
inner: self.inner.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Async::Ready((Some(UnknownTypeBuffer::U16(::Buffer { target: Some(buffer) })))))
|
||||||
|
},
|
||||||
|
SampleFormat::F32 => {
|
||||||
|
let buffer = Buffer {
|
||||||
|
buffer: iter::repeat(unsafe { mem::uninitialized() }).take(available).collect(),
|
||||||
|
inner: self.inner.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Async::Ready((Some(UnknownTypeBuffer::F32(::Buffer { target: Some(buffer) })))))
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Wrapper around `hw_params`.
|
/// Wrapper around `hw_params`.
|
||||||
struct HwParams(*mut alsa::snd_pcm_hw_params_t);
|
struct HwParams(*mut alsa::snd_pcm_hw_params_t);
|
||||||
|
|
||||||
|
|
31
src/lib.rs
31
src/lib.rs
|
@ -22,15 +22,28 @@ The `voice` can be used to control the play/pause of the output, while the `samp
|
||||||
be used to register a callback that will be called whenever the backend is ready to get data.
|
be used to register a callback that will be called whenever the backend is ready to get data.
|
||||||
See the documentation of `futures-rs` for more info about how to use streams.
|
See the documentation of `futures-rs` for more info about how to use streams.
|
||||||
|
|
||||||
```ignore // TODO: unignore
|
```no_run
|
||||||
# let mut samples_stream: cpal::SamplesStream = unsafe { std::mem::uninitialized() };
|
# extern crate futures;
|
||||||
|
# extern crate cpal;
|
||||||
|
# use std::sync::Arc;
|
||||||
use futures::stream::Stream;
|
use futures::stream::Stream;
|
||||||
|
use futures::task;
|
||||||
|
# struct MyExecutor;
|
||||||
|
# impl task::Executor for MyExecutor {
|
||||||
|
# fn execute(&self, r: task::Run) {
|
||||||
|
# r.run();
|
||||||
|
# }
|
||||||
|
# }
|
||||||
|
# fn main() {
|
||||||
|
# let mut samples_stream: cpal::SamplesStream = unsafe { std::mem::uninitialized() };
|
||||||
|
# let my_executor = Arc::new(MyExecutor);
|
||||||
|
|
||||||
samples_stream.for_each(move |buffer| -> Result<_, ()> {
|
task::spawn(samples_stream.for_each(move |buffer| -> Result<_, ()> {
|
||||||
// write data to `buffer` here
|
// write data to `buffer` here
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}).forget();
|
})).execute(my_executor);
|
||||||
|
# }
|
||||||
```
|
```
|
||||||
|
|
||||||
TODO: add example
|
TODO: add example
|
||||||
|
@ -72,7 +85,6 @@ use std::ops::{Deref, DerefMut};
|
||||||
|
|
||||||
use futures::stream::Stream;
|
use futures::stream::Stream;
|
||||||
use futures::Poll;
|
use futures::Poll;
|
||||||
use futures::Task;
|
|
||||||
|
|
||||||
mod null;
|
mod null;
|
||||||
mod samples_formats;
|
mod samples_formats;
|
||||||
|
@ -397,13 +409,8 @@ impl Stream for SamplesStream {
|
||||||
type Error = ();
|
type Error = ();
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn poll(&mut self, task: &mut Task) -> Poll<Option<Self::Item>, Self::Error> {
|
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||||
self.0.poll(task)
|
self.0.poll()
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn schedule(&mut self, task: &mut Task) {
|
|
||||||
self.0.schedule(task)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,8 +3,8 @@
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
|
|
||||||
use futures::Poll;
|
use futures::Poll;
|
||||||
use futures::Task;
|
|
||||||
use futures::stream::Stream;
|
use futures::stream::Stream;
|
||||||
|
use futures::Async;
|
||||||
|
|
||||||
use CreationError;
|
use CreationError;
|
||||||
use Format;
|
use Format;
|
||||||
|
@ -89,12 +89,8 @@ impl Stream for SamplesStream {
|
||||||
type Error = ();
|
type Error = ();
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn poll(&mut self, _: &mut Task) -> Poll<Option<Self::Item>, Self::Error> {
|
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||||
Poll::NotReady
|
Ok(Async::NotReady)
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn schedule(&mut self, _: &mut Task) {
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,9 +14,10 @@ use std::sync::Arc;
|
||||||
use std::sync::Mutex;
|
use std::sync::Mutex;
|
||||||
|
|
||||||
use futures::Poll;
|
use futures::Poll;
|
||||||
use futures::Task;
|
use futures::task::Task;
|
||||||
use futures::TaskHandle;
|
use futures::task;
|
||||||
use futures::stream::Stream;
|
use futures::stream::Stream;
|
||||||
|
use futures::Async;
|
||||||
|
|
||||||
use CreationError;
|
use CreationError;
|
||||||
use ChannelPosition;
|
use ChannelPosition;
|
||||||
|
@ -61,7 +62,7 @@ struct EventLoopScheduled {
|
||||||
|
|
||||||
// List of task handles corresponding to `handles`. The second element is used to signal
|
// List of task handles corresponding to `handles`. The second element is used to signal
|
||||||
// the voice that it has been signaled.
|
// the voice that it has been signaled.
|
||||||
task_handles: Vec<(TaskHandle, Arc<AtomicBool>)>,
|
task_handles: Vec<(Task, Arc<AtomicBool>)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EventLoop {
|
impl EventLoop {
|
||||||
|
@ -118,7 +119,7 @@ impl EventLoop {
|
||||||
scheduled.handles.remove(handle_id);
|
scheduled.handles.remove(handle_id);
|
||||||
let (task_handle, ready) = scheduled.task_handles.remove(handle_id - 1);
|
let (task_handle, ready) = scheduled.task_handles.remove(handle_id - 1);
|
||||||
ready.store(true, Ordering::Relaxed);
|
ready.store(true, Ordering::Relaxed);
|
||||||
task_handle.notify();
|
task_handle.unpark();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -348,26 +349,43 @@ impl Voice {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl SamplesStream {
|
||||||
|
#[inline]
|
||||||
|
fn schedule(&mut self) {
|
||||||
|
let mut pending = self.event_loop.pending_scheduled.lock().unwrap();
|
||||||
|
pending.handles.push(self.event);
|
||||||
|
pending.task_handles.push((task::park(), self.ready.clone()));
|
||||||
|
drop(pending);
|
||||||
|
|
||||||
|
let result = unsafe { kernel32::SetEvent(self.event_loop.pending_scheduled_event) };
|
||||||
|
assert!(result != 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Stream for SamplesStream {
|
impl Stream for SamplesStream {
|
||||||
type Item = UnknownTypeBuffer;
|
type Item = UnknownTypeBuffer;
|
||||||
type Error = ();
|
type Error = ();
|
||||||
|
|
||||||
fn poll(&mut self, _: &mut Task) -> Poll<Option<Self::Item>, Self::Error> {
|
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||||
unsafe {
|
unsafe {
|
||||||
if self.ready.swap(false, Ordering::Relaxed) == false {
|
if self.ready.swap(false, Ordering::Relaxed) == false {
|
||||||
// Despite its name this function does not block, because we pass `0`.
|
// Despite its name this function does not block, because we pass `0`.
|
||||||
let result = kernel32::WaitForSingleObject(self.event, 0);
|
let result = kernel32::WaitForSingleObject(self.event, 0);
|
||||||
|
|
||||||
// Returning if the event is not ready.
|
// Park the task and returning if the event is not ready.
|
||||||
match result {
|
match result {
|
||||||
winapi::WAIT_OBJECT_0 => (),
|
winapi::WAIT_OBJECT_0 => (),
|
||||||
winapi::WAIT_TIMEOUT => return Poll::NotReady,
|
winapi::WAIT_TIMEOUT => {
|
||||||
|
self.schedule();
|
||||||
|
return Ok(Async::NotReady);
|
||||||
|
},
|
||||||
_ => unreachable!()
|
_ => unreachable!()
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we reach here, that means we're ready to accept new samples.
|
// If we reach here, that means we're ready to accept new samples.
|
||||||
|
|
||||||
|
let poll = {
|
||||||
let mut inner = self.inner.lock().unwrap();
|
let mut inner = self.inner.lock().unwrap();
|
||||||
|
|
||||||
// Obtaining the number of frames that are available to be written.
|
// Obtaining the number of frames that are available to be written.
|
||||||
|
@ -378,7 +396,9 @@ impl Stream for SamplesStream {
|
||||||
self.max_frames_in_buffer - padding
|
self.max_frames_in_buffer - padding
|
||||||
};
|
};
|
||||||
|
|
||||||
if frames_available == 0 { return Poll::NotReady; }
|
if frames_available == 0 {
|
||||||
|
Ok(Async::NotReady)
|
||||||
|
} else {
|
||||||
|
|
||||||
// Obtaining a pointer to the buffer.
|
// Obtaining a pointer to the buffer.
|
||||||
let (buffer_data, buffer_len) = {
|
let (buffer_data, buffer_len) = {
|
||||||
|
@ -399,18 +419,16 @@ impl Stream for SamplesStream {
|
||||||
frames: frames_available,
|
frames: frames_available,
|
||||||
};
|
};
|
||||||
|
|
||||||
Poll::Ok(Some(UnknownTypeBuffer::F32(::Buffer { target: Some(buffer) }))) // FIXME: not necessarily F32
|
Ok(Async::Ready(Some(UnknownTypeBuffer::F32(::Buffer { target: Some(buffer) })))) // FIXME: not necessarily F32
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Ok(Async::NotReady) = poll {
|
||||||
|
self.schedule();
|
||||||
}
|
}
|
||||||
|
|
||||||
fn schedule(&mut self, task: &mut Task) {
|
poll
|
||||||
let mut pending = self.event_loop.pending_scheduled.lock().unwrap();
|
}
|
||||||
pending.handles.push(self.event);
|
|
||||||
pending.task_handles.push((task.handle().clone(), self.ready.clone()));
|
|
||||||
drop(pending);
|
|
||||||
|
|
||||||
let result = unsafe { kernel32::SetEvent(self.event_loop.pending_scheduled_event) };
|
|
||||||
assert!(result != 0);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue