From c97d1dd3fa1b9e162cc186cbdc9de43de832dea1 Mon Sep 17 00:00:00 2001 From: Tatsuyuki Ishi Date: Tue, 9 Jul 2019 15:47:33 +0900 Subject: [PATCH] Remove EventLoop and port the ALSA backend --- examples/beep.rs | 26 +- examples/feedback.rs | 128 +++--- examples/record_wav.rs | 101 +++-- src/host/alsa/mod.rs | 897 ++++++++++++++--------------------------- src/host/null/mod.rs | 58 +-- src/lib.rs | 5 +- src/platform/mod.rs | 160 ++------ src/traits.rs | 95 +---- 8 files changed, 519 insertions(+), 951 deletions(-) diff --git a/examples/beep.rs b/examples/beep.rs index 17c2bf9..d2c570c 100644 --- a/examples/beep.rs +++ b/examples/beep.rs @@ -1,37 +1,34 @@ extern crate anyhow; extern crate cpal; -use cpal::traits::{DeviceTrait, EventLoopTrait, HostTrait}; +use cpal::traits::{DeviceTrait, StreamTrait, HostTrait}; fn main() -> Result<(), anyhow::Error> { let host = cpal::default_host(); let device = host.default_output_device().expect("failed to find a default output device"); let format = device.default_output_format()?; - let event_loop = host.event_loop(); - let stream_id = event_loop.build_output_stream(&device, &format)?; - event_loop.play_stream(stream_id.clone())?; - let sample_rate = format.sample_rate.0 as f32; + let channels = format.channels; let mut sample_clock = 0f32; // Produce a sinusoid of maximum amplitude. - let mut next_value = || { + let mut next_value = move || { sample_clock = (sample_clock + 1.0) % sample_rate; (sample_clock * 440.0 * 2.0 * 3.141592 / sample_rate).sin() }; - event_loop.run(move |id, result| { + let stream = device.build_output_stream(&format, move |result| { let data = match result { Ok(data) => data, Err(err) => { - eprintln!("an error occurred on stream {:?}: {}", id, err); + eprintln!("an error occurred on stream: {}", err); return; } }; match data { cpal::StreamData::Output { buffer: cpal::UnknownTypeOutputBuffer::U16(mut buffer) } => { - for sample in buffer.chunks_mut(format.channels as usize) { + for sample in buffer.chunks_mut(channels as usize) { let value = ((next_value() * 0.5 + 0.5) * std::u16::MAX as f32) as u16; for out in sample.iter_mut() { *out = value; @@ -39,7 +36,7 @@ fn main() -> Result<(), anyhow::Error> { } }, cpal::StreamData::Output { buffer: cpal::UnknownTypeOutputBuffer::I16(mut buffer) } => { - for sample in buffer.chunks_mut(format.channels as usize) { + for sample in buffer.chunks_mut(channels as usize) { let value = (next_value() * std::i16::MAX as f32) as i16; for out in sample.iter_mut() { *out = value; @@ -47,7 +44,7 @@ fn main() -> Result<(), anyhow::Error> { } }, cpal::StreamData::Output { buffer: cpal::UnknownTypeOutputBuffer::F32(mut buffer) } => { - for sample in buffer.chunks_mut(format.channels as usize) { + for sample in buffer.chunks_mut(channels as usize) { let value = next_value(); for out in sample.iter_mut() { *out = value; @@ -56,5 +53,10 @@ fn main() -> Result<(), anyhow::Error> { }, _ => (), } - }); + })?; + stream.play()?; + + std::thread::sleep(std::time::Duration::from_millis(1000)); + + Ok(()) } diff --git a/examples/feedback.rs b/examples/feedback.rs index 682bf37..11c1331 100644 --- a/examples/feedback.rs +++ b/examples/feedback.rs @@ -10,18 +10,21 @@ extern crate anyhow; extern crate cpal; extern crate ringbuf; -use cpal::traits::{DeviceTrait, EventLoopTrait, HostTrait}; +use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use ringbuf::RingBuffer; const LATENCY_MS: f32 = 150.0; fn main() -> Result<(), anyhow::Error> { let host = cpal::default_host(); - let event_loop = host.event_loop(); // Default devices. - let input_device = host.default_input_device().expect("failed to get default input device"); - let output_device = host.default_output_device().expect("failed to get default output device"); + let input_device = host + .default_input_device() + .expect("failed to get default input device"); + let output_device = host + .default_output_device() + .expect("failed to get default output device"); println!("Using default input device: \"{}\"", input_device.name()?); println!("Using default output device: \"{}\"", output_device.name()?); @@ -29,12 +32,6 @@ fn main() -> Result<(), anyhow::Error> { let mut format = input_device.default_input_format()?; format.data_type = cpal::SampleFormat::F32; - // Build streams. - println!("Attempting to build both streams with `{:?}`.", format); - let input_stream_id = event_loop.build_input_stream(&input_device, &format)?; - let output_stream_id = event_loop.build_output_stream(&output_device, &format)?; - println!("Successfully built streams."); - // Create a delay in case the input and output devices aren't synced. let latency_frames = (LATENCY_MS / 1_000.0) * format.sample_rate.0 as f32; let latency_samples = latency_frames as usize * format.channels as usize; @@ -50,59 +47,78 @@ fn main() -> Result<(), anyhow::Error> { producer.push(0.0).unwrap(); } - // Play the streams. - println!("Starting the input and output streams with `{}` milliseconds of latency.", LATENCY_MS); - event_loop.play_stream(input_stream_id.clone())?; - event_loop.play_stream(output_stream_id.clone())?; + // Build streams. + println!("Attempting to build both streams with `{:?}`.", format); + let input_stream = input_device.build_input_stream(&format, move |result| { + let data = match result { + Ok(data) => data, + Err(err) => { + eprintln!("an error occurred on input stream: {}", err); + return; + }, + }; - // Run the event loop on a separate thread. - std::thread::spawn(move || { - event_loop.run(move |id, result| { - let data = match result { - Ok(data) => data, - Err(err) => { - eprintln!("an error occurred on stream {:?}: {}", id, err); - return; + match data { + cpal::StreamData::Input { + buffer: cpal::UnknownTypeInputBuffer::F32(buffer), + } => { + let mut output_fell_behind = false; + for &sample in buffer.iter() { + if producer.push(sample).is_err() { + output_fell_behind = true; + } } - }; + if output_fell_behind { + eprintln!("output stream fell behind: try increasing latency"); + } + }, + _ => panic!("Expected input with f32 data"), + } + })?; + let output_stream = output_device.build_output_stream(&format, move |result| { + let data = match result { + Ok(data) => data, + Err(err) => { + eprintln!("an error occurred on output stream: {}", err); + return; + }, + }; + match data { + cpal::StreamData::Output { + buffer: cpal::UnknownTypeOutputBuffer::F32(mut buffer), + } => { + let mut input_fell_behind = None; + for sample in buffer.iter_mut() { + *sample = match consumer.pop() { + Ok(s) => s, + Err(err) => { + input_fell_behind = Some(err); + 0.0 + }, + }; + } + if let Some(err) = input_fell_behind { + eprintln!("input stream fell behind: {:?}: try increasing latency", err); + } + }, + _ => panic!("Expected output with f32 data"), + } + })?; + println!("Successfully built streams."); - match data { - cpal::StreamData::Input { buffer: cpal::UnknownTypeInputBuffer::F32(buffer) } => { - assert_eq!(id, input_stream_id); - let mut output_fell_behind = false; - for &sample in buffer.iter() { - if producer.push(sample).is_err() { - output_fell_behind = true; - } - } - if output_fell_behind { - eprintln!("output stream fell behind: try increasing latency"); - } - }, - cpal::StreamData::Output { buffer: cpal::UnknownTypeOutputBuffer::F32(mut buffer) } => { - assert_eq!(id, output_stream_id); - let mut input_fell_behind = None; - for sample in buffer.iter_mut() { - *sample = match consumer.pop() { - Ok(s) => s, - Err(err) => { - input_fell_behind = Some(err); - 0.0 - }, - }; - } - if let Some(_) = input_fell_behind { - eprintln!("input stream fell behind: try increasing latency"); - } - }, - _ => panic!("we're expecting f32 data"), - } - }); - }); + // Play the streams. + println!( + "Starting the input and output streams with `{}` milliseconds of latency.", + LATENCY_MS + ); + input_stream.play()?; + output_stream.play()?; // Run for 3 seconds before closing. println!("Playing for 3 seconds... "); std::thread::sleep(std::time::Duration::from_secs(3)); + drop(input_stream); + drop(output_stream); println!("Done!"); Ok(()) } diff --git a/examples/record_wav.rs b/examples/record_wav.rs index 111ede8..d53f9b3 100644 --- a/examples/record_wav.rs +++ b/examples/record_wav.rs @@ -6,21 +6,21 @@ extern crate anyhow; extern crate cpal; extern crate hound; -use cpal::traits::{DeviceTrait, EventLoopTrait, HostTrait}; +use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; fn main() -> Result<(), anyhow::Error> { // Use the default host for working with audio devices. let host = cpal::default_host(); // Setup the default input device and stream with the default input format. - let device = host.default_input_device().expect("Failed to get default input device"); + let device = host + .default_input_device() + .expect("Failed to get default input device"); println!("Default input device: {}", device.name()?); - let format = device.default_input_format().expect("Failed to get default input format"); + let format = device + .default_input_format() + .expect("Failed to get default input format"); println!("Default input format: {:?}", format); - let event_loop = host.event_loop(); - let stream_id = event_loop.build_input_stream(&device, &format)?; - event_loop.play_stream(stream_id)?; - // The WAV file we're recording to. const PATH: &'static str = concat!(env!("CARGO_MANIFEST_DIR"), "/recorded.wav"); let spec = wav_spec_from_format(&format); @@ -29,63 +29,62 @@ fn main() -> Result<(), anyhow::Error> { // A flag to indicate that recording is in progress. println!("Begin recording..."); - let recording = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(true)); // Run the input stream on a separate thread. let writer_2 = writer.clone(); - let recording_2 = recording.clone(); - std::thread::spawn(move || { - event_loop.run(move |id, event| { - let data = match event { - Ok(data) => data, - Err(err) => { - eprintln!("an error occurred on stream {:?}: {}", id, err); - return; - } - }; - - // If we're done recording, return early. - if !recording_2.load(std::sync::atomic::Ordering::Relaxed) { + let stream = device.build_input_stream(&format, move |event| { + let data = match event { + Ok(data) => data, + Err(err) => { + eprintln!("an error occurred on stream: {}", err); return; - } - // Otherwise write to the wav writer. - match data { - cpal::StreamData::Input { buffer: cpal::UnknownTypeInputBuffer::U16(buffer) } => { - if let Ok(mut guard) = writer_2.try_lock() { - if let Some(writer) = guard.as_mut() { - for sample in buffer.iter() { - let sample = cpal::Sample::to_i16(sample); - writer.write_sample(sample).ok(); - } + }, + }; + + // Otherwise write to the wav writer. + match data { + cpal::StreamData::Input { + buffer: cpal::UnknownTypeInputBuffer::U16(buffer), + } => { + if let Ok(mut guard) = writer_2.try_lock() { + if let Some(writer) = guard.as_mut() { + for sample in buffer.iter() { + let sample = cpal::Sample::to_i16(sample); + writer.write_sample(sample).ok(); } } - }, - cpal::StreamData::Input { buffer: cpal::UnknownTypeInputBuffer::I16(buffer) } => { - if let Ok(mut guard) = writer_2.try_lock() { - if let Some(writer) = guard.as_mut() { - for &sample in buffer.iter() { - writer.write_sample(sample).ok(); - } + } + }, + cpal::StreamData::Input { + buffer: cpal::UnknownTypeInputBuffer::I16(buffer), + } => { + if let Ok(mut guard) = writer_2.try_lock() { + if let Some(writer) = guard.as_mut() { + for &sample in buffer.iter() { + writer.write_sample(sample).ok(); } } - }, - cpal::StreamData::Input { buffer: cpal::UnknownTypeInputBuffer::F32(buffer) } => { - if let Ok(mut guard) = writer_2.try_lock() { - if let Some(writer) = guard.as_mut() { - for &sample in buffer.iter() { - writer.write_sample(sample).ok(); - } + } + }, + cpal::StreamData::Input { + buffer: cpal::UnknownTypeInputBuffer::F32(buffer), + } => { + if let Ok(mut guard) = writer_2.try_lock() { + if let Some(writer) = guard.as_mut() { + for &sample in buffer.iter() { + writer.write_sample(sample).ok(); } } - }, - _ => (), - } - }); - }); + } + }, + _ => (), + } + })?; + stream.play()?; // Let recording go for roughly three seconds. std::thread::sleep(std::time::Duration::from_secs(3)); - recording.store(false, std::sync::atomic::Ordering::Relaxed); + drop(stream); writer.lock().unwrap().take().unwrap().finalize()?; println!("Recording {} complete!", PATH); Ok(()) diff --git a/src/host/alsa/mod.rs b/src/host/alsa/mod.rs index afc32c6..72c3afe 100644 --- a/src/host/alsa/mod.rs +++ b/src/host/alsa/mod.rs @@ -1,11 +1,14 @@ extern crate alsa_sys as alsa; extern crate libc; -pub use self::enumerate::{Devices, default_input_device, default_output_device}; +use std::{cmp, ffi, io, mem, ptr}; +use std::sync::Arc; +use std::thread::{self, JoinHandle}; +use std::vec::IntoIter as VecIntoIter; -use ChannelCount; use BackendSpecificError; use BuildStreamError; +use ChannelCount; use DefaultFormatError; use DeviceNameError; use DevicesError; @@ -14,20 +17,15 @@ use PauseStreamError; use PlayStreamError; use SampleFormat; use SampleRate; -use SupportedFormatsError; use StreamData; use StreamDataResult; -use StreamError; use SupportedFormat; +use SupportedFormatsError; +use traits::{DeviceTrait, HostTrait, StreamTrait}; use UnknownTypeInputBuffer; use UnknownTypeOutputBuffer; -use traits::{DeviceTrait, EventLoopTrait, HostTrait, StreamIdTrait}; -use std::{cmp, ffi, ptr}; -use std::sync::Mutex; -use std::sync::mpsc::{channel, Sender, Receiver}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::vec::IntoIter as VecIntoIter; +pub use self::enumerate::{default_input_device, default_output_device, Devices}; pub type SupportedInputFormats = VecIntoIter; pub type SupportedOutputFormats = VecIntoIter; @@ -47,7 +45,6 @@ impl Host { impl HostTrait for Host { type Devices = Devices; type Device = Device; - type EventLoop = EventLoop; fn is_available() -> bool { // Assume ALSA is always available on linux/freebsd. @@ -65,15 +62,12 @@ impl HostTrait for Host { fn default_output_device(&self) -> Option { default_output_device() } - - fn event_loop(&self) -> Self::EventLoop { - EventLoop::new() - } } impl DeviceTrait for Device { type SupportedInputFormats = SupportedInputFormats; type SupportedOutputFormats = SupportedOutputFormats; + type Stream = Stream; fn name(&self) -> Result { Device::name(self) @@ -94,95 +88,132 @@ impl DeviceTrait for Device { fn default_output_format(&self) -> Result { Device::default_output_format(self) } -} -impl EventLoopTrait for EventLoop { - type Device = Device; - type StreamId = StreamId; - - fn build_input_stream( - &self, - device: &Self::Device, - format: &Format, - ) -> Result { - EventLoop::build_input_stream(self, device, format) + fn build_input_stream(&self, format: &Format, callback: F) -> Result where F: FnMut(StreamDataResult) + Send + 'static { + Ok(Stream::new(Arc::new(self.build_stream_inner(format, alsa::SND_PCM_STREAM_CAPTURE)?), callback)) } - fn build_output_stream( - &self, - device: &Self::Device, - format: &Format, - ) -> Result { - EventLoop::build_output_stream(self, device, format) - } - - fn play_stream(&self, stream: Self::StreamId) -> Result<(), PlayStreamError> { - EventLoop::play_stream(self, stream) - } - - fn pause_stream(&self, stream: Self::StreamId) -> Result<(), PauseStreamError> { - EventLoop::pause_stream(self, stream) - } - - fn destroy_stream(&self, stream: Self::StreamId) { - EventLoop::destroy_stream(self, stream) - } - - fn run(&self, callback: F) -> ! - where - F: FnMut(Self::StreamId, StreamDataResult) + Send, - { - EventLoop::run(self, callback) + fn build_output_stream(&self, format: &Format, callback: F) -> Result where F: FnMut(StreamDataResult) + Send + 'static { + Ok(Stream::new(Arc::new(self.build_stream_inner(format, alsa::SND_PCM_STREAM_PLAYBACK)?), callback)) } } -impl StreamIdTrait for StreamId {} -struct Trigger { - // [read fd, write fd] - fds: [libc::c_int; 2], -} +struct TriggerSender(libc::c_int); -impl Trigger { - fn new() -> Self { - let mut fds = [0, 0]; - match unsafe { libc::pipe(fds.as_mut_ptr()) } { - 0 => Trigger { fds: fds }, - _ => panic!("Could not create pipe"), - } - } - fn read_fd(&self) -> libc::c_int { - self.fds[0] - } - fn write_fd(&self) -> libc::c_int { - self.fds[1] - } +struct TriggerReceiver(libc::c_int); + +impl TriggerSender { fn wakeup(&self) { let buf = 1u64; - let ret = unsafe { libc::write(self.write_fd(), &buf as *const u64 as *const _, 8) }; + let ret = unsafe { libc::write(self.0, &buf as *const u64 as *const _, 8) }; assert!(ret == 8); } +} + +impl TriggerReceiver { fn clear_pipe(&self) { let mut out = 0u64; - let ret = unsafe { libc::read(self.read_fd(), &mut out as *mut u64 as *mut _, 8) }; + let ret = unsafe { libc::read(self.0, &mut out as *mut u64 as *mut _, 8) }; assert_eq!(ret, 8); } } -impl Drop for Trigger { +fn trigger() -> (TriggerSender, TriggerReceiver) { + let mut fds = [0, 0]; + match unsafe { libc::pipe(fds.as_mut_ptr()) } { + 0 => (TriggerSender(fds[1]), TriggerReceiver(fds[0])), + _ => panic!("Could not create pipe"), + } +} + +impl Drop for TriggerSender { fn drop(&mut self) { unsafe { - libc::close(self.fds[0]); - libc::close(self.fds[1]); + libc::close(self.0); } } } +impl Drop for TriggerReceiver { + fn drop(&mut self) { + unsafe { + libc::close(self.0); + } + } +} #[derive(Clone, Debug, PartialEq, Eq)] pub struct Device(String); impl Device { + fn build_stream_inner(&self, format: &Format, stream_type: alsa::snd_pcm_stream_t) -> Result { + let name = ffi::CString::new(self.0.clone()).expect("unable to clone device"); + + let handle = unsafe { + let mut handle = ptr::null_mut(); + match alsa::snd_pcm_open( + &mut handle, + name.as_ptr(), + stream_type, + alsa::SND_PCM_NONBLOCK, + ) { + -16 /* determined empirically */ => return Err(BuildStreamError::DeviceNotAvailable), + -22 => return Err(BuildStreamError::InvalidArgument), + e => if let Err(description) = check_errors(e) { + let err = BackendSpecificError { description }; + return Err(err.into()); + } + } + handle + }; + let can_pause = unsafe { + let hw_params = HwParams::alloc(); + set_hw_params_from_format(handle, &hw_params, format) + .map_err(|description| BackendSpecificError { description })?; + + alsa::snd_pcm_hw_params_can_pause(hw_params.0) == 1 + }; + let (buffer_len, period_len) = unsafe { + set_sw_params_from_format(handle, format) + .map_err(|description| BackendSpecificError { description })? + }; + + if let Err(desc) = check_errors(unsafe { alsa::snd_pcm_prepare(handle) }) { + let description = format!("could not get handle: {}", desc); + let err = BackendSpecificError { description }; + return Err(err.into()); + } + + let num_descriptors = { + let num_descriptors = unsafe { alsa::snd_pcm_poll_descriptors_count(handle) }; + if num_descriptors == 0 { + let description = "poll descriptor count for stream was 0".to_string(); + let err = BackendSpecificError { description }; + return Err(err.into()); + } + num_descriptors as usize + }; + + let stream_inner = StreamInner { + channel: handle, + sample_format: format.data_type, + num_descriptors, + num_channels: format.channels as u16, + buffer_len, + period_len, + can_pause, + }; + + if let Err(desc) = check_errors(unsafe { alsa::snd_pcm_start(handle) }) { + let description = format!("could not start stream: {}", desc); + let err = BackendSpecificError { description }; + return Err(err.into()); + } + + Ok(stream_inner) + } + #[inline] fn name(&self) -> Result { Ok(self.0.clone()) @@ -450,48 +481,7 @@ impl Device { } } -pub struct EventLoop { - // Each newly-created stream gets a new ID from this counter. The counter is then incremented. - next_stream_id: AtomicUsize, // TODO: use AtomicU64 when stable? - - // A trigger that uses a `pipe()` as backend. Signalled whenever a new command is ready, so - // that `poll()` can wake up and pick the changes. - pending_command_trigger: Trigger, - - // This field is locked by the `run()` method. - // The mutex also ensures that only one thread at a time has `run()` running. - run_context: Mutex, - - // Commands processed by the `run()` method that is currently running. - commands: Sender, -} - -unsafe impl Send for EventLoop { -} - -unsafe impl Sync for EventLoop { -} - -enum Command { - NewStream(StreamInner), - PlayStream(StreamId), - PauseStream(StreamId), - DestroyStream(StreamId), -} - -struct RunContext { - // Descriptors to wait for. Always contains `pending_command_trigger.read_fd()` as first element. - descriptors: Vec, - // List of streams that are written in `descriptors`. - streams: Vec, - - commands: Receiver, -} - struct StreamInner { - // The id of the stream. - id: StreamId, - // The ALSA channel. channel: *mut alsa::snd_pcm_t, @@ -513,501 +503,230 @@ struct StreamInner { // Whether or not the hardware supports pausing the stream. can_pause: bool, - - // Whether or not the sample stream is currently paused. - is_paused: bool, - - // A file descriptor opened with `eventfd`. - // It is used to wait for resume signal. - resume_trigger: Trigger, - - // Lazily allocated buffer that is reused inside the loop. - // Zero-allocate a new buffer (the fastest way to have zeroed memory) at the first time this is - // used. - buffer: Vec, } -#[derive(Copy, Debug, Clone, PartialEq, Eq, Hash)] -pub struct StreamId(usize); +// Assume that the ALSA library is built with thread safe option. +unsafe impl Send for StreamInner {} + +unsafe impl Sync for StreamInner {} enum StreamType { Input, Output } +pub struct Stream { + /// The high-priority audio processing thread calling callbacks. + /// Option used for moving out in destructor. + thread: Option>, -impl EventLoop { - #[inline] - fn new() -> EventLoop { - let pending_command_trigger = Trigger::new(); + /// Handle to the underlying stream for playback controls. + inner: Arc, - let mut initial_descriptors = vec![]; - reset_descriptors_with_pending_command_trigger( - &mut initial_descriptors, - &pending_command_trigger, - ); - - let (tx, rx) = channel(); - - let run_context = Mutex::new(RunContext { - descriptors: initial_descriptors, - streams: Vec::new(), - commands: rx, - }); - - EventLoop { - next_stream_id: AtomicUsize::new(0), - pending_command_trigger: pending_command_trigger, - run_context, - commands: tx, - } - } - - #[inline] - fn run(&self, mut callback: F) -> ! - where F: FnMut(StreamId, StreamDataResult) - { - self.run_inner(&mut callback) - } - - fn run_inner(&self, callback: &mut dyn FnMut(StreamId, StreamDataResult)) -> ! { - unsafe { - let mut run_context = self.run_context.lock().unwrap(); - let run_context = &mut *run_context; - - 'stream_loop: loop { - process_commands(run_context); - - reset_descriptors_with_pending_command_trigger( - &mut run_context.descriptors, - &self.pending_command_trigger, - ); - append_stream_poll_descriptors(run_context); - - // At this point, this should include the command `pending_commands_trigger` along - // with the poll descriptors for each stream. - match poll_all_descriptors(&mut run_context.descriptors) { - Ok(true) => (), - Ok(false) => continue, - Err(err) => { - for stream in run_context.streams.iter() { - let result = Err(err.clone().into()); - callback(stream.id, result); - } - run_context.streams.clear(); - break 'stream_loop; - } - } - - // If the `pending_command_trigger` was signaled, we need to process the comands. - if run_context.descriptors[0].revents != 0 { - run_context.descriptors[0].revents = 0; - self.pending_command_trigger.clear_pipe(); - } - - // The set of streams that error within the following loop and should be removed. - let mut streams_to_remove: Vec<(StreamId, StreamError)> = vec![]; - - // Iterate over each individual stream/descriptor. - let mut i_stream = 0; - let mut i_descriptor = 1; - while (i_descriptor as usize) < run_context.descriptors.len() { - let stream = &mut run_context.streams[i_stream]; - let stream_descriptor_ptr = run_context.descriptors.as_mut_ptr().offset(i_descriptor); - - // Only go on if this event was a pollout or pollin event. - let stream_type = match check_for_pollout_or_pollin(stream, stream_descriptor_ptr) { - Ok(Some(ty)) => ty, - Ok(None) => { - i_descriptor += stream.num_descriptors as isize; - i_stream += 1; - continue; - }, - Err(err) => { - streams_to_remove.push((stream.id, err.into())); - i_descriptor += stream.num_descriptors as isize; - i_stream += 1; - continue; - } - }; - - // Get the number of available samples for reading/writing. - let available_samples = match get_available_samples(stream) { - Ok(n) => n, - Err(err) => { - streams_to_remove.push((stream.id, err.into())); - i_descriptor += stream.num_descriptors as isize; - i_stream += 1; - continue; - } - }; - - // Only go on if there is at least `stream.period_len` samples. - if available_samples < stream.period_len { - i_descriptor += stream.num_descriptors as isize; - i_stream += 1; - continue; - } - - // Prepare the data buffer. - let buffer_size = stream.sample_format.sample_size() * available_samples; - stream.buffer.resize(buffer_size, 0u8); - let available_frames = available_samples / stream.num_channels as usize; - - match stream_type { - StreamType::Input => { - let result = alsa::snd_pcm_readi( - stream.channel, - stream.buffer.as_mut_ptr() as *mut _, - available_frames as alsa::snd_pcm_uframes_t, - ); - if let Err(err) = check_errors(result as _) { - let description = format!("`snd_pcm_readi` failed: {}", err); - let err = BackendSpecificError { description }; - streams_to_remove.push((stream.id, err.into())); - continue; - } - - let input_buffer = match stream.sample_format { - SampleFormat::I16 => UnknownTypeInputBuffer::I16(::InputBuffer { - buffer: cast_input_buffer(&mut stream.buffer), - }), - SampleFormat::U16 => UnknownTypeInputBuffer::U16(::InputBuffer { - buffer: cast_input_buffer(&mut stream.buffer), - }), - SampleFormat::F32 => UnknownTypeInputBuffer::F32(::InputBuffer { - buffer: cast_input_buffer(&mut stream.buffer), - }), - }; - let stream_data = StreamData::Input { - buffer: input_buffer, - }; - callback(stream.id, Ok(stream_data)); - }, - StreamType::Output => { - { - // We're now sure that we're ready to write data. - let output_buffer = match stream.sample_format { - SampleFormat::I16 => UnknownTypeOutputBuffer::I16(::OutputBuffer { - buffer: cast_output_buffer(&mut stream.buffer), - }), - SampleFormat::U16 => UnknownTypeOutputBuffer::U16(::OutputBuffer { - buffer: cast_output_buffer(&mut stream.buffer), - }), - SampleFormat::F32 => UnknownTypeOutputBuffer::F32(::OutputBuffer { - buffer: cast_output_buffer(&mut stream.buffer), - }), - }; - - let stream_data = StreamData::Output { - buffer: output_buffer, - }; - callback(stream.id, Ok(stream_data)); - } - loop { - let result = alsa::snd_pcm_writei( - stream.channel, - stream.buffer.as_ptr() as *const _, - available_frames as alsa::snd_pcm_uframes_t, - ); - - if result as i32 == -libc::EPIPE { - // buffer underrun - // TODO: Notify the user of this. - alsa::snd_pcm_recover(stream.channel, result as i32, 0); - } else if let Err(err) = check_errors(result as _) { - let description = format!("`snd_pcm_writei` failed: {}", err); - let err = BackendSpecificError { description }; - streams_to_remove.push((stream.id, err.into())); - continue; - } else if result as usize != available_frames { - let description = format!( - "unexpected number of frames written: expected {}, \ - result {} (this should never happen)", - available_frames, - result, - ); - let err = BackendSpecificError { description }; - streams_to_remove.push((stream.id, err.into())); - continue; - } else { - break; - } - } - }, - } - } - - // Remove any streams that have errored and notify the user. - for (stream_id, err) in streams_to_remove { - run_context.streams.retain(|s| s.id != stream_id); - callback(stream_id, Err(err.into())); - } - } - } - - panic!("`cpal::EventLoop::run` API currently disallows returning"); - } - - fn build_input_stream( - &self, - device: &Device, - format: &Format, - ) -> Result - { - unsafe { - let name = ffi::CString::new(device.0.clone()).expect("unable to clone device"); - - let mut capture_handle = ptr::null_mut(); - match alsa::snd_pcm_open( - &mut capture_handle, - name.as_ptr(), - alsa::SND_PCM_STREAM_CAPTURE, - alsa::SND_PCM_NONBLOCK, - ) { - -16 /* determined empirically */ => return Err(BuildStreamError::DeviceNotAvailable), - -22 => return Err(BuildStreamError::InvalidArgument), - e => if let Err(description) = check_errors(e) { - let err = BackendSpecificError { description }; - return Err(err.into()); - } - } - let hw_params = HwParams::alloc(); - - set_hw_params_from_format(capture_handle, &hw_params, format) - .map_err(|description| BackendSpecificError { description })?; - - let can_pause = alsa::snd_pcm_hw_params_can_pause(hw_params.0) == 1; - - let (buffer_len, period_len) = set_sw_params_from_format(capture_handle, format) - .map_err(|description| BackendSpecificError { description })?; - - if let Err(desc) = check_errors(alsa::snd_pcm_prepare(capture_handle)) { - let description = format!("could not get capture handle: {}", desc); - let err = BackendSpecificError { description }; - return Err(err.into()); - } - - let num_descriptors = { - let num_descriptors = alsa::snd_pcm_poll_descriptors_count(capture_handle); - if num_descriptors == 0 { - let description = "poll descriptor count for capture stream was 0".to_string(); - let err = BackendSpecificError { description }; - return Err(err.into()); - } - num_descriptors as usize - }; - - let new_stream_id = StreamId(self.next_stream_id.fetch_add(1, Ordering::Relaxed)); - if new_stream_id.0 == usize::max_value() { - panic!("number of streams used has overflowed usize"); - } - - let stream_inner = StreamInner { - id: new_stream_id.clone(), - channel: capture_handle, - sample_format: format.data_type, - num_descriptors: num_descriptors, - num_channels: format.channels as u16, - buffer_len: buffer_len, - period_len: period_len, - can_pause: can_pause, - is_paused: false, - resume_trigger: Trigger::new(), - buffer: vec![], - }; - - if let Err(desc) = check_errors(alsa::snd_pcm_start(capture_handle)) { - let description = format!("could not start capture stream: {}", desc); - let err = BackendSpecificError { description }; - return Err(err.into()); - } - - self.push_command(Command::NewStream(stream_inner)); - Ok(new_stream_id) - } - } - - fn build_output_stream( - &self, - device: &Device, - format: &Format, - ) -> Result - { - unsafe { - let name = ffi::CString::new(device.0.clone()).expect("unable to clone device"); - - let mut playback_handle = ptr::null_mut(); - match alsa::snd_pcm_open( - &mut playback_handle, - name.as_ptr(), - alsa::SND_PCM_STREAM_PLAYBACK, - alsa::SND_PCM_NONBLOCK, - ) { - -16 /* determined empirically */ => return Err(BuildStreamError::DeviceNotAvailable), - -22 => return Err(BuildStreamError::InvalidArgument), - e => if let Err(description) = check_errors(e) { - let err = BackendSpecificError { description }; - return Err(err.into()) - } - } - let hw_params = HwParams::alloc(); - - set_hw_params_from_format(playback_handle, &hw_params, format) - .map_err(|description| BackendSpecificError { description })?; - - let can_pause = alsa::snd_pcm_hw_params_can_pause(hw_params.0) == 1; - - let (buffer_len, period_len) = set_sw_params_from_format(playback_handle, format) - .map_err(|description| BackendSpecificError { description })?; - - if let Err(desc) = check_errors(alsa::snd_pcm_prepare(playback_handle)) { - let description = format!("could not get playback handle: {}", desc); - let err = BackendSpecificError { description }; - return Err(err.into()); - } - - let num_descriptors = { - let num_descriptors = alsa::snd_pcm_poll_descriptors_count(playback_handle); - if num_descriptors == 0 { - let description = "poll descriptor count for playback stream was 0".to_string(); - let err = BackendSpecificError { description }; - return Err(err.into()); - } - num_descriptors as usize - }; - - let new_stream_id = StreamId(self.next_stream_id.fetch_add(1, Ordering::Relaxed)); - if new_stream_id.0 == usize::max_value() { - return Err(BuildStreamError::StreamIdOverflow); - } - - let stream_inner = StreamInner { - id: new_stream_id.clone(), - channel: playback_handle, - sample_format: format.data_type, - num_descriptors: num_descriptors, - num_channels: format.channels as u16, - buffer_len: buffer_len, - period_len: period_len, - can_pause: can_pause, - is_paused: false, - resume_trigger: Trigger::new(), - buffer: vec![], - }; - - self.push_command(Command::NewStream(stream_inner)); - Ok(new_stream_id) - } - } - - #[inline] - fn push_command(&self, command: Command) { - // Safe to unwrap: sender outlives receiver. - self.commands.send(command).unwrap(); - self.pending_command_trigger.wakeup(); - } - - #[inline] - fn destroy_stream(&self, stream_id: StreamId) { - self.push_command(Command::DestroyStream(stream_id)); - } - - #[inline] - fn play_stream(&self, stream_id: StreamId) -> Result<(), PlayStreamError> { - self.push_command(Command::PlayStream(stream_id)); - Ok(()) - } - - #[inline] - fn pause_stream(&self, stream_id: StreamId) -> Result<(), PauseStreamError> { - self.push_command(Command::PauseStream(stream_id)); - Ok(()) - } + /// Used to signal to stop processing. + trigger: TriggerSender, } -// Process any pending `Command`s within the `RunContext`'s queue. -fn process_commands(run_context: &mut RunContext) { - for command in run_context.commands.try_iter() { - match command { - Command::DestroyStream(stream_id) => { - run_context.streams.retain(|s| s.id != stream_id); - }, - Command::PlayStream(stream_id) => { - if let Some(stream) = run_context.streams.iter_mut() - .find(|stream| stream.can_pause && stream.id == stream_id) - { - unsafe { - alsa::snd_pcm_pause(stream.channel, 0); - } - stream.is_paused = false; - } - }, - Command::PauseStream(stream_id) => { - if let Some(stream) = run_context.streams.iter_mut() - .find(|stream| stream.can_pause && stream.id == stream_id) - { - unsafe { - alsa::snd_pcm_pause(stream.channel, 1); - } - stream.is_paused = true; - } - }, - Command::NewStream(stream_inner) => { - run_context.streams.push(stream_inner); - }, - } - } -} +/// The inner body of the audio processing thread. Takes the polymorphic +/// callback to avoid generating too much generic code. +fn stream_worker(rx: TriggerReceiver, stream: &StreamInner, callback: &mut (dyn FnMut(StreamDataResult) + Send + 'static)) { + let mut descriptors = Vec::new(); + let mut buffer = Vec::new(); + loop { + descriptors.clear(); + // Add the self-pipe for signaling termination. + descriptors.push(libc::pollfd { + fd: rx.0, + events: libc::POLLIN, + revents: 0, + }); -// Resets the descriptors so that only `pending_command_trigger.read_fd()` is contained. -fn reset_descriptors_with_pending_command_trigger( - descriptors: &mut Vec, - pending_command_trigger: &Trigger, -) { - descriptors.clear(); - descriptors.push(libc::pollfd { - fd: pending_command_trigger.read_fd(), - events: libc::POLLIN, - revents: 0, - }); -} - -// Appends the `poll` descriptors for each stream onto the `RunContext`'s descriptor slice, ready -// for a call to `libc::poll`. -fn append_stream_poll_descriptors(run_context: &mut RunContext) { - for stream in run_context.streams.iter() { - run_context.descriptors.reserve(stream.num_descriptors); - let len = run_context.descriptors.len(); + // Add ALSA polling fds. + descriptors.reserve(stream.num_descriptors); + let len = descriptors.len(); let filled = unsafe { alsa::snd_pcm_poll_descriptors( stream.channel, - run_context.descriptors.as_mut_ptr().offset(len as isize), + descriptors[len..].as_mut_ptr(), stream.num_descriptors as libc::c_uint, ) }; debug_assert_eq!(filled, stream.num_descriptors as libc::c_int); unsafe { - run_context.descriptors.set_len(len + stream.num_descriptors); + descriptors.set_len(len + stream.num_descriptors); + } + + let res = unsafe { + // Don't timeout, wait forever. + libc::poll(descriptors.as_mut_ptr(), descriptors.len() as libc::nfds_t, -1) + }; + if res < 0 { + let description = format!("`libc::poll()` failed: {}", io::Error::last_os_error()); + callback(Err(BackendSpecificError { description }.into())); + continue; + } else if res == 0 { + let description = String::from("`libc::poll()` spuriously returned"); + callback(Err(BackendSpecificError { description }.into())); + continue; + } + + if descriptors[0].revents != 0 { + // The stream has been requested to be destroyed. + rx.clear_pipe(); + return; + } + + let stream_type = match check_for_pollout_or_pollin(stream, descriptors[1..].as_mut_ptr()) { + Ok(Some(ty)) => ty, + Ok(None) => { + // Nothing to process, poll again + continue; + }, + Err(err) => { + // TODO: signal errors + continue; + } + }; + // Get the number of available samples for reading/writing. + let available_samples = match get_available_samples(stream) { + Ok(n) => n, + Err(err) => { + let description = format!("Failed to query the number of available samples: {}", err); + callback(Err(BackendSpecificError { description }.into())); + continue; + } + }; + + // Only go on if there is at least `stream.period_len` samples. + if available_samples < stream.period_len { + continue; + } + + // Prepare the data buffer. + let buffer_size = stream.sample_format.sample_size() * available_samples; + buffer.resize(buffer_size, 0u8); + let available_frames = available_samples / stream.num_channels as usize; + + match stream_type { + StreamType::Input => { + let result = unsafe { + alsa::snd_pcm_readi( + stream.channel, + buffer.as_mut_ptr() as *mut _, + available_frames as alsa::snd_pcm_uframes_t, + ) + }; + if let Err(err) = check_errors(result as _) { + let description = format!("`snd_pcm_readi` failed: {}", err); + callback(Err(BackendSpecificError { description }.into())); + continue; + } + + let input_buffer = match stream.sample_format { + SampleFormat::I16 => UnknownTypeInputBuffer::I16(::InputBuffer { + buffer: unsafe { cast_input_buffer(&mut buffer) }, + }), + SampleFormat::U16 => UnknownTypeInputBuffer::U16(::InputBuffer { + buffer: unsafe { cast_input_buffer(&mut buffer) }, + }), + SampleFormat::F32 => UnknownTypeInputBuffer::F32(::InputBuffer { + buffer: unsafe { cast_input_buffer(&mut buffer) }, + }), + }; + let stream_data = StreamData::Input { + buffer: input_buffer, + }; + callback(Ok(stream_data)); + }, + StreamType::Output => { + { + // We're now sure that we're ready to write data. + let output_buffer = match stream.sample_format { + SampleFormat::I16 => UnknownTypeOutputBuffer::I16(::OutputBuffer { + buffer: unsafe { cast_output_buffer(&mut buffer) }, + }), + SampleFormat::U16 => UnknownTypeOutputBuffer::U16(::OutputBuffer { + buffer: unsafe { cast_output_buffer(&mut buffer) }, + }), + SampleFormat::F32 => UnknownTypeOutputBuffer::F32(::OutputBuffer { + buffer: unsafe { cast_output_buffer(&mut buffer) }, + }), + }; + + let stream_data = StreamData::Output { + buffer: output_buffer, + }; + callback(Ok(stream_data)); + } + loop { + let result = unsafe { + alsa::snd_pcm_writei( + stream.channel, + buffer.as_ptr() as *const _, + available_frames as alsa::snd_pcm_uframes_t, + ) + }; + + if result == -libc::EPIPE as i64 { + // buffer underrun + // TODO: Notify the user of this. + unsafe { alsa::snd_pcm_recover(stream.channel, result as i32, 0) }; + } else if let Err(err) = check_errors(result as _) { + let description = format!("`snd_pcm_writei` failed: {}", err); + callback(Err(BackendSpecificError { description }.into())); + continue; + } else if result as usize != available_frames { + let description = format!( + "unexpected number of frames written: expected {}, \ + result {} (this should never happen)", + available_frames, + result, + ); + callback(Err(BackendSpecificError { description }.into())); + continue; + } else { + break; + } + } + }, } } } -// Poll all descriptors within the given set. -// -// Returns `Ok(true)` if some event has occurred or `Ok(false)` if no events have -// occurred. -// -// Returns an `Err` if `libc::poll` returns a negative value for some reason. -fn poll_all_descriptors(descriptors: &mut [libc::pollfd]) -> Result { - let res = unsafe { - // Don't timeout, wait forever. - libc::poll(descriptors.as_mut_ptr(), descriptors.len() as libc::nfds_t, -1) - }; - if res < 0 { - let description = format!("`libc::poll()` failed: {}", res); - Err(BackendSpecificError { description }) - } else if res == 0 { - Ok(false) - } else { - Ok(true) +impl Stream { + fn new(inner: Arc, mut callback: F) -> Stream where F: FnMut(StreamDataResult) + Send + 'static { + let (tx, rx) = trigger(); + // Clone the handle for passing into worker thread. + let stream = inner.clone(); + let thread = thread::spawn(move || { + stream_worker(rx, &*stream, &mut callback); + }); + Stream { + thread: Some(thread), + inner, + trigger: tx, + } + } +} + +impl Drop for Stream { + fn drop(&mut self) { + self.trigger.wakeup(); + self.thread.take().unwrap().join().unwrap(); + } +} + +impl StreamTrait for Stream { + fn play(&self) -> Result<(), PlayStreamError> { + unsafe { + alsa::snd_pcm_pause(self.inner.channel, 0); + } + // TODO: error handling + Ok(()) + } + fn pause(&self)-> Result<(), PauseStreamError> { + unsafe { + alsa::snd_pcm_pause(self.inner.channel, 1); + } + // TODO: error handling + Ok(()) } } diff --git a/src/host/null/mod.rs b/src/host/null/mod.rs index 2126207..032f797 100644 --- a/src/host/null/mod.rs +++ b/src/host/null/mod.rs @@ -10,7 +10,7 @@ use PlayStreamError; use StreamDataResult; use SupportedFormatsError; use SupportedFormat; -use traits::{DeviceTrait, EventLoopTrait, HostTrait, StreamIdTrait}; +use traits::{DeviceTrait, HostTrait, StreamTrait}; #[derive(Default)] pub struct Devices; @@ -23,7 +23,7 @@ pub struct EventLoop; pub struct Host; #[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct StreamId; +pub struct Stream; pub struct SupportedInputFormats; pub struct SupportedOutputFormats; @@ -49,6 +49,7 @@ impl EventLoop { impl DeviceTrait for Device { type SupportedInputFormats = SupportedInputFormats; type SupportedOutputFormats = SupportedOutputFormats; + type Stream = Stream; #[inline] fn name(&self) -> Result { @@ -74,49 +75,22 @@ impl DeviceTrait for Device { fn default_output_format(&self) -> Result { unimplemented!() } -} -impl EventLoopTrait for EventLoop { - type Device = Device; - type StreamId = StreamId; - - #[inline] - fn run(&self, _callback: F) -> ! - where F: FnMut(StreamId, StreamDataResult) - { - loop { /* TODO: don't spin */ } - } - - #[inline] - fn build_input_stream(&self, _: &Device, _: &Format) -> Result { - Err(BuildStreamError::DeviceNotAvailable) - } - - #[inline] - fn build_output_stream(&self, _: &Device, _: &Format) -> Result { - Err(BuildStreamError::DeviceNotAvailable) - } - - #[inline] - fn destroy_stream(&self, _: StreamId) { + fn build_input_stream(&self, format: &Format, callback: F) -> Result + where F: FnMut(StreamDataResult) + Send + 'static { unimplemented!() } - #[inline] - fn play_stream(&self, _: StreamId) -> Result<(), PlayStreamError> { - panic!() - } - - #[inline] - fn pause_stream(&self, _: StreamId) -> Result<(), PauseStreamError> { - panic!() + /// Create an output stream. + fn build_output_stream(&self, format: &Format, callback: F) -> Result + where F: FnMut(StreamDataResult) + Send + 'static{ + unimplemented!() } } impl HostTrait for Host { type Device = Device; type Devices = Devices; - type EventLoop = EventLoop; fn is_available() -> bool { false @@ -133,13 +107,17 @@ impl HostTrait for Host { fn default_output_device(&self) -> Option { None } - - fn event_loop(&self) -> Self::EventLoop { - EventLoop::new() - } } -impl StreamIdTrait for StreamId {} +impl StreamTrait for Stream { + fn play(&self) -> Result<(), PlayStreamError> { + unimplemented!() + } + + fn pause(&self) -> Result<(), PauseStreamError> { + unimplemented!() + } +} impl Iterator for Devices { type Item = Device; diff --git a/src/lib.rs b/src/lib.rs index 86d4ea0..61815e3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -151,11 +151,10 @@ extern crate thiserror; pub use error::*; pub use platform::{ - ALL_HOSTS, Device, Devices, EventLoop, Host, HostId, SupportedInputFormats, - SupportedOutputFormats, StreamId, available_hosts, default_host, host_from_id, + ALL_HOSTS, available_hosts, default_host, Device, Devices, Host, host_from_id, + HostId, Stream, SupportedInputFormats, SupportedOutputFormats, }; pub use samples_formats::{Sample, SampleFormat}; - use std::ops::{Deref, DerefMut}; mod error; diff --git a/src/platform/mod.rs b/src/platform/mod.rs index 6e9ee28..c85a8e6 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -58,14 +58,9 @@ macro_rules! impl_platform_host { /// type. pub struct Devices(DevicesInner); - /// The **EventLoop** implementation associated with the platform's dynamically dispatched + /// The **Stream** implementation associated with the platform's dynamically dispatched /// **Host** type. - pub struct EventLoop(EventLoopInner); - - /// The **StreamId** implementation associated with the platform's dynamically dispatched - /// **Host** type. - #[derive(Clone, Debug, Eq, Hash, PartialEq)] - pub struct StreamId(StreamIdInner); + pub struct Stream(StreamInner); /// The **SupportedInputFormats** iterator associated with the platform's dynamically /// dispatched **Host** type. @@ -95,22 +90,15 @@ macro_rules! impl_platform_host { )* } - enum EventLoopInner { - $( - $HostVariant(crate::host::$host_mod::EventLoop), - )* - } - enum HostInner { $( $HostVariant(crate::host::$host_mod::Host), )* } - #[derive(Clone, Debug, Eq, Hash, PartialEq)] - enum StreamIdInner { + enum StreamInner { $( - $HostVariant(crate::host::$host_mod::StreamId), + $HostVariant(crate::host::$host_mod::Stream), )* } @@ -212,6 +200,7 @@ macro_rules! impl_platform_host { impl crate::traits::DeviceTrait for Device { type SupportedInputFormats = SupportedInputFormats; type SupportedOutputFormats = SupportedOutputFormats; + type Stream = Stream; fn name(&self) -> Result { match self.0 { @@ -260,96 +249,25 @@ macro_rules! impl_platform_host { )* } } - } - impl crate::traits::EventLoopTrait for EventLoop { - type StreamId = StreamId; - type Device = Device; - - #[allow(unreachable_patterns)] - fn build_input_stream( - &self, - device: &Self::Device, - format: &crate::Format, - ) -> Result { - match (&self.0, &device.0) { - $( - (&EventLoopInner::$HostVariant(ref e), &DeviceInner::$HostVariant(ref d)) => { - e.build_input_stream(d, format) - .map(StreamIdInner::$HostVariant) - .map(StreamId) - } - )* - _ => panic!("tried to build a stream with a device from another host"), - } - } - - #[allow(unreachable_patterns)] - fn build_output_stream( - &self, - device: &Self::Device, - format: &crate::Format, - ) -> Result { - match (&self.0, &device.0) { - $( - (&EventLoopInner::$HostVariant(ref e), &DeviceInner::$HostVariant(ref d)) => { - e.build_output_stream(d, format) - .map(StreamIdInner::$HostVariant) - .map(StreamId) - } - )* - _ => panic!("tried to build a stream with a device from another host"), - } - } - - #[allow(unreachable_patterns)] - fn play_stream(&self, stream: Self::StreamId) -> Result<(), crate::PlayStreamError> { - match (&self.0, stream.0) { - $( - (&EventLoopInner::$HostVariant(ref e), StreamIdInner::$HostVariant(ref s)) => { - e.play_stream(s.clone()) - } - )* - _ => panic!("tried to play a stream with an ID associated with another host"), - } - } - - #[allow(unreachable_patterns)] - fn pause_stream(&self, stream: Self::StreamId) -> Result<(), crate::PauseStreamError> { - match (&self.0, stream.0) { - $( - (&EventLoopInner::$HostVariant(ref e), StreamIdInner::$HostVariant(ref s)) => { - e.pause_stream(s.clone()) - } - )* - _ => panic!("tried to pause a stream with an ID associated with another host"), - } - } - - #[allow(unreachable_patterns)] - fn destroy_stream(&self, stream: Self::StreamId) { - match (&self.0, stream.0) { - $( - (&EventLoopInner::$HostVariant(ref e), StreamIdInner::$HostVariant(ref s)) => { - e.destroy_stream(s.clone()) - } - )* - _ => panic!("tried to destroy a stream with an ID associated with another host"), - } - } - - fn run(&self, mut callback: F) -> ! - where - F: FnMut(Self::StreamId, crate::StreamDataResult) + Send - { + fn build_input_stream(&self, format: &crate::Format, callback: F) -> Result + where F: FnMut(crate::StreamDataResult) + Send + 'static { match self.0 { $( - EventLoopInner::$HostVariant(ref e) => { - e.run(|id, result| { - let result = result; - callback(StreamId(StreamIdInner::$HostVariant(id)), result); - }); - }, + DeviceInner::$HostVariant(ref d) => d.build_input_stream(format, callback) + .map(StreamInner::$HostVariant) + .map(Stream), + )* + } + } + + fn build_output_stream(&self, format: &crate::Format, callback: F) -> Result + where F: FnMut(crate::StreamDataResult) + Send + 'static { + match self.0 { + $( + DeviceInner::$HostVariant(ref d) => d.build_output_stream(format, callback) + .map(StreamInner::$HostVariant) + .map(Stream), )* } } @@ -358,7 +276,6 @@ macro_rules! impl_platform_host { impl crate::traits::HostTrait for Host { type Devices = Devices; type Device = Device; - type EventLoop = EventLoop; fn is_available() -> bool { $( crate::host::$host_mod::Host::is_available() ||)* false @@ -393,20 +310,30 @@ macro_rules! impl_platform_host { )* } } + } - fn event_loop(&self) -> Self::EventLoop { + impl crate::traits::StreamTrait for Stream { + fn play(&self) -> Result<(), crate::PlayStreamError> { match self.0 { $( - HostInner::$HostVariant(ref h) => { - EventLoop(EventLoopInner::$HostVariant(h.event_loop())) + StreamInner::$HostVariant(ref s) => { + s.play() + } + )* + } + } + + fn pause(&self) -> Result<(), crate::PauseStreamError> { + match self.0 { + $( + StreamInner::$HostVariant(ref s) => { + s.pause() } )* } } } - impl crate::traits::StreamIdTrait for StreamId {} - $( impl From for Device { fn from(h: crate::host::$host_mod::Device) -> Self { @@ -420,21 +347,15 @@ macro_rules! impl_platform_host { } } - impl From for EventLoop { - fn from(h: crate::host::$host_mod::EventLoop) -> Self { - EventLoop(EventLoopInner::$HostVariant(h)) - } - } - impl From for Host { fn from(h: crate::host::$host_mod::Host) -> Self { Host(HostInner::$HostVariant(h)) } } - impl From for StreamId { - fn from(h: crate::host::$host_mod::StreamId) -> Self { - StreamId(StreamIdInner::$HostVariant(h)) + impl From for Stream { + fn from(h: crate::host::$host_mod::Stream) -> Self { + Stream(StreamInner::$HostVariant(h)) } } )* @@ -471,9 +392,8 @@ mod platform_impl { pub use crate::host::alsa::{ Device as AlsaDevice, Devices as AlsaDevices, - EventLoop as AlsaEventLoop, Host as AlsaHost, - StreamId as AlsaStreamId, + Stream as AlsaStream, SupportedInputFormats as AlsaSupportedInputFormats, SupportedOutputFormats as AlsaSupportedOutputFormats, }; diff --git a/src/traits.rs b/src/traits.rs index f50764e..f50bedc 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -39,8 +39,6 @@ pub trait HostTrait { type Devices: Iterator; /// The `Device` type yielded by the host. type Device: DeviceTrait; - /// The event loop type used by the `Host` - type EventLoop: EventLoopTrait; /// Whether or not the host is available on the system. fn is_available() -> bool; @@ -60,9 +58,6 @@ pub trait HostTrait { /// Returns `None` if no output device is available. fn default_output_device(&self) -> Option; - /// Initialise the event loop, ready for managing audio streams. - fn event_loop(&self) -> Self::EventLoop; - /// An iterator yielding all `Device`s currently available to the system that support one or more /// input stream formats. /// @@ -99,6 +94,8 @@ pub trait DeviceTrait { type SupportedInputFormats: Iterator; /// The iterator type yielding supported output stream formats. type SupportedOutputFormats: Iterator; + /// The stream type created by `build_input_stream` and `build_output_stream`. + type Stream: StreamTrait; /// The human-readable name of the device. fn name(&self) -> Result; @@ -118,81 +115,19 @@ pub trait DeviceTrait { /// The default output stream format for the device. fn default_output_format(&self) -> Result; + + /// Create an input stream. + fn build_input_stream(&self, format: &Format, callback: F) -> Result + where F: FnMut(StreamDataResult) + Send + 'static; + + /// Create an output stream. + fn build_output_stream(&self, format: &Format, callback: F) -> Result + where F: FnMut(StreamDataResult) + Send + 'static; } -/// Collection of streams managed together. -/// -/// Created with the `Host::event_loop` method. -pub trait EventLoopTrait { - /// The `Device` type yielded by the host. - type Device: DeviceTrait; - /// The type used to uniquely distinguish between streams. - type StreamId: StreamIdTrait; +/// A stream created from `Device`, with methods to control playback. +pub trait StreamTrait { + fn play(&self) -> Result<(), PlayStreamError>; - /// Creates a new input stream that will run from the given device and with the given format. - /// - /// On success, returns an identifier for the stream. - /// - /// Can return an error if the device is no longer valid, or if the input stream format is not - /// supported by the device. - fn build_input_stream( - &self, - device: &Self::Device, - format: &Format, - ) -> Result; - - /// Creates a new output stream that will play on the given device and with the given format. - /// - /// On success, returns an identifier for the stream. - /// - /// Can return an error if the device is no longer valid, or if the output stream format is not - /// supported by the device. - fn build_output_stream( - &self, - device: &Self::Device, - format: &Format, - ) -> Result; - - /// Instructs the audio device that it should start playing the stream with the given ID. - /// - /// Has no effect is the stream was already playing. - /// - /// Only call this after you have submitted some data, otherwise you may hear some glitches. - /// - /// # Panic - /// - /// If the stream does not exist, this function can either panic or be a no-op. - fn play_stream(&self, stream: Self::StreamId) -> Result<(), PlayStreamError>; - - /// Instructs the audio device that it should stop playing the stream with the given ID. - /// - /// Has no effect is the stream was already paused. - /// - /// If you call `play` afterwards, the playback will resume where it was. - /// - /// # Panic - /// - /// If the stream does not exist, this function can either panic or be a no-op. - fn pause_stream(&self, stream: Self::StreamId) -> Result<(), PauseStreamError>; - - /// Destroys an existing stream. - /// - /// # Panic - /// - /// If the stream does not exist, this function can either panic or be a no-op. - fn destroy_stream(&self, stream: Self::StreamId); - - /// Takes control of the current thread and begins the stream processing. - /// - /// > **Note**: Since it takes control of the thread, this method is best called on a separate - /// > thread. - /// - /// Whenever a stream needs to be fed some data, the closure passed as parameter is called. - /// You can call the other methods of `EventLoop` without getting a deadlock. - fn run(&self, callback: F) -> ! - where - F: FnMut(Self::StreamId, StreamDataResult) + Send; -} - -/// The set of required bounds for host `StreamId` types. -pub trait StreamIdTrait: Clone + std::fmt::Debug + std::hash::Hash + PartialEq + Eq {} + fn pause(&self) -> Result<(), PauseStreamError>; +} \ No newline at end of file