Merge pull request #288 from mitchmindtree/callback_event
User Callback API `StreamEvent` Overhaul
This commit is contained in:
commit
2b9e2e0b2c
|
@ -17,7 +17,15 @@ fn main() -> Result<(), failure::Error> {
|
||||||
(sample_clock * 440.0 * 2.0 * 3.141592 / sample_rate).sin()
|
(sample_clock * 440.0 * 2.0 * 3.141592 / sample_rate).sin()
|
||||||
};
|
};
|
||||||
|
|
||||||
event_loop.run(move |_, data| {
|
event_loop.run(move |id, result| {
|
||||||
|
let data = match result {
|
||||||
|
Ok(data) => data,
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("an error occurred on stream {:?}: {}", id, err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
match data {
|
match data {
|
||||||
cpal::StreamData::Output { buffer: cpal::UnknownTypeOutputBuffer::U16(mut buffer) } => {
|
cpal::StreamData::Output { buffer: cpal::UnknownTypeOutputBuffer::U16(mut buffer) } => {
|
||||||
for sample in buffer.chunks_mut(format.channels as usize) {
|
for sample in buffer.chunks_mut(format.channels as usize) {
|
||||||
|
|
|
@ -49,7 +49,15 @@ fn main() -> Result<(), failure::Error> {
|
||||||
|
|
||||||
// Run the event loop on a separate thread.
|
// Run the event loop on a separate thread.
|
||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
event_loop.run(move |id, data| {
|
event_loop.run(move |id, result| {
|
||||||
|
let data = match result {
|
||||||
|
Ok(data) => data,
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("an error occurred on stream {:?}: {}", id, err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
match data {
|
match data {
|
||||||
cpal::StreamData::Input { buffer: cpal::UnknownTypeInputBuffer::F32(buffer) } => {
|
cpal::StreamData::Input { buffer: cpal::UnknownTypeInputBuffer::F32(buffer) } => {
|
||||||
assert_eq!(id, input_stream_id);
|
assert_eq!(id, input_stream_id);
|
||||||
|
|
|
@ -30,7 +30,15 @@ fn main() -> Result<(), failure::Error> {
|
||||||
let writer_2 = writer.clone();
|
let writer_2 = writer.clone();
|
||||||
let recording_2 = recording.clone();
|
let recording_2 = recording.clone();
|
||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
event_loop.run(move |_, data| {
|
event_loop.run(move |id, event| {
|
||||||
|
let data = match event {
|
||||||
|
Ok(data) => data,
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("an error occurred on stream {:?}: {}", id, err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// If we're done recording, return early.
|
// If we're done recording, return early.
|
||||||
if !recording_2.load(std::sync::atomic::Ordering::Relaxed) {
|
if !recording_2.load(std::sync::atomic::Ordering::Relaxed) {
|
||||||
return;
|
return;
|
||||||
|
|
415
src/alsa/mod.rs
415
src/alsa/mod.rs
|
@ -15,6 +15,8 @@ use SupportedFormatsError;
|
||||||
use SampleFormat;
|
use SampleFormat;
|
||||||
use SampleRate;
|
use SampleRate;
|
||||||
use StreamData;
|
use StreamData;
|
||||||
|
use StreamDataResult;
|
||||||
|
use StreamError;
|
||||||
use SupportedFormat;
|
use SupportedFormat;
|
||||||
use UnknownTypeInputBuffer;
|
use UnknownTypeInputBuffer;
|
||||||
use UnknownTypeOutputBuffer;
|
use UnknownTypeOutputBuffer;
|
||||||
|
@ -353,7 +355,7 @@ pub struct EventLoop {
|
||||||
|
|
||||||
// A trigger that uses a `pipe()` as backend. Signalled whenever a new command is ready, so
|
// A trigger that uses a `pipe()` as backend. Signalled whenever a new command is ready, so
|
||||||
// that `poll()` can wake up and pick the changes.
|
// that `poll()` can wake up and pick the changes.
|
||||||
pending_trigger: Trigger,
|
pending_command_trigger: Trigger,
|
||||||
|
|
||||||
// This field is locked by the `run()` method.
|
// This field is locked by the `run()` method.
|
||||||
// The mutex also ensures that only one thread at a time has `run()` running.
|
// The mutex also ensures that only one thread at a time has `run()` running.
|
||||||
|
@ -377,7 +379,7 @@ enum Command {
|
||||||
}
|
}
|
||||||
|
|
||||||
struct RunContext {
|
struct RunContext {
|
||||||
// Descriptors to wait for. Always contains `pending_trigger.read_fd()` as first element.
|
// Descriptors to wait for. Always contains `pending_command_trigger.read_fd()` as first element.
|
||||||
descriptors: Vec<libc::pollfd>,
|
descriptors: Vec<libc::pollfd>,
|
||||||
// List of streams that are written in `descriptors`.
|
// List of streams that are written in `descriptors`.
|
||||||
streams: Vec<StreamInner>,
|
streams: Vec<StreamInner>,
|
||||||
|
@ -421,24 +423,25 @@ struct StreamInner {
|
||||||
// Lazily allocated buffer that is reused inside the loop.
|
// Lazily allocated buffer that is reused inside the loop.
|
||||||
// Zero-allocate a new buffer (the fastest way to have zeroed memory) at the first time this is
|
// Zero-allocate a new buffer (the fastest way to have zeroed memory) at the first time this is
|
||||||
// used.
|
// used.
|
||||||
buffer: Option<Vec<u8>>,
|
buffer: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
#[derive(Copy, Debug, Clone, PartialEq, Eq, Hash)]
|
||||||
pub struct StreamId(usize);
|
pub struct StreamId(usize);
|
||||||
|
|
||||||
|
enum StreamType { Input, Output }
|
||||||
|
|
||||||
|
|
||||||
impl EventLoop {
|
impl EventLoop {
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn new() -> EventLoop {
|
pub fn new() -> EventLoop {
|
||||||
let pending_trigger = Trigger::new();
|
let pending_command_trigger = Trigger::new();
|
||||||
|
|
||||||
let initial_descriptors = vec![
|
let mut initial_descriptors = vec![];
|
||||||
libc::pollfd {
|
reset_descriptors_with_pending_command_trigger(
|
||||||
fd: pending_trigger.read_fd(),
|
&mut initial_descriptors,
|
||||||
events: libc::POLLIN,
|
&pending_command_trigger,
|
||||||
revents: 0,
|
);
|
||||||
},
|
|
||||||
];
|
|
||||||
|
|
||||||
let (tx, rx) = channel();
|
let (tx, rx) = channel();
|
||||||
|
|
||||||
|
@ -450,7 +453,7 @@ impl EventLoop {
|
||||||
|
|
||||||
EventLoop {
|
EventLoop {
|
||||||
next_stream_id: AtomicUsize::new(0),
|
next_stream_id: AtomicUsize::new(0),
|
||||||
pending_trigger: pending_trigger,
|
pending_command_trigger: pending_command_trigger,
|
||||||
run_context,
|
run_context,
|
||||||
commands: tx,
|
commands: tx,
|
||||||
}
|
}
|
||||||
|
@ -458,220 +461,188 @@ impl EventLoop {
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn run<F>(&self, mut callback: F) -> !
|
pub fn run<F>(&self, mut callback: F) -> !
|
||||||
where F: FnMut(StreamId, StreamData)
|
where F: FnMut(StreamId, StreamDataResult)
|
||||||
{
|
{
|
||||||
self.run_inner(&mut callback)
|
self.run_inner(&mut callback)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_inner(&self, callback: &mut FnMut(StreamId, StreamData)) -> ! {
|
fn run_inner(&self, callback: &mut dyn FnMut(StreamId, StreamDataResult)) -> ! {
|
||||||
unsafe {
|
unsafe {
|
||||||
let mut run_context = self.run_context.lock().unwrap();
|
let mut run_context = self.run_context.lock().unwrap();
|
||||||
let run_context = &mut *run_context;
|
let run_context = &mut *run_context;
|
||||||
|
|
||||||
loop {
|
'stream_loop: loop {
|
||||||
{
|
process_commands(run_context);
|
||||||
for command in run_context.commands.try_iter() {
|
|
||||||
match command {
|
reset_descriptors_with_pending_command_trigger(
|
||||||
Command::DestroyStream(stream_id) => {
|
&mut run_context.descriptors,
|
||||||
run_context.streams.retain(|s| s.id != stream_id);
|
&self.pending_command_trigger,
|
||||||
},
|
);
|
||||||
Command::PlayStream(stream_id) => {
|
append_stream_poll_descriptors(run_context);
|
||||||
if let Some(stream) = run_context.streams.iter_mut()
|
|
||||||
.find(|stream| stream.can_pause && stream.id == stream_id)
|
// At this point, this should include the command `pending_commands_trigger` along
|
||||||
{
|
// with the poll descriptors for each stream.
|
||||||
alsa::snd_pcm_pause(stream.channel, 0);
|
match poll_all_descriptors(&mut run_context.descriptors) {
|
||||||
stream.is_paused = false;
|
Ok(true) => (),
|
||||||
}
|
Ok(false) => continue,
|
||||||
},
|
Err(err) => {
|
||||||
Command::PauseStream(stream_id) => {
|
for stream in run_context.streams.iter() {
|
||||||
if let Some(stream) = run_context.streams.iter_mut()
|
let result = Err(err.clone().into());
|
||||||
.find(|stream| stream.can_pause && stream.id == stream_id)
|
callback(stream.id, result);
|
||||||
{
|
|
||||||
alsa::snd_pcm_pause(stream.channel, 1);
|
|
||||||
stream.is_paused = true;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Command::NewStream(stream_inner) => {
|
|
||||||
run_context.streams.push(stream_inner);
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
}
|
run_context.streams.clear();
|
||||||
|
break 'stream_loop;
|
||||||
run_context.descriptors = vec![
|
|
||||||
libc::pollfd {
|
|
||||||
fd: self.pending_trigger.read_fd(),
|
|
||||||
events: libc::POLLIN,
|
|
||||||
revents: 0,
|
|
||||||
},
|
|
||||||
];
|
|
||||||
for stream in run_context.streams.iter() {
|
|
||||||
run_context.descriptors.reserve(stream.num_descriptors);
|
|
||||||
let len = run_context.descriptors.len();
|
|
||||||
let filled = alsa::snd_pcm_poll_descriptors(stream.channel,
|
|
||||||
run_context
|
|
||||||
.descriptors
|
|
||||||
.as_mut_ptr()
|
|
||||||
.offset(len as isize),
|
|
||||||
stream.num_descriptors as
|
|
||||||
libc::c_uint);
|
|
||||||
debug_assert_eq!(filled, stream.num_descriptors as libc::c_int);
|
|
||||||
run_context.descriptors.set_len(len + stream.num_descriptors);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let ret = libc::poll(run_context.descriptors.as_mut_ptr(),
|
// If the `pending_command_trigger` was signaled, we need to process the comands.
|
||||||
run_context.descriptors.len() as libc::nfds_t,
|
|
||||||
-1 /* infinite */);
|
|
||||||
assert!(ret >= 0, "poll() failed");
|
|
||||||
|
|
||||||
if ret == 0 {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the `pending_trigger` was signaled, we need to process the comands.
|
|
||||||
if run_context.descriptors[0].revents != 0 {
|
if run_context.descriptors[0].revents != 0 {
|
||||||
run_context.descriptors[0].revents = 0;
|
run_context.descriptors[0].revents = 0;
|
||||||
self.pending_trigger.clear_pipe();
|
self.pending_command_trigger.clear_pipe();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The set of streams that error within the following loop and should be removed.
|
||||||
|
let mut streams_to_remove: Vec<(StreamId, StreamError)> = vec![];
|
||||||
|
|
||||||
// Iterate over each individual stream/descriptor.
|
// Iterate over each individual stream/descriptor.
|
||||||
let mut i_stream = 0;
|
let mut i_stream = 0;
|
||||||
let mut i_descriptor = 1;
|
let mut i_descriptor = 1;
|
||||||
while (i_descriptor as usize) < run_context.descriptors.len() {
|
while (i_descriptor as usize) < run_context.descriptors.len() {
|
||||||
enum StreamType { Input, Output }
|
let stream = &mut run_context.streams[i_stream];
|
||||||
let stream_type;
|
let stream_descriptor_ptr = run_context.descriptors.as_mut_ptr().offset(i_descriptor);
|
||||||
let stream_inner = run_context.streams.get_mut(i_stream).unwrap();
|
|
||||||
|
|
||||||
// Check whether the event is `POLLOUT` or `POLLIN`. If neither, `continue`.
|
// Only go on if this event was a pollout or pollin event.
|
||||||
{
|
let stream_type = match check_for_pollout_or_pollin(stream, stream_descriptor_ptr) {
|
||||||
let mut revent = mem::uninitialized();
|
Ok(Some(ty)) => ty,
|
||||||
|
Ok(None) => {
|
||||||
{
|
i_descriptor += stream.num_descriptors as isize;
|
||||||
let num_descriptors = stream_inner.num_descriptors as libc::c_uint;
|
i_stream += 1;
|
||||||
let desc_ptr =
|
continue;
|
||||||
run_context.descriptors.as_mut_ptr().offset(i_descriptor);
|
},
|
||||||
let res = alsa::snd_pcm_poll_descriptors_revents(stream_inner.channel,
|
Err(err) => {
|
||||||
desc_ptr,
|
streams_to_remove.push((stream.id, err.into()));
|
||||||
num_descriptors,
|
i_descriptor += stream.num_descriptors as isize;
|
||||||
&mut revent);
|
|
||||||
check_errors(res).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
if revent as i16 == libc::POLLOUT {
|
|
||||||
stream_type = StreamType::Output;
|
|
||||||
} else if revent as i16 == libc::POLLIN {
|
|
||||||
stream_type = StreamType::Input;
|
|
||||||
} else {
|
|
||||||
i_descriptor += stream_inner.num_descriptors as isize;
|
|
||||||
i_stream += 1;
|
i_stream += 1;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
|
|
||||||
// Determine the number of samples that are available to read/write.
|
// Get the number of available samples for reading/writing.
|
||||||
let available_samples = {
|
let available_samples = match get_available_samples(stream) {
|
||||||
let available = alsa::snd_pcm_avail(stream_inner.channel); // TODO: what about snd_pcm_avail_update?
|
Ok(n) => n,
|
||||||
|
Err(err) => {
|
||||||
if available == -32 {
|
streams_to_remove.push((stream.id, err.into()));
|
||||||
// buffer underrun
|
i_descriptor += stream.num_descriptors as isize;
|
||||||
stream_inner.buffer_len
|
i_stream += 1;
|
||||||
} else if available < 0 {
|
continue;
|
||||||
check_errors(available as libc::c_int)
|
|
||||||
.expect("buffer is not available");
|
|
||||||
unreachable!()
|
|
||||||
} else {
|
|
||||||
(available * stream_inner.num_channels as alsa::snd_pcm_sframes_t) as
|
|
||||||
usize
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if available_samples < stream_inner.period_len {
|
// Only go on if there is at least `stream.period_len` samples.
|
||||||
i_descriptor += stream_inner.num_descriptors as isize;
|
if available_samples < stream.period_len {
|
||||||
|
i_descriptor += stream.num_descriptors as isize;
|
||||||
i_stream += 1;
|
i_stream += 1;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let stream_id = stream_inner.id.clone();
|
// Prepare the data buffer.
|
||||||
|
let buffer_size = stream.sample_format.sample_size() * available_samples;
|
||||||
let available_frames = available_samples / stream_inner.num_channels as usize;
|
stream.buffer.resize(buffer_size, 0u8);
|
||||||
|
let available_frames = available_samples / stream.num_channels as usize;
|
||||||
let buffer_size = stream_inner.sample_format.sample_size() * available_samples;
|
|
||||||
// Could be written with a match with improved borrow checking
|
|
||||||
if stream_inner.buffer.is_none() {
|
|
||||||
stream_inner.buffer = Some(vec![0u8; buffer_size]);
|
|
||||||
} else {
|
|
||||||
stream_inner.buffer.as_mut().unwrap().resize(buffer_size, 0u8);
|
|
||||||
}
|
|
||||||
let buffer = stream_inner.buffer.as_mut().unwrap();
|
|
||||||
|
|
||||||
match stream_type {
|
match stream_type {
|
||||||
StreamType::Input => {
|
StreamType::Input => {
|
||||||
let err = alsa::snd_pcm_readi(
|
let result = alsa::snd_pcm_readi(
|
||||||
stream_inner.channel,
|
stream.channel,
|
||||||
buffer.as_mut_ptr() as *mut _,
|
stream.buffer.as_mut_ptr() as *mut _,
|
||||||
available_frames as alsa::snd_pcm_uframes_t,
|
available_frames as alsa::snd_pcm_uframes_t,
|
||||||
);
|
);
|
||||||
check_errors(err as _).expect("snd_pcm_readi error");
|
if let Err(err) = check_errors(result as _) {
|
||||||
|
let description = format!("`snd_pcm_readi` failed: {}", err);
|
||||||
|
let err = BackendSpecificError { description };
|
||||||
|
streams_to_remove.push((stream.id, err.into()));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
let input_buffer = match stream_inner.sample_format {
|
let input_buffer = match stream.sample_format {
|
||||||
SampleFormat::I16 => UnknownTypeInputBuffer::I16(::InputBuffer {
|
SampleFormat::I16 => UnknownTypeInputBuffer::I16(::InputBuffer {
|
||||||
buffer: cast_input_buffer(buffer),
|
buffer: cast_input_buffer(&mut stream.buffer),
|
||||||
}),
|
}),
|
||||||
SampleFormat::U16 => UnknownTypeInputBuffer::U16(::InputBuffer {
|
SampleFormat::U16 => UnknownTypeInputBuffer::U16(::InputBuffer {
|
||||||
buffer: cast_input_buffer(buffer),
|
buffer: cast_input_buffer(&mut stream.buffer),
|
||||||
}),
|
}),
|
||||||
SampleFormat::F32 => UnknownTypeInputBuffer::F32(::InputBuffer {
|
SampleFormat::F32 => UnknownTypeInputBuffer::F32(::InputBuffer {
|
||||||
buffer: cast_input_buffer(buffer),
|
buffer: cast_input_buffer(&mut stream.buffer),
|
||||||
}),
|
}),
|
||||||
};
|
};
|
||||||
let stream_data = StreamData::Input {
|
let stream_data = StreamData::Input {
|
||||||
buffer: input_buffer,
|
buffer: input_buffer,
|
||||||
};
|
};
|
||||||
callback(stream_id, stream_data);
|
callback(stream.id, Ok(stream_data));
|
||||||
},
|
},
|
||||||
StreamType::Output => {
|
StreamType::Output => {
|
||||||
{
|
{
|
||||||
// We're now sure that we're ready to write data.
|
// We're now sure that we're ready to write data.
|
||||||
let output_buffer = match stream_inner.sample_format {
|
let output_buffer = match stream.sample_format {
|
||||||
SampleFormat::I16 => UnknownTypeOutputBuffer::I16(::OutputBuffer {
|
SampleFormat::I16 => UnknownTypeOutputBuffer::I16(::OutputBuffer {
|
||||||
buffer: cast_output_buffer(buffer),
|
buffer: cast_output_buffer(&mut stream.buffer),
|
||||||
}),
|
}),
|
||||||
SampleFormat::U16 => UnknownTypeOutputBuffer::U16(::OutputBuffer {
|
SampleFormat::U16 => UnknownTypeOutputBuffer::U16(::OutputBuffer {
|
||||||
buffer: cast_output_buffer(buffer),
|
buffer: cast_output_buffer(&mut stream.buffer),
|
||||||
}),
|
}),
|
||||||
SampleFormat::F32 => UnknownTypeOutputBuffer::F32(::OutputBuffer {
|
SampleFormat::F32 => UnknownTypeOutputBuffer::F32(::OutputBuffer {
|
||||||
buffer: cast_output_buffer(buffer),
|
buffer: cast_output_buffer(&mut stream.buffer),
|
||||||
}),
|
}),
|
||||||
};
|
};
|
||||||
|
|
||||||
let stream_data = StreamData::Output {
|
let stream_data = StreamData::Output {
|
||||||
buffer: output_buffer,
|
buffer: output_buffer,
|
||||||
};
|
};
|
||||||
callback(stream_id, stream_data);
|
callback(stream.id, Ok(stream_data));
|
||||||
}
|
}
|
||||||
loop {
|
loop {
|
||||||
let result = alsa::snd_pcm_writei(
|
let result = alsa::snd_pcm_writei(
|
||||||
stream_inner.channel,
|
stream.channel,
|
||||||
buffer.as_ptr() as *const _,
|
stream.buffer.as_ptr() as *const _,
|
||||||
available_frames as alsa::snd_pcm_uframes_t,
|
available_frames as alsa::snd_pcm_uframes_t,
|
||||||
);
|
);
|
||||||
|
|
||||||
if result == -32 {
|
if result == -32 {
|
||||||
// buffer underrun
|
// buffer underrun
|
||||||
alsa::snd_pcm_prepare(stream_inner.channel);
|
// TODO: Notify the user of this.
|
||||||
} else if result < 0 {
|
alsa::snd_pcm_prepare(stream.channel);
|
||||||
check_errors(result as libc::c_int)
|
} else if let Err(err) = check_errors(result as _) {
|
||||||
.expect("could not write pcm");
|
let description = format!("`snd_pcm_writei` failed: {}", err);
|
||||||
|
let err = BackendSpecificError { description };
|
||||||
|
streams_to_remove.push((stream.id, err.into()));
|
||||||
|
continue;
|
||||||
|
} else if result as usize != available_frames {
|
||||||
|
let description = format!(
|
||||||
|
"unexpected number of frames written: expected {}, \
|
||||||
|
result {} (this should never happen)",
|
||||||
|
available_frames,
|
||||||
|
result,
|
||||||
|
);
|
||||||
|
let err = BackendSpecificError { description };
|
||||||
|
streams_to_remove.push((stream.id, err.into()));
|
||||||
|
continue;
|
||||||
} else {
|
} else {
|
||||||
assert_eq!(result as usize, available_frames);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remove any streams that have errored and notify the user.
|
||||||
|
for (stream_id, err) in streams_to_remove {
|
||||||
|
run_context.streams.retain(|s| s.id != stream_id);
|
||||||
|
callback(stream_id, Err(err.into()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
panic!("`cpal::EventLoop::run` API currently disallows returning");
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn build_input_stream(
|
pub fn build_input_stream(
|
||||||
|
@ -739,7 +710,7 @@ impl EventLoop {
|
||||||
can_pause: can_pause,
|
can_pause: can_pause,
|
||||||
is_paused: false,
|
is_paused: false,
|
||||||
resume_trigger: Trigger::new(),
|
resume_trigger: Trigger::new(),
|
||||||
buffer: None,
|
buffer: vec![],
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Err(desc) = check_errors(alsa::snd_pcm_start(capture_handle)) {
|
if let Err(desc) = check_errors(alsa::snd_pcm_start(capture_handle)) {
|
||||||
|
@ -818,7 +789,7 @@ impl EventLoop {
|
||||||
can_pause: can_pause,
|
can_pause: can_pause,
|
||||||
is_paused: false,
|
is_paused: false,
|
||||||
resume_trigger: Trigger::new(),
|
resume_trigger: Trigger::new(),
|
||||||
buffer: None,
|
buffer: vec![],
|
||||||
};
|
};
|
||||||
|
|
||||||
self.push_command(Command::NewStream(stream_inner));
|
self.push_command(Command::NewStream(stream_inner));
|
||||||
|
@ -830,7 +801,7 @@ impl EventLoop {
|
||||||
fn push_command(&self, command: Command) {
|
fn push_command(&self, command: Command) {
|
||||||
// Safe to unwrap: sender outlives receiver.
|
// Safe to unwrap: sender outlives receiver.
|
||||||
self.commands.send(command).unwrap();
|
self.commands.send(command).unwrap();
|
||||||
self.pending_trigger.wakeup();
|
self.pending_command_trigger.wakeup();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
|
@ -851,6 +822,150 @@ impl EventLoop {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Process any pending `Command`s within the `RunContext`'s queue.
|
||||||
|
fn process_commands(run_context: &mut RunContext) {
|
||||||
|
for command in run_context.commands.try_iter() {
|
||||||
|
match command {
|
||||||
|
Command::DestroyStream(stream_id) => {
|
||||||
|
run_context.streams.retain(|s| s.id != stream_id);
|
||||||
|
},
|
||||||
|
Command::PlayStream(stream_id) => {
|
||||||
|
if let Some(stream) = run_context.streams.iter_mut()
|
||||||
|
.find(|stream| stream.can_pause && stream.id == stream_id)
|
||||||
|
{
|
||||||
|
unsafe {
|
||||||
|
alsa::snd_pcm_pause(stream.channel, 0);
|
||||||
|
}
|
||||||
|
stream.is_paused = false;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::PauseStream(stream_id) => {
|
||||||
|
if let Some(stream) = run_context.streams.iter_mut()
|
||||||
|
.find(|stream| stream.can_pause && stream.id == stream_id)
|
||||||
|
{
|
||||||
|
unsafe {
|
||||||
|
alsa::snd_pcm_pause(stream.channel, 1);
|
||||||
|
}
|
||||||
|
stream.is_paused = true;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::NewStream(stream_inner) => {
|
||||||
|
run_context.streams.push(stream_inner);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resets the descriptors so that only `pending_command_trigger.read_fd()` is contained.
|
||||||
|
fn reset_descriptors_with_pending_command_trigger(
|
||||||
|
descriptors: &mut Vec<libc::pollfd>,
|
||||||
|
pending_command_trigger: &Trigger,
|
||||||
|
) {
|
||||||
|
descriptors.clear();
|
||||||
|
descriptors.push(libc::pollfd {
|
||||||
|
fd: pending_command_trigger.read_fd(),
|
||||||
|
events: libc::POLLIN,
|
||||||
|
revents: 0,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Appends the `poll` descriptors for each stream onto the `RunContext`'s descriptor slice, ready
|
||||||
|
// for a call to `libc::poll`.
|
||||||
|
fn append_stream_poll_descriptors(run_context: &mut RunContext) {
|
||||||
|
for stream in run_context.streams.iter() {
|
||||||
|
run_context.descriptors.reserve(stream.num_descriptors);
|
||||||
|
let len = run_context.descriptors.len();
|
||||||
|
let filled = unsafe {
|
||||||
|
alsa::snd_pcm_poll_descriptors(
|
||||||
|
stream.channel,
|
||||||
|
run_context.descriptors.as_mut_ptr().offset(len as isize),
|
||||||
|
stream.num_descriptors as libc::c_uint,
|
||||||
|
)
|
||||||
|
};
|
||||||
|
debug_assert_eq!(filled, stream.num_descriptors as libc::c_int);
|
||||||
|
unsafe {
|
||||||
|
run_context.descriptors.set_len(len + stream.num_descriptors);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Poll all descriptors within the given set.
|
||||||
|
//
|
||||||
|
// Returns `Ok(true)` if some event has occurred or `Ok(false)` if no events have
|
||||||
|
// occurred.
|
||||||
|
//
|
||||||
|
// Returns an `Err` if `libc::poll` returns a negative value for some reason.
|
||||||
|
fn poll_all_descriptors(descriptors: &mut [libc::pollfd]) -> Result<bool, BackendSpecificError> {
|
||||||
|
let res = unsafe {
|
||||||
|
// Don't timeout, wait forever.
|
||||||
|
libc::poll(descriptors.as_mut_ptr(), descriptors.len() as libc::nfds_t, -1)
|
||||||
|
};
|
||||||
|
if res < 0 {
|
||||||
|
let description = format!("`libc::poll()` failed: {}", res);
|
||||||
|
Err(BackendSpecificError { description })
|
||||||
|
} else if res == 0 {
|
||||||
|
Ok(false)
|
||||||
|
} else {
|
||||||
|
Ok(true)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check whether the event is `POLLOUT` or `POLLIN`.
|
||||||
|
//
|
||||||
|
// If so, return the stream type associated with the event.
|
||||||
|
//
|
||||||
|
// Otherwise, returns `Ok(None)`.
|
||||||
|
//
|
||||||
|
// Returns an `Err` if the `snd_pcm_poll_descriptors_revents` call fails.
|
||||||
|
fn check_for_pollout_or_pollin(
|
||||||
|
stream: &StreamInner,
|
||||||
|
stream_descriptor_ptr: *mut libc::pollfd,
|
||||||
|
) -> Result<Option<StreamType>, BackendSpecificError> {
|
||||||
|
let (revent, res) = unsafe {
|
||||||
|
let mut revent = mem::uninitialized();
|
||||||
|
let res = alsa::snd_pcm_poll_descriptors_revents(
|
||||||
|
stream.channel,
|
||||||
|
stream_descriptor_ptr,
|
||||||
|
stream.num_descriptors as libc::c_uint,
|
||||||
|
&mut revent,
|
||||||
|
);
|
||||||
|
(revent, res)
|
||||||
|
};
|
||||||
|
if let Err(desc) = check_errors(res) {
|
||||||
|
let description =
|
||||||
|
format!("`snd_pcm_poll_descriptors_revents` failed: {}",desc);
|
||||||
|
let err = BackendSpecificError { description };
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
if revent as i16 == libc::POLLOUT {
|
||||||
|
Ok(Some(StreamType::Output))
|
||||||
|
} else if revent as i16 == libc::POLLIN {
|
||||||
|
Ok(Some(StreamType::Input))
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Determine the number of samples that are available to read/write.
|
||||||
|
fn get_available_samples(stream: &StreamInner) -> Result<usize, BackendSpecificError> {
|
||||||
|
// TODO: what about snd_pcm_avail_update?
|
||||||
|
let available = unsafe {
|
||||||
|
alsa::snd_pcm_avail(stream.channel)
|
||||||
|
};
|
||||||
|
if available == -32 {
|
||||||
|
// buffer underrun
|
||||||
|
// TODO: Notify the user some how.
|
||||||
|
Ok(stream.buffer_len)
|
||||||
|
} else if let Err(desc) = check_errors(available as libc::c_int) {
|
||||||
|
let description = format!("failed to get available samples: {}", desc);
|
||||||
|
let err = BackendSpecificError { description };
|
||||||
|
Err(err)
|
||||||
|
} else {
|
||||||
|
Ok((available * stream.num_channels as alsa::snd_pcm_sframes_t) as usize)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
unsafe fn set_hw_params_from_format(
|
unsafe fn set_hw_params_from_format(
|
||||||
pcm_handle: *mut alsa::snd_pcm_t,
|
pcm_handle: *mut alsa::snd_pcm_t,
|
||||||
hw_params: &HwParams,
|
hw_params: &HwParams,
|
||||||
|
|
|
@ -13,6 +13,7 @@ use SupportedFormatsError;
|
||||||
use SampleFormat;
|
use SampleFormat;
|
||||||
use SampleRate;
|
use SampleRate;
|
||||||
use StreamData;
|
use StreamData;
|
||||||
|
use StreamDataResult;
|
||||||
use SupportedFormat;
|
use SupportedFormat;
|
||||||
use UnknownTypeInputBuffer;
|
use UnknownTypeInputBuffer;
|
||||||
use UnknownTypeOutputBuffer;
|
use UnknownTypeOutputBuffer;
|
||||||
|
@ -319,13 +320,22 @@ pub struct StreamId(usize);
|
||||||
|
|
||||||
pub struct EventLoop {
|
pub struct EventLoop {
|
||||||
// This `Arc` is shared with all the callbacks of coreaudio.
|
// This `Arc` is shared with all the callbacks of coreaudio.
|
||||||
active_callbacks: Arc<ActiveCallbacks>,
|
//
|
||||||
|
// TODO: Eventually, CPAL's API should be changed to allow for submitting a unique callback per
|
||||||
|
// stream to avoid streams blocking one another.
|
||||||
|
user_callback: Arc<Mutex<UserCallback>>,
|
||||||
streams: Mutex<Vec<Option<StreamInner>>>,
|
streams: Mutex<Vec<Option<StreamInner>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ActiveCallbacks {
|
enum UserCallback {
|
||||||
// Whenever the `run()` method is called with a callback, this callback is put in this list.
|
// When `run` is called with a callback, that callback will be stored here.
|
||||||
callbacks: Mutex<Vec<&'static mut (FnMut(StreamId, StreamData) + Send)>>,
|
//
|
||||||
|
// It is essential for the safety of the program that this callback is removed before `run`
|
||||||
|
// returns (not possible with the current CPAL API).
|
||||||
|
Active(&'static mut (FnMut(StreamId, StreamDataResult) + Send)),
|
||||||
|
// A queue of events that have occurred but that have not yet been emitted to the user as we
|
||||||
|
// don't yet have a callback to do so.
|
||||||
|
Inactive,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct StreamInner {
|
struct StreamInner {
|
||||||
|
@ -427,22 +437,22 @@ impl EventLoop {
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn new() -> EventLoop {
|
pub fn new() -> EventLoop {
|
||||||
EventLoop {
|
EventLoop {
|
||||||
active_callbacks: Arc::new(ActiveCallbacks { callbacks: Mutex::new(Vec::new()) }),
|
user_callback: Arc::new(Mutex::new(UserCallback::Inactive)),
|
||||||
streams: Mutex::new(Vec::new()),
|
streams: Mutex::new(Vec::new()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn run<F>(&self, mut callback: F) -> !
|
pub fn run<F>(&self, mut callback: F) -> !
|
||||||
where F: FnMut(StreamId, StreamData) + Send
|
where F: FnMut(StreamId, StreamDataResult) + Send
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
let callback: &mut (FnMut(StreamId, StreamData) + Send) = &mut callback;
|
let mut guard = self.user_callback.lock().unwrap();
|
||||||
self.active_callbacks
|
if let UserCallback::Active(_) = *guard {
|
||||||
.callbacks
|
panic!("`EventLoop::run` was called when the event loop was already running");
|
||||||
.lock()
|
}
|
||||||
.unwrap()
|
let callback: &mut (FnMut(StreamId, StreamDataResult) + Send) = &mut callback;
|
||||||
.push(unsafe { mem::transmute(callback) });
|
*guard = UserCallback::Active(unsafe { mem::transmute(callback) });
|
||||||
}
|
}
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
@ -450,8 +460,8 @@ impl EventLoop {
|
||||||
thread::sleep(Duration::new(1u64, 0u32));
|
thread::sleep(Duration::new(1u64, 0u32));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Note: if we ever change this API so that `run` can return, then it is critical that
|
// It is critical that we remove the callback before returning (currently not possible).
|
||||||
// we remove the callback from `active_callbacks`.
|
// *self.user_callback.lock().unwrap() = UserCallback::Inactive;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn next_stream_id(&self) -> usize {
|
fn next_stream_id(&self) -> usize {
|
||||||
|
@ -650,7 +660,7 @@ impl EventLoop {
|
||||||
|
|
||||||
// Register the callback that is being called by coreaudio whenever it needs data to be
|
// Register the callback that is being called by coreaudio whenever it needs data to be
|
||||||
// fed to the audio buffer.
|
// fed to the audio buffer.
|
||||||
let active_callbacks = self.active_callbacks.clone();
|
let user_callback = self.user_callback.clone();
|
||||||
let sample_format = format.data_type;
|
let sample_format = format.data_type;
|
||||||
let bytes_per_channel = format.data_type.sample_size();
|
let bytes_per_channel = format.data_type.sample_size();
|
||||||
type Args = render_callback::Args<data::Raw>;
|
type Args = render_callback::Args<data::Raw>;
|
||||||
|
@ -666,20 +676,20 @@ impl EventLoop {
|
||||||
mData: data
|
mData: data
|
||||||
} = buffers[0];
|
} = buffers[0];
|
||||||
|
|
||||||
let mut callbacks = active_callbacks.callbacks.lock().unwrap();
|
let mut user_callback = user_callback.lock().unwrap();
|
||||||
|
|
||||||
// A small macro to simplify handling the callback for different sample types.
|
// A small macro to simplify handling the callback for different sample types.
|
||||||
macro_rules! try_callback {
|
macro_rules! try_callback {
|
||||||
($SampleFormat:ident, $SampleType:ty) => {{
|
($SampleFormat:ident, $SampleType:ty) => {{
|
||||||
let data_len = (data_byte_size as usize / bytes_per_channel) as usize;
|
let data_len = (data_byte_size as usize / bytes_per_channel) as usize;
|
||||||
let data_slice = slice::from_raw_parts(data as *const $SampleType, data_len);
|
let data_slice = slice::from_raw_parts(data as *const $SampleType, data_len);
|
||||||
let callback = match callbacks.get_mut(0) {
|
let callback = match *user_callback {
|
||||||
Some(cb) => cb,
|
UserCallback::Active(ref mut cb) => cb,
|
||||||
None => return Ok(()),
|
UserCallback::Inactive => return Ok(()),
|
||||||
};
|
};
|
||||||
let unknown_type_buffer = UnknownTypeInputBuffer::$SampleFormat(::InputBuffer { buffer: data_slice });
|
let unknown_type_buffer = UnknownTypeInputBuffer::$SampleFormat(::InputBuffer { buffer: data_slice });
|
||||||
let stream_data = StreamData::Input { buffer: unknown_type_buffer };
|
let stream_data = StreamData::Input { buffer: unknown_type_buffer };
|
||||||
callback(StreamId(stream_id), stream_data);
|
callback(StreamId(stream_id), Ok(stream_data));
|
||||||
}};
|
}};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -723,7 +733,7 @@ impl EventLoop {
|
||||||
|
|
||||||
// Register the callback that is being called by coreaudio whenever it needs data to be
|
// Register the callback that is being called by coreaudio whenever it needs data to be
|
||||||
// fed to the audio buffer.
|
// fed to the audio buffer.
|
||||||
let active_callbacks = self.active_callbacks.clone();
|
let user_callback = self.user_callback.clone();
|
||||||
let sample_format = format.data_type;
|
let sample_format = format.data_type;
|
||||||
let bytes_per_channel = format.data_type.sample_size();
|
let bytes_per_channel = format.data_type.sample_size();
|
||||||
type Args = render_callback::Args<data::Raw>;
|
type Args = render_callback::Args<data::Raw>;
|
||||||
|
@ -737,16 +747,16 @@ impl EventLoop {
|
||||||
mData: data
|
mData: data
|
||||||
} = (*args.data.data).mBuffers[0];
|
} = (*args.data.data).mBuffers[0];
|
||||||
|
|
||||||
let mut callbacks = active_callbacks.callbacks.lock().unwrap();
|
let mut user_callback = user_callback.lock().unwrap();
|
||||||
|
|
||||||
// A small macro to simplify handling the callback for different sample types.
|
// A small macro to simplify handling the callback for different sample types.
|
||||||
macro_rules! try_callback {
|
macro_rules! try_callback {
|
||||||
($SampleFormat:ident, $SampleType:ty, $equilibrium:expr) => {{
|
($SampleFormat:ident, $SampleType:ty, $equilibrium:expr) => {{
|
||||||
let data_len = (data_byte_size as usize / bytes_per_channel) as usize;
|
let data_len = (data_byte_size as usize / bytes_per_channel) as usize;
|
||||||
let data_slice = slice::from_raw_parts_mut(data as *mut $SampleType, data_len);
|
let data_slice = slice::from_raw_parts_mut(data as *mut $SampleType, data_len);
|
||||||
let callback = match callbacks.get_mut(0) {
|
let callback = match *user_callback {
|
||||||
Some(cb) => cb,
|
UserCallback::Active(ref mut cb) => cb,
|
||||||
None => {
|
UserCallback::Inactive => {
|
||||||
for sample in data_slice.iter_mut() {
|
for sample in data_slice.iter_mut() {
|
||||||
*sample = $equilibrium;
|
*sample = $equilibrium;
|
||||||
}
|
}
|
||||||
|
@ -755,7 +765,7 @@ impl EventLoop {
|
||||||
};
|
};
|
||||||
let unknown_type_buffer = UnknownTypeOutputBuffer::$SampleFormat(::OutputBuffer { buffer: data_slice });
|
let unknown_type_buffer = UnknownTypeOutputBuffer::$SampleFormat(::OutputBuffer { buffer: data_slice });
|
||||||
let stream_data = StreamData::Output { buffer: unknown_type_buffer };
|
let stream_data = StreamData::Output { buffer: unknown_type_buffer };
|
||||||
callback(StreamId(stream_id), stream_data);
|
callback(StreamId(stream_id), Ok(stream_data));
|
||||||
}};
|
}};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -778,13 +788,15 @@ impl EventLoop {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn destroy_stream(&self, stream_id: StreamId) {
|
pub fn destroy_stream(&self, stream_id: StreamId) {
|
||||||
let mut streams = self.streams.lock().unwrap();
|
{
|
||||||
streams[stream_id.0] = None;
|
let mut streams = self.streams.lock().unwrap();
|
||||||
|
streams[stream_id.0] = None;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn play_stream(&self, stream: StreamId) -> Result<(), PlayStreamError> {
|
pub fn play_stream(&self, stream_id: StreamId) -> Result<(), PlayStreamError> {
|
||||||
let mut streams = self.streams.lock().unwrap();
|
let mut streams = self.streams.lock().unwrap();
|
||||||
let stream = streams[stream.0].as_mut().unwrap();
|
let stream = streams[stream_id.0].as_mut().unwrap();
|
||||||
|
|
||||||
if !stream.playing {
|
if !stream.playing {
|
||||||
if let Err(e) = stream.audio_unit.start() {
|
if let Err(e) = stream.audio_unit.start() {
|
||||||
|
@ -797,9 +809,9 @@ impl EventLoop {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn pause_stream(&self, stream: StreamId) -> Result<(), PauseStreamError> {
|
pub fn pause_stream(&self, stream_id: StreamId) -> Result<(), PauseStreamError> {
|
||||||
let mut streams = self.streams.lock().unwrap();
|
let mut streams = self.streams.lock().unwrap();
|
||||||
let stream = streams[stream.0].as_mut().unwrap();
|
let stream = streams[stream_id.0].as_mut().unwrap();
|
||||||
|
|
||||||
if stream.playing {
|
if stream.playing {
|
||||||
if let Err(e) = stream.audio_unit.stop() {
|
if let Err(e) = stream.audio_unit.stop() {
|
||||||
|
@ -807,6 +819,7 @@ impl EventLoop {
|
||||||
let err = BackendSpecificError { description };
|
let err = BackendSpecificError { description };
|
||||||
return Err(err.into());
|
return Err(err.into());
|
||||||
}
|
}
|
||||||
|
|
||||||
stream.playing = false;
|
stream.playing = false;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
@ -17,6 +17,7 @@ use PauseStreamError;
|
||||||
use PlayStreamError;
|
use PlayStreamError;
|
||||||
use SupportedFormatsError;
|
use SupportedFormatsError;
|
||||||
use StreamData;
|
use StreamData;
|
||||||
|
use StreamDataResult;
|
||||||
use SupportedFormat;
|
use SupportedFormat;
|
||||||
use UnknownTypeOutputBuffer;
|
use UnknownTypeOutputBuffer;
|
||||||
|
|
||||||
|
@ -37,13 +38,14 @@ impl EventLoop {
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn new() -> EventLoop {
|
pub fn new() -> EventLoop {
|
||||||
stdweb::initialize();
|
stdweb::initialize();
|
||||||
|
EventLoop {
|
||||||
EventLoop { streams: Mutex::new(Vec::new()) }
|
streams: Mutex::new(Vec::new()),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn run<F>(&self, callback: F) -> !
|
pub fn run<F>(&self, callback: F) -> !
|
||||||
where F: FnMut(StreamId, StreamData)
|
where F: FnMut(StreamId, StreamDataResult),
|
||||||
{
|
{
|
||||||
// The `run` function uses `set_timeout` to invoke a Rust callback repeatidely. The job
|
// The `run` function uses `set_timeout` to invoke a Rust callback repeatidely. The job
|
||||||
// of this callback is to fill the content of the audio buffers.
|
// of this callback is to fill the content of the audio buffers.
|
||||||
|
@ -52,7 +54,7 @@ impl EventLoop {
|
||||||
// and to the `callback` parameter that was passed to `run`.
|
// and to the `callback` parameter that was passed to `run`.
|
||||||
|
|
||||||
fn callback_fn<F>(user_data_ptr: *mut c_void)
|
fn callback_fn<F>(user_data_ptr: *mut c_void)
|
||||||
where F: FnMut(StreamId, StreamData)
|
where F: FnMut(StreamId, StreamDataResult)
|
||||||
{
|
{
|
||||||
unsafe {
|
unsafe {
|
||||||
let user_data_ptr2 = user_data_ptr as *mut (&EventLoop, F);
|
let user_data_ptr2 = user_data_ptr as *mut (&EventLoop, F);
|
||||||
|
@ -71,16 +73,16 @@ impl EventLoop {
|
||||||
{
|
{
|
||||||
let buffer = UnknownTypeOutputBuffer::F32(::OutputBuffer { buffer: &mut temporary_buffer });
|
let buffer = UnknownTypeOutputBuffer::F32(::OutputBuffer { buffer: &mut temporary_buffer });
|
||||||
let data = StreamData::Output { buffer: buffer };
|
let data = StreamData::Output { buffer: buffer };
|
||||||
user_cb(StreamId(stream_id), data);
|
user_cb(StreamId(stream_id), Ok(data));
|
||||||
// TODO: directly use a TypedArray<f32> once this is supported by stdweb
|
// TODO: directly use a TypedArray<f32> once this is supported by stdweb
|
||||||
}
|
}
|
||||||
|
|
||||||
let typed_array = {
|
let typed_array = {
|
||||||
let f32_slice = temporary_buffer.as_slice();
|
let f32_slice = temporary_buffer.as_slice();
|
||||||
let u8_slice: &[u8] = unsafe {
|
let u8_slice: &[u8] = from_raw_parts(
|
||||||
from_raw_parts(f32_slice.as_ptr() as *const _,
|
f32_slice.as_ptr() as *const _,
|
||||||
f32_slice.len() * mem::size_of::<f32>())
|
f32_slice.len() * mem::size_of::<f32>(),
|
||||||
};
|
);
|
||||||
let typed_array: TypedArray<u8> = u8_slice.into();
|
let typed_array: TypedArray<u8> = u8_slice.into();
|
||||||
typed_array
|
typed_array
|
||||||
};
|
};
|
||||||
|
|
44
src/lib.rs
44
src/lib.rs
|
@ -70,8 +70,8 @@
|
||||||
//!
|
//!
|
||||||
//! ```no_run
|
//! ```no_run
|
||||||
//! # let event_loop = cpal::EventLoop::new();
|
//! # let event_loop = cpal::EventLoop::new();
|
||||||
//! event_loop.run(move |_stream_id, _stream_data| {
|
//! event_loop.run(move |_stream_id, _stream_result| {
|
||||||
//! // read or write stream data here
|
//! // react to stream events and read or write stream data here
|
||||||
//! });
|
//! });
|
||||||
//! ```
|
//! ```
|
||||||
//!
|
//!
|
||||||
|
@ -90,7 +90,16 @@
|
||||||
//! use cpal::{StreamData, UnknownTypeOutputBuffer};
|
//! use cpal::{StreamData, UnknownTypeOutputBuffer};
|
||||||
//!
|
//!
|
||||||
//! # let event_loop = cpal::EventLoop::new();
|
//! # let event_loop = cpal::EventLoop::new();
|
||||||
//! event_loop.run(move |_stream_id, mut stream_data| {
|
//! event_loop.run(move |stream_id, stream_result| {
|
||||||
|
//! let stream_data = match stream_result {
|
||||||
|
//! Ok(data) => data,
|
||||||
|
//! Err(err) => {
|
||||||
|
//! eprintln!("an error occurred on stream {:?}: {}", stream_id, err);
|
||||||
|
//! return;
|
||||||
|
//! }
|
||||||
|
//! _ => return,
|
||||||
|
//! };
|
||||||
|
//!
|
||||||
//! match stream_data {
|
//! match stream_data {
|
||||||
//! StreamData::Output { buffer: UnknownTypeOutputBuffer::U16(mut buffer) } => {
|
//! StreamData::Output { buffer: UnknownTypeOutputBuffer::U16(mut buffer) } => {
|
||||||
//! for elem in buffer.iter_mut() {
|
//! for elem in buffer.iter_mut() {
|
||||||
|
@ -206,6 +215,10 @@ pub enum StreamData<'a> {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Stream data passed to the `EventLoop::run` callback, or an error in the case that the device
|
||||||
|
/// was invalidated or some backend-specific error occurred.
|
||||||
|
pub type StreamDataResult<'a> = Result<StreamData<'a>, StreamError>;
|
||||||
|
|
||||||
/// Represents a buffer containing audio data that may be read.
|
/// Represents a buffer containing audio data that may be read.
|
||||||
///
|
///
|
||||||
/// This struct implements the `Deref` trait targeting `[T]`. Therefore this buffer can be read the
|
/// This struct implements the `Deref` trait targeting `[T]`. Therefore this buffer can be read the
|
||||||
|
@ -291,7 +304,7 @@ pub struct SupportedOutputFormats(cpal_impl::SupportedOutputFormats);
|
||||||
/// **Note:** If you notice a `BackendSpecificError` that you believe could be better handled in a
|
/// **Note:** If you notice a `BackendSpecificError` that you believe could be better handled in a
|
||||||
/// cross-platform manner, please create an issue or submit a pull request with a patch that adds
|
/// cross-platform manner, please create an issue or submit a pull request with a patch that adds
|
||||||
/// the necessary error variant to the appropriate error enum.
|
/// the necessary error variant to the appropriate error enum.
|
||||||
#[derive(Debug, Fail)]
|
#[derive(Clone, Debug, Fail)]
|
||||||
#[fail(display = "A backend-specific error has occurred: {}", description)]
|
#[fail(display = "A backend-specific error has occurred: {}", description)]
|
||||||
pub struct BackendSpecificError {
|
pub struct BackendSpecificError {
|
||||||
pub description: String
|
pub description: String
|
||||||
|
@ -412,6 +425,21 @@ pub enum PauseStreamError {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Errors that might occur while a stream is running.
|
||||||
|
#[derive(Debug, Fail)]
|
||||||
|
pub enum StreamError {
|
||||||
|
/// The device no longer exists. This can happen if the device is disconnected while the
|
||||||
|
/// program is running.
|
||||||
|
#[fail(display = "The requested device is no longer available. For example, it has been unplugged.")]
|
||||||
|
DeviceNotAvailable,
|
||||||
|
/// See the `BackendSpecificError` docs for more information about this error variant.
|
||||||
|
#[fail(display = "{}", err)]
|
||||||
|
BackendSpecific {
|
||||||
|
#[fail(cause)]
|
||||||
|
err: BackendSpecificError,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// An iterator yielding all `Device`s currently available to the system.
|
/// An iterator yielding all `Device`s currently available to the system.
|
||||||
///
|
///
|
||||||
/// Can be empty if the system does not support audio in general.
|
/// Can be empty if the system does not support audio in general.
|
||||||
|
@ -591,7 +619,7 @@ impl EventLoop {
|
||||||
/// You can call the other methods of `EventLoop` without getting a deadlock.
|
/// You can call the other methods of `EventLoop` without getting a deadlock.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn run<F>(&self, mut callback: F) -> !
|
pub fn run<F>(&self, mut callback: F) -> !
|
||||||
where F: FnMut(StreamId, StreamData) + Send
|
where F: FnMut(StreamId, StreamDataResult) + Send
|
||||||
{
|
{
|
||||||
self.0.run(move |id, data| callback(StreamId(id), data))
|
self.0.run(move |id, data| callback(StreamId(id), data))
|
||||||
}
|
}
|
||||||
|
@ -831,6 +859,12 @@ impl From<BackendSpecificError> for PauseStreamError {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<BackendSpecificError> for StreamError {
|
||||||
|
fn from(err: BackendSpecificError) -> Self {
|
||||||
|
StreamError::BackendSpecific { err }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// If a backend does not provide an API for retrieving supported formats, we query it with a bunch
|
// If a backend does not provide an API for retrieving supported formats, we query it with a bunch
|
||||||
// of commonly used rates. This is always the case for wasapi and is sometimes the case for alsa.
|
// of commonly used rates. This is always the case for wasapi and is sometimes the case for alsa.
|
||||||
//
|
//
|
||||||
|
|
|
@ -9,8 +9,8 @@ use DeviceNameError;
|
||||||
use Format;
|
use Format;
|
||||||
use PauseStreamError;
|
use PauseStreamError;
|
||||||
use PlayStreamError;
|
use PlayStreamError;
|
||||||
|
use StreamDataResult;
|
||||||
use SupportedFormatsError;
|
use SupportedFormatsError;
|
||||||
use StreamData;
|
|
||||||
use SupportedFormat;
|
use SupportedFormat;
|
||||||
|
|
||||||
pub struct EventLoop;
|
pub struct EventLoop;
|
||||||
|
@ -23,7 +23,7 @@ impl EventLoop {
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn run<F>(&self, _callback: F) -> !
|
pub fn run<F>(&self, _callback: F) -> !
|
||||||
where F: FnMut(StreamId, StreamData)
|
where F: FnMut(StreamId, StreamDataResult)
|
||||||
{
|
{
|
||||||
loop { /* TODO: don't spin */ }
|
loop { /* TODO: don't spin */ }
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,8 @@ use PauseStreamError;
|
||||||
use PlayStreamError;
|
use PlayStreamError;
|
||||||
use SampleFormat;
|
use SampleFormat;
|
||||||
use StreamData;
|
use StreamData;
|
||||||
|
use StreamDataResult;
|
||||||
|
use StreamError;
|
||||||
use UnknownTypeOutputBuffer;
|
use UnknownTypeOutputBuffer;
|
||||||
use UnknownTypeInputBuffer;
|
use UnknownTypeInputBuffer;
|
||||||
|
|
||||||
|
@ -443,12 +445,12 @@ impl EventLoop {
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn run<F>(&self, mut callback: F) -> !
|
pub fn run<F>(&self, mut callback: F) -> !
|
||||||
where F: FnMut(StreamId, StreamData)
|
where F: FnMut(StreamId, StreamDataResult)
|
||||||
{
|
{
|
||||||
self.run_inner(&mut callback);
|
self.run_inner(&mut callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_inner(&self, callback: &mut FnMut(StreamId, StreamData)) -> ! {
|
fn run_inner(&self, callback: &mut dyn FnMut(StreamId, StreamDataResult)) -> ! {
|
||||||
unsafe {
|
unsafe {
|
||||||
// We keep `run_context` locked forever, which guarantees that two invocations of
|
// We keep `run_context` locked forever, which guarantees that two invocations of
|
||||||
// `run()` cannot run simultaneously.
|
// `run()` cannot run simultaneously.
|
||||||
|
@ -457,190 +459,161 @@ impl EventLoop {
|
||||||
// Shadow the name because we don't use (or drop) it otherwise.
|
// Shadow the name because we don't use (or drop) it otherwise.
|
||||||
let run_context = &mut *run_context;
|
let run_context = &mut *run_context;
|
||||||
|
|
||||||
loop {
|
// Keep track of the set of streams that should be removed due to some error occurring.
|
||||||
// Process the pending commands.
|
//
|
||||||
for command in run_context.commands.try_iter() {
|
// Checked at the start of each loop.
|
||||||
match command {
|
let mut streams_to_remove: Vec<(StreamId, StreamError)> = vec![];
|
||||||
Command::NewStream(stream_inner) => {
|
|
||||||
let event = stream_inner.event;
|
'stream_loop: loop {
|
||||||
run_context.streams.push(stream_inner);
|
// Remove any failed streams.
|
||||||
run_context.handles.push(event);
|
for (stream_id, err) in streams_to_remove.drain(..) {
|
||||||
},
|
match run_context.streams.iter().position(|s| s.id == stream_id) {
|
||||||
Command::DestroyStream(stream_id) => {
|
None => continue,
|
||||||
match run_context.streams.iter().position(|v| v.id == stream_id) {
|
Some(p) => {
|
||||||
None => continue,
|
run_context.handles.remove(p + 1);
|
||||||
Some(p) => {
|
run_context.streams.remove(p);
|
||||||
run_context.handles.remove(p + 1);
|
callback(stream_id, Err(err.into()));
|
||||||
run_context.streams.remove(p);
|
|
||||||
},
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Command::PlayStream(stream_id) => {
|
|
||||||
match run_context.streams.iter_mut().find(|v| v.id == stream_id) {
|
|
||||||
None => continue,
|
|
||||||
Some(stream) => {
|
|
||||||
if !stream.playing {
|
|
||||||
let hresult = (*stream.audio_client).Start();
|
|
||||||
check_result(hresult).unwrap();
|
|
||||||
stream.playing = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Command::PauseStream(stream_id) => {
|
|
||||||
match run_context.streams.iter_mut().find(|v| v.id == stream_id) {
|
|
||||||
None => continue,
|
|
||||||
Some(stream) => {
|
|
||||||
if stream.playing {
|
|
||||||
let hresult = (*stream.audio_client).Stop();
|
|
||||||
check_result(hresult).unwrap();
|
|
||||||
stream.playing = false;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for any of the handles to be signalled, which means that the corresponding
|
// Process queued commands.
|
||||||
// sound needs a buffer.
|
process_commands(run_context, callback);
|
||||||
debug_assert!(run_context.handles.len() <= winnt::MAXIMUM_WAIT_OBJECTS as usize);
|
|
||||||
let result = synchapi::WaitForMultipleObjectsEx(run_context.handles.len() as u32,
|
|
||||||
run_context.handles.as_ptr(),
|
|
||||||
FALSE,
|
|
||||||
winbase::INFINITE, /* TODO: allow setting a timeout */
|
|
||||||
FALSE /* irrelevant parameter here */);
|
|
||||||
|
|
||||||
// Notifying the corresponding task handler.
|
// Wait for any of the handles to be signalled.
|
||||||
debug_assert!(result >= winbase::WAIT_OBJECT_0);
|
let handle_idx = match wait_for_handle_signal(&run_context.handles) {
|
||||||
let handle_id = (result - winbase::WAIT_OBJECT_0) as usize;
|
Ok(idx) => idx,
|
||||||
|
Err(err) => {
|
||||||
|
for stream in &run_context.streams {
|
||||||
|
callback(stream.id.clone(), Err(err.clone().into()));
|
||||||
|
}
|
||||||
|
run_context.streams.clear();
|
||||||
|
run_context.handles.truncate(1);
|
||||||
|
break 'stream_loop;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// If `handle_id` is 0, then it's `pending_scheduled_event` that was signalled in
|
// If `handle_idx` is 0, then it's `pending_scheduled_event` that was signalled in
|
||||||
// order for us to pick up the pending commands.
|
// order for us to pick up the pending commands. Otherwise, a stream needs data.
|
||||||
// Otherwise, a stream needs data.
|
if handle_idx == 0 {
|
||||||
if handle_id >= 1 {
|
continue;
|
||||||
let stream = &mut run_context.streams[handle_id - 1];
|
}
|
||||||
let stream_id = stream.id.clone();
|
|
||||||
|
|
||||||
// Obtaining the number of frames that are available to be written.
|
let stream_idx = handle_idx - 1;
|
||||||
let mut frames_available = {
|
let stream = &mut run_context.streams[stream_idx];
|
||||||
let mut padding = mem::uninitialized();
|
|
||||||
let hresult = (*stream.audio_client).GetCurrentPadding(&mut padding);
|
|
||||||
// Happens when a bluetooth headset was turned off, for example.
|
|
||||||
if hresult == AUDCLNT_E_DEVICE_INVALIDATED {
|
|
||||||
// The client code should switch to a different device eventually.
|
|
||||||
// For now let's just skip the invalidated device.
|
|
||||||
// Would be nice to inform the client code about the invalidation,
|
|
||||||
// but throwing a panic isn't the most ergonomic way to do so.
|
|
||||||
continue}
|
|
||||||
check_result(hresult).unwrap();
|
|
||||||
stream.max_frames_in_buffer - padding
|
|
||||||
};
|
|
||||||
|
|
||||||
if frames_available == 0 {
|
// The number of frames available for reading/writing.
|
||||||
// TODO: can this happen?
|
let mut frames_available = match get_available_frames(stream) {
|
||||||
|
Ok(0) => continue, // TODO: Can this happen?
|
||||||
|
Ok(n) => n,
|
||||||
|
Err(err) => {
|
||||||
|
streams_to_remove.push((stream.id.clone(), err));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let sample_size = stream.sample_format.sample_size();
|
let sample_size = stream.sample_format.sample_size();
|
||||||
|
|
||||||
// Obtaining a pointer to the buffer.
|
// Obtaining a pointer to the buffer.
|
||||||
match stream.client_flow {
|
match stream.client_flow {
|
||||||
|
|
||||||
AudioClientFlow::Capture { capture_client } => {
|
AudioClientFlow::Capture { capture_client } => {
|
||||||
// Get the available data in the shared buffer.
|
// Get the available data in the shared buffer.
|
||||||
let mut buffer: *mut BYTE = mem::uninitialized();
|
let mut buffer: *mut BYTE = mem::uninitialized();
|
||||||
let mut flags = mem::uninitialized();
|
let mut flags = mem::uninitialized();
|
||||||
let hresult = (*capture_client).GetBuffer(
|
let hresult = (*capture_client).GetBuffer(
|
||||||
&mut buffer,
|
&mut buffer,
|
||||||
&mut frames_available,
|
&mut frames_available,
|
||||||
&mut flags,
|
&mut flags,
|
||||||
ptr::null_mut(),
|
ptr::null_mut(),
|
||||||
ptr::null_mut(),
|
ptr::null_mut(),
|
||||||
);
|
);
|
||||||
check_result(hresult).unwrap();
|
|
||||||
|
|
||||||
if hresult == AUDCLNT_S_BUFFER_EMPTY { continue; }
|
// TODO: Can this happen?
|
||||||
|
if hresult == AUDCLNT_S_BUFFER_EMPTY {
|
||||||
|
continue;
|
||||||
|
} else if let Err(err) = stream_error_from_hresult(hresult) {
|
||||||
|
streams_to_remove.push((stream.id.clone(), err));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
debug_assert!(!buffer.is_null());
|
debug_assert!(!buffer.is_null());
|
||||||
let buffer_len = frames_available as usize
|
|
||||||
* stream.bytes_per_frame as usize / sample_size;
|
|
||||||
|
|
||||||
// Simplify the capture callback sample format branches.
|
let buffer_len = frames_available as usize
|
||||||
macro_rules! capture_callback {
|
* stream.bytes_per_frame as usize / sample_size;
|
||||||
($T:ty, $Variant:ident) => {{
|
|
||||||
let buffer_data = buffer as *mut _ as *const $T;
|
|
||||||
let slice = slice::from_raw_parts(buffer_data, buffer_len);
|
|
||||||
let unknown_buffer = UnknownTypeInputBuffer::$Variant(::InputBuffer {
|
|
||||||
buffer: slice,
|
|
||||||
});
|
|
||||||
let data = StreamData::Input { buffer: unknown_buffer };
|
|
||||||
callback(stream_id, data);
|
|
||||||
|
|
||||||
// Release the buffer.
|
// Simplify the capture callback sample format branches.
|
||||||
let hresult = (*capture_client).ReleaseBuffer(frames_available);
|
macro_rules! capture_callback {
|
||||||
match check_result(hresult) {
|
($T:ty, $Variant:ident) => {{
|
||||||
// Ignoring unavailable device error.
|
let buffer_data = buffer as *mut _ as *const $T;
|
||||||
Err(ref e) if e.raw_os_error() == Some(AUDCLNT_E_DEVICE_INVALIDATED) => {
|
let slice = slice::from_raw_parts(buffer_data, buffer_len);
|
||||||
},
|
let unknown_buffer = UnknownTypeInputBuffer::$Variant(::InputBuffer {
|
||||||
e => e.unwrap(),
|
buffer: slice,
|
||||||
};
|
});
|
||||||
}};
|
let data = StreamData::Input { buffer: unknown_buffer };
|
||||||
}
|
callback(stream.id.clone(), Ok(data));
|
||||||
|
// Release the buffer.
|
||||||
|
let hresult = (*capture_client).ReleaseBuffer(frames_available);
|
||||||
|
if let Err(err) = stream_error_from_hresult(hresult) {
|
||||||
|
streams_to_remove.push((stream.id.clone(), err));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}};
|
||||||
|
}
|
||||||
|
|
||||||
match stream.sample_format {
|
match stream.sample_format {
|
||||||
SampleFormat::F32 => capture_callback!(f32, F32),
|
SampleFormat::F32 => capture_callback!(f32, F32),
|
||||||
SampleFormat::I16 => capture_callback!(i16, I16),
|
SampleFormat::I16 => capture_callback!(i16, I16),
|
||||||
SampleFormat::U16 => capture_callback!(u16, U16),
|
SampleFormat::U16 => capture_callback!(u16, U16),
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
AudioClientFlow::Render { render_client } => {
|
AudioClientFlow::Render { render_client } => {
|
||||||
let mut buffer: *mut BYTE = mem::uninitialized();
|
let mut buffer: *mut BYTE = mem::uninitialized();
|
||||||
let hresult = (*render_client).GetBuffer(
|
let hresult = (*render_client).GetBuffer(
|
||||||
frames_available,
|
frames_available,
|
||||||
&mut buffer as *mut *mut _,
|
&mut buffer as *mut *mut _,
|
||||||
);
|
);
|
||||||
// FIXME: can return `AUDCLNT_E_DEVICE_INVALIDATED`
|
|
||||||
check_result(hresult).unwrap();
|
|
||||||
debug_assert!(!buffer.is_null());
|
|
||||||
let buffer_len = frames_available as usize
|
|
||||||
* stream.bytes_per_frame as usize / sample_size;
|
|
||||||
|
|
||||||
// Simplify the render callback sample format branches.
|
if let Err(err) = stream_error_from_hresult(hresult) {
|
||||||
macro_rules! render_callback {
|
streams_to_remove.push((stream.id.clone(), err));
|
||||||
($T:ty, $Variant:ident) => {{
|
continue;
|
||||||
let buffer_data = buffer as *mut $T;
|
}
|
||||||
let slice = slice::from_raw_parts_mut(buffer_data, buffer_len);
|
|
||||||
let unknown_buffer = UnknownTypeOutputBuffer::$Variant(::OutputBuffer {
|
|
||||||
buffer: slice
|
|
||||||
});
|
|
||||||
let data = StreamData::Output { buffer: unknown_buffer };
|
|
||||||
callback(stream_id, data);
|
|
||||||
let hresult = match stream.client_flow {
|
|
||||||
AudioClientFlow::Render { render_client } => {
|
|
||||||
(*render_client).ReleaseBuffer(frames_available as u32, 0)
|
|
||||||
},
|
|
||||||
_ => unreachable!(),
|
|
||||||
};
|
|
||||||
match check_result(hresult) {
|
|
||||||
// Ignoring the error that is produced if the device has been disconnected.
|
|
||||||
Err(ref e) if e.raw_os_error() == Some(AUDCLNT_E_DEVICE_INVALIDATED) => (),
|
|
||||||
e => e.unwrap(),
|
|
||||||
};
|
|
||||||
}}
|
|
||||||
}
|
|
||||||
|
|
||||||
match stream.sample_format {
|
debug_assert!(!buffer.is_null());
|
||||||
SampleFormat::F32 => render_callback!(f32, F32),
|
let buffer_len = frames_available as usize
|
||||||
SampleFormat::I16 => render_callback!(i16, I16),
|
* stream.bytes_per_frame as usize / sample_size;
|
||||||
SampleFormat::U16 => render_callback!(u16, U16),
|
|
||||||
}
|
// Simplify the render callback sample format branches.
|
||||||
},
|
macro_rules! render_callback {
|
||||||
}
|
($T:ty, $Variant:ident) => {{
|
||||||
|
let buffer_data = buffer as *mut $T;
|
||||||
|
let slice = slice::from_raw_parts_mut(buffer_data, buffer_len);
|
||||||
|
let unknown_buffer = UnknownTypeOutputBuffer::$Variant(::OutputBuffer {
|
||||||
|
buffer: slice
|
||||||
|
});
|
||||||
|
let data = StreamData::Output { buffer: unknown_buffer };
|
||||||
|
callback(stream.id.clone(), Ok(data));
|
||||||
|
let hresult = (*render_client)
|
||||||
|
.ReleaseBuffer(frames_available as u32, 0);
|
||||||
|
if let Err(err) = stream_error_from_hresult(hresult) {
|
||||||
|
streams_to_remove.push((stream.id.clone(), err));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}}
|
||||||
|
}
|
||||||
|
|
||||||
|
match stream.sample_format {
|
||||||
|
SampleFormat::F32 => render_callback!(f32, F32),
|
||||||
|
SampleFormat::I16 => render_callback!(i16, I16),
|
||||||
|
SampleFormat::U16 => render_callback!(u16, U16),
|
||||||
|
}
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
panic!("`cpal::EventLoop::run` API currently disallows returning");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
|
@ -758,3 +731,129 @@ fn format_to_waveformatextensible(format: &Format) -> Option<mmreg::WAVEFORMATEX
|
||||||
|
|
||||||
Some(waveformatextensible)
|
Some(waveformatextensible)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Process any pending commands that are queued within the `RunContext`.
|
||||||
|
fn process_commands(
|
||||||
|
run_context: &mut RunContext,
|
||||||
|
callback: &mut dyn FnMut(StreamId, StreamDataResult),
|
||||||
|
) {
|
||||||
|
// Process the pending commands.
|
||||||
|
for command in run_context.commands.try_iter() {
|
||||||
|
match command {
|
||||||
|
Command::NewStream(stream_inner) => {
|
||||||
|
let event = stream_inner.event;
|
||||||
|
run_context.streams.push(stream_inner);
|
||||||
|
run_context.handles.push(event);
|
||||||
|
},
|
||||||
|
Command::DestroyStream(stream_id) => {
|
||||||
|
match run_context.streams.iter().position(|s| s.id == stream_id) {
|
||||||
|
None => continue,
|
||||||
|
Some(p) => {
|
||||||
|
run_context.handles.remove(p + 1);
|
||||||
|
run_context.streams.remove(p);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::PlayStream(stream_id) => {
|
||||||
|
match run_context.streams.iter().position(|s| s.id == stream_id) {
|
||||||
|
None => continue,
|
||||||
|
Some(p) => {
|
||||||
|
if !run_context.streams[p].playing {
|
||||||
|
let hresult = unsafe {
|
||||||
|
(*run_context.streams[p].audio_client).Start()
|
||||||
|
};
|
||||||
|
match stream_error_from_hresult(hresult) {
|
||||||
|
Ok(()) => {
|
||||||
|
run_context.streams[p].playing = true;
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
callback(stream_id, Err(err.into()));
|
||||||
|
run_context.handles.remove(p + 1);
|
||||||
|
run_context.streams.remove(p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::PauseStream(stream_id) => {
|
||||||
|
match run_context.streams.iter().position(|s| s.id == stream_id) {
|
||||||
|
None => continue,
|
||||||
|
Some(p) => {
|
||||||
|
if run_context.streams[p].playing {
|
||||||
|
let hresult = unsafe {
|
||||||
|
(*run_context.streams[p].audio_client).Stop()
|
||||||
|
};
|
||||||
|
match stream_error_from_hresult(hresult) {
|
||||||
|
Ok(()) => {
|
||||||
|
run_context.streams[p].playing = false;
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
callback(stream_id, Err(err.into()));
|
||||||
|
run_context.handles.remove(p + 1);
|
||||||
|
run_context.streams.remove(p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for any of the given handles to be signalled.
|
||||||
|
//
|
||||||
|
// Returns the index of the `handle` that was signalled, or an `Err` if
|
||||||
|
// `WaitForMultipleObjectsEx` fails.
|
||||||
|
//
|
||||||
|
// This is called when the `run` thread is ready to wait for the next event. The
|
||||||
|
// next event might be some command submitted by the user (the first handle) or
|
||||||
|
// might indicate that one of the streams is ready to deliver or receive audio.
|
||||||
|
fn wait_for_handle_signal(handles: &[winnt::HANDLE]) -> Result<usize, BackendSpecificError> {
|
||||||
|
debug_assert!(handles.len() <= winnt::MAXIMUM_WAIT_OBJECTS as usize);
|
||||||
|
let result = unsafe {
|
||||||
|
synchapi::WaitForMultipleObjectsEx(
|
||||||
|
handles.len() as u32,
|
||||||
|
handles.as_ptr(),
|
||||||
|
FALSE, // Don't wait for all, just wait for the first
|
||||||
|
winbase::INFINITE, // TODO: allow setting a timeout
|
||||||
|
FALSE, // irrelevant parameter here
|
||||||
|
)
|
||||||
|
};
|
||||||
|
if result == winbase::WAIT_FAILED {
|
||||||
|
let err = unsafe {
|
||||||
|
winapi::um::errhandlingapi::GetLastError()
|
||||||
|
};
|
||||||
|
let description = format!("`WaitForMultipleObjectsEx failed: {}", err);
|
||||||
|
let err = BackendSpecificError { description };
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
// Notifying the corresponding task handler.
|
||||||
|
debug_assert!(result >= winbase::WAIT_OBJECT_0);
|
||||||
|
let handle_idx = (result - winbase::WAIT_OBJECT_0) as usize;
|
||||||
|
Ok(handle_idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the number of available frames that are available for writing/reading.
|
||||||
|
fn get_available_frames(stream: &StreamInner) -> Result<u32, StreamError> {
|
||||||
|
unsafe {
|
||||||
|
let mut padding = mem::uninitialized();
|
||||||
|
let hresult = (*stream.audio_client).GetCurrentPadding(&mut padding);
|
||||||
|
stream_error_from_hresult(hresult)?;
|
||||||
|
Ok(stream.max_frames_in_buffer - padding)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert the given `HRESULT` into a `StreamError` if it does indicate an error.
|
||||||
|
fn stream_error_from_hresult(hresult: winnt::HRESULT) -> Result<(), StreamError> {
|
||||||
|
if hresult == AUDCLNT_E_DEVICE_INVALIDATED {
|
||||||
|
return Err(StreamError::DeviceNotAvailable);
|
||||||
|
}
|
||||||
|
if let Err(err) = check_result(hresult) {
|
||||||
|
let description = format!("{}", err);
|
||||||
|
let err = BackendSpecificError { description };
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue