Merge pull request #131 from tomaka/tryfix-osx

Try fix the OSX code with futures
This commit is contained in:
tomaka 2016-10-01 10:28:20 +02:00 committed by GitHub
commit a00cf67900
1 changed files with 13 additions and 11 deletions

View File

@ -10,7 +10,10 @@ use SamplesRate;
use ChannelPosition; use ChannelPosition;
use UnknownTypeBuffer; use UnknownTypeBuffer;
use futures::{Poll, Task, TaskHandle}; use futures::Poll;
use futures::Async;
use futures::task::Task;
use futures::task;
use futures::stream::Stream; use futures::stream::Stream;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@ -94,7 +97,7 @@ pub struct SamplesStream {
struct SamplesStreamInner { struct SamplesStreamInner {
scheduled_task: Option<TaskHandle>, scheduled_task: Option<Task>,
current_callback: Option<render_callback::Args<data::NonInterleaved<f32>>>, current_callback: Option<render_callback::Args<data::NonInterleaved<f32>>>,
} }
@ -102,7 +105,7 @@ impl Stream for SamplesStream {
type Item = UnknownTypeBuffer; type Item = UnknownTypeBuffer;
type Error = (); type Error = ();
fn poll(&mut self, _: &mut Task) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let mut inner = self.inner.lock().unwrap(); let mut inner = self.inner.lock().unwrap();
// There are two possibilites: either we're answering a callback of coreaudio and we return // There are two possibilites: either we're answering a callback of coreaudio and we return
@ -110,7 +113,10 @@ impl Stream for SamplesStream {
let current_callback = match inner.current_callback.take() { let current_callback = match inner.current_callback.take() {
Some(c) => c, Some(c) => c,
None => return Poll::NotReady None => {
inner.scheduled_task = Some(task::park());
return Ok(Async::NotReady);
}
}; };
let buffer_len = current_callback.num_frames * current_callback.data.channels().count(); let buffer_len = current_callback.num_frames * current_callback.data.channels().count();
@ -120,11 +126,7 @@ impl Stream for SamplesStream {
buffer: vec![0.0; buffer_len], buffer: vec![0.0; buffer_len],
}; };
Poll::Ok(Some(UnknownTypeBuffer::F32(::Buffer { target: Some(buffer) }))) Ok(Async::Ready(Some(UnknownTypeBuffer::F32(::Buffer { target: Some(buffer) }))))
}
fn schedule(&mut self, task: &mut Task) {
self.inner.lock().unwrap().scheduled_task = Some(task.handle().clone());
} }
} }
@ -171,9 +173,9 @@ impl Voice {
// It is important that `inner` is unlocked here. // It is important that `inner` is unlocked here.
if let Some(scheduled) = scheduled { if let Some(scheduled) = scheduled {
// Calling `notify()` should immediately call `poll()` on the `SamplesStream`, // Calling `unpark()` should eventually call `poll()` on the `SamplesStream`,
// which will use the data we stored in `current_callback`. // which will use the data we stored in `current_callback`.
scheduled.notify(); scheduled.unpark();
} }
// TODO: what should happen if the callback wasn't processed? in other word, what // TODO: what should happen if the callback wasn't processed? in other word, what