Remove EventLoop and port the ALSA backend

This commit is contained in:
Tatsuyuki Ishi 2019-07-09 15:47:33 +09:00 committed by mitchmindtree
parent 700bef79d9
commit c97d1dd3fa
8 changed files with 519 additions and 951 deletions

View File

@ -1,37 +1,34 @@
extern crate anyhow; extern crate anyhow;
extern crate cpal; extern crate cpal;
use cpal::traits::{DeviceTrait, EventLoopTrait, HostTrait}; use cpal::traits::{DeviceTrait, StreamTrait, HostTrait};
fn main() -> Result<(), anyhow::Error> { fn main() -> Result<(), anyhow::Error> {
let host = cpal::default_host(); let host = cpal::default_host();
let device = host.default_output_device().expect("failed to find a default output device"); let device = host.default_output_device().expect("failed to find a default output device");
let format = device.default_output_format()?; let format = device.default_output_format()?;
let event_loop = host.event_loop();
let stream_id = event_loop.build_output_stream(&device, &format)?;
event_loop.play_stream(stream_id.clone())?;
let sample_rate = format.sample_rate.0 as f32; let sample_rate = format.sample_rate.0 as f32;
let channels = format.channels;
let mut sample_clock = 0f32; let mut sample_clock = 0f32;
// Produce a sinusoid of maximum amplitude. // Produce a sinusoid of maximum amplitude.
let mut next_value = || { let mut next_value = move || {
sample_clock = (sample_clock + 1.0) % sample_rate; sample_clock = (sample_clock + 1.0) % sample_rate;
(sample_clock * 440.0 * 2.0 * 3.141592 / sample_rate).sin() (sample_clock * 440.0 * 2.0 * 3.141592 / sample_rate).sin()
}; };
event_loop.run(move |id, result| { let stream = device.build_output_stream(&format, move |result| {
let data = match result { let data = match result {
Ok(data) => data, Ok(data) => data,
Err(err) => { Err(err) => {
eprintln!("an error occurred on stream {:?}: {}", id, err); eprintln!("an error occurred on stream: {}", err);
return; return;
} }
}; };
match data { match data {
cpal::StreamData::Output { buffer: cpal::UnknownTypeOutputBuffer::U16(mut buffer) } => { cpal::StreamData::Output { buffer: cpal::UnknownTypeOutputBuffer::U16(mut buffer) } => {
for sample in buffer.chunks_mut(format.channels as usize) { for sample in buffer.chunks_mut(channels as usize) {
let value = ((next_value() * 0.5 + 0.5) * std::u16::MAX as f32) as u16; let value = ((next_value() * 0.5 + 0.5) * std::u16::MAX as f32) as u16;
for out in sample.iter_mut() { for out in sample.iter_mut() {
*out = value; *out = value;
@ -39,7 +36,7 @@ fn main() -> Result<(), anyhow::Error> {
} }
}, },
cpal::StreamData::Output { buffer: cpal::UnknownTypeOutputBuffer::I16(mut buffer) } => { cpal::StreamData::Output { buffer: cpal::UnknownTypeOutputBuffer::I16(mut buffer) } => {
for sample in buffer.chunks_mut(format.channels as usize) { for sample in buffer.chunks_mut(channels as usize) {
let value = (next_value() * std::i16::MAX as f32) as i16; let value = (next_value() * std::i16::MAX as f32) as i16;
for out in sample.iter_mut() { for out in sample.iter_mut() {
*out = value; *out = value;
@ -47,7 +44,7 @@ fn main() -> Result<(), anyhow::Error> {
} }
}, },
cpal::StreamData::Output { buffer: cpal::UnknownTypeOutputBuffer::F32(mut buffer) } => { cpal::StreamData::Output { buffer: cpal::UnknownTypeOutputBuffer::F32(mut buffer) } => {
for sample in buffer.chunks_mut(format.channels as usize) { for sample in buffer.chunks_mut(channels as usize) {
let value = next_value(); let value = next_value();
for out in sample.iter_mut() { for out in sample.iter_mut() {
*out = value; *out = value;
@ -56,5 +53,10 @@ fn main() -> Result<(), anyhow::Error> {
}, },
_ => (), _ => (),
} }
}); })?;
stream.play()?;
std::thread::sleep(std::time::Duration::from_millis(1000));
Ok(())
} }

View File

@ -10,18 +10,21 @@ extern crate anyhow;
extern crate cpal; extern crate cpal;
extern crate ringbuf; extern crate ringbuf;
use cpal::traits::{DeviceTrait, EventLoopTrait, HostTrait}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use ringbuf::RingBuffer; use ringbuf::RingBuffer;
const LATENCY_MS: f32 = 150.0; const LATENCY_MS: f32 = 150.0;
fn main() -> Result<(), anyhow::Error> { fn main() -> Result<(), anyhow::Error> {
let host = cpal::default_host(); let host = cpal::default_host();
let event_loop = host.event_loop();
// Default devices. // Default devices.
let input_device = host.default_input_device().expect("failed to get default input device"); let input_device = host
let output_device = host.default_output_device().expect("failed to get default output device"); .default_input_device()
.expect("failed to get default input device");
let output_device = host
.default_output_device()
.expect("failed to get default output device");
println!("Using default input device: \"{}\"", input_device.name()?); println!("Using default input device: \"{}\"", input_device.name()?);
println!("Using default output device: \"{}\"", output_device.name()?); println!("Using default output device: \"{}\"", output_device.name()?);
@ -29,12 +32,6 @@ fn main() -> Result<(), anyhow::Error> {
let mut format = input_device.default_input_format()?; let mut format = input_device.default_input_format()?;
format.data_type = cpal::SampleFormat::F32; format.data_type = cpal::SampleFormat::F32;
// Build streams.
println!("Attempting to build both streams with `{:?}`.", format);
let input_stream_id = event_loop.build_input_stream(&input_device, &format)?;
let output_stream_id = event_loop.build_output_stream(&output_device, &format)?;
println!("Successfully built streams.");
// Create a delay in case the input and output devices aren't synced. // Create a delay in case the input and output devices aren't synced.
let latency_frames = (LATENCY_MS / 1_000.0) * format.sample_rate.0 as f32; let latency_frames = (LATENCY_MS / 1_000.0) * format.sample_rate.0 as f32;
let latency_samples = latency_frames as usize * format.channels as usize; let latency_samples = latency_frames as usize * format.channels as usize;
@ -50,25 +47,21 @@ fn main() -> Result<(), anyhow::Error> {
producer.push(0.0).unwrap(); producer.push(0.0).unwrap();
} }
// Play the streams. // Build streams.
println!("Starting the input and output streams with `{}` milliseconds of latency.", LATENCY_MS); println!("Attempting to build both streams with `{:?}`.", format);
event_loop.play_stream(input_stream_id.clone())?; let input_stream = input_device.build_input_stream(&format, move |result| {
event_loop.play_stream(output_stream_id.clone())?;
// Run the event loop on a separate thread.
std::thread::spawn(move || {
event_loop.run(move |id, result| {
let data = match result { let data = match result {
Ok(data) => data, Ok(data) => data,
Err(err) => { Err(err) => {
eprintln!("an error occurred on stream {:?}: {}", id, err); eprintln!("an error occurred on input stream: {}", err);
return; return;
} },
}; };
match data { match data {
cpal::StreamData::Input { buffer: cpal::UnknownTypeInputBuffer::F32(buffer) } => { cpal::StreamData::Input {
assert_eq!(id, input_stream_id); buffer: cpal::UnknownTypeInputBuffer::F32(buffer),
} => {
let mut output_fell_behind = false; let mut output_fell_behind = false;
for &sample in buffer.iter() { for &sample in buffer.iter() {
if producer.push(sample).is_err() { if producer.push(sample).is_err() {
@ -79,8 +72,21 @@ fn main() -> Result<(), anyhow::Error> {
eprintln!("output stream fell behind: try increasing latency"); eprintln!("output stream fell behind: try increasing latency");
} }
}, },
cpal::StreamData::Output { buffer: cpal::UnknownTypeOutputBuffer::F32(mut buffer) } => { _ => panic!("Expected input with f32 data"),
assert_eq!(id, output_stream_id); }
})?;
let output_stream = output_device.build_output_stream(&format, move |result| {
let data = match result {
Ok(data) => data,
Err(err) => {
eprintln!("an error occurred on output stream: {}", err);
return;
},
};
match data {
cpal::StreamData::Output {
buffer: cpal::UnknownTypeOutputBuffer::F32(mut buffer),
} => {
let mut input_fell_behind = None; let mut input_fell_behind = None;
for sample in buffer.iter_mut() { for sample in buffer.iter_mut() {
*sample = match consumer.pop() { *sample = match consumer.pop() {
@ -91,18 +97,28 @@ fn main() -> Result<(), anyhow::Error> {
}, },
}; };
} }
if let Some(_) = input_fell_behind { if let Some(err) = input_fell_behind {
eprintln!("input stream fell behind: try increasing latency"); eprintln!("input stream fell behind: {:?}: try increasing latency", err);
} }
}, },
_ => panic!("we're expecting f32 data"), _ => panic!("Expected output with f32 data"),
} }
}); })?;
}); println!("Successfully built streams.");
// Play the streams.
println!(
"Starting the input and output streams with `{}` milliseconds of latency.",
LATENCY_MS
);
input_stream.play()?;
output_stream.play()?;
// Run for 3 seconds before closing. // Run for 3 seconds before closing.
println!("Playing for 3 seconds... "); println!("Playing for 3 seconds... ");
std::thread::sleep(std::time::Duration::from_secs(3)); std::thread::sleep(std::time::Duration::from_secs(3));
drop(input_stream);
drop(output_stream);
println!("Done!"); println!("Done!");
Ok(()) Ok(())
} }

View File

@ -6,21 +6,21 @@ extern crate anyhow;
extern crate cpal; extern crate cpal;
extern crate hound; extern crate hound;
use cpal::traits::{DeviceTrait, EventLoopTrait, HostTrait}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
fn main() -> Result<(), anyhow::Error> { fn main() -> Result<(), anyhow::Error> {
// Use the default host for working with audio devices. // Use the default host for working with audio devices.
let host = cpal::default_host(); let host = cpal::default_host();
// Setup the default input device and stream with the default input format. // Setup the default input device and stream with the default input format.
let device = host.default_input_device().expect("Failed to get default input device"); let device = host
.default_input_device()
.expect("Failed to get default input device");
println!("Default input device: {}", device.name()?); println!("Default input device: {}", device.name()?);
let format = device.default_input_format().expect("Failed to get default input format"); let format = device
.default_input_format()
.expect("Failed to get default input format");
println!("Default input format: {:?}", format); println!("Default input format: {:?}", format);
let event_loop = host.event_loop();
let stream_id = event_loop.build_input_stream(&device, &format)?;
event_loop.play_stream(stream_id)?;
// The WAV file we're recording to. // The WAV file we're recording to.
const PATH: &'static str = concat!(env!("CARGO_MANIFEST_DIR"), "/recorded.wav"); const PATH: &'static str = concat!(env!("CARGO_MANIFEST_DIR"), "/recorded.wav");
let spec = wav_spec_from_format(&format); let spec = wav_spec_from_format(&format);
@ -29,28 +29,23 @@ fn main() -> Result<(), anyhow::Error> {
// A flag to indicate that recording is in progress. // A flag to indicate that recording is in progress.
println!("Begin recording..."); println!("Begin recording...");
let recording = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(true));
// Run the input stream on a separate thread. // Run the input stream on a separate thread.
let writer_2 = writer.clone(); let writer_2 = writer.clone();
let recording_2 = recording.clone(); let stream = device.build_input_stream(&format, move |event| {
std::thread::spawn(move || {
event_loop.run(move |id, event| {
let data = match event { let data = match event {
Ok(data) => data, Ok(data) => data,
Err(err) => { Err(err) => {
eprintln!("an error occurred on stream {:?}: {}", id, err); eprintln!("an error occurred on stream: {}", err);
return; return;
} },
}; };
// If we're done recording, return early.
if !recording_2.load(std::sync::atomic::Ordering::Relaxed) {
return;
}
// Otherwise write to the wav writer. // Otherwise write to the wav writer.
match data { match data {
cpal::StreamData::Input { buffer: cpal::UnknownTypeInputBuffer::U16(buffer) } => { cpal::StreamData::Input {
buffer: cpal::UnknownTypeInputBuffer::U16(buffer),
} => {
if let Ok(mut guard) = writer_2.try_lock() { if let Ok(mut guard) = writer_2.try_lock() {
if let Some(writer) = guard.as_mut() { if let Some(writer) = guard.as_mut() {
for sample in buffer.iter() { for sample in buffer.iter() {
@ -60,7 +55,9 @@ fn main() -> Result<(), anyhow::Error> {
} }
} }
}, },
cpal::StreamData::Input { buffer: cpal::UnknownTypeInputBuffer::I16(buffer) } => { cpal::StreamData::Input {
buffer: cpal::UnknownTypeInputBuffer::I16(buffer),
} => {
if let Ok(mut guard) = writer_2.try_lock() { if let Ok(mut guard) = writer_2.try_lock() {
if let Some(writer) = guard.as_mut() { if let Some(writer) = guard.as_mut() {
for &sample in buffer.iter() { for &sample in buffer.iter() {
@ -69,7 +66,9 @@ fn main() -> Result<(), anyhow::Error> {
} }
} }
}, },
cpal::StreamData::Input { buffer: cpal::UnknownTypeInputBuffer::F32(buffer) } => { cpal::StreamData::Input {
buffer: cpal::UnknownTypeInputBuffer::F32(buffer),
} => {
if let Ok(mut guard) = writer_2.try_lock() { if let Ok(mut guard) = writer_2.try_lock() {
if let Some(writer) = guard.as_mut() { if let Some(writer) = guard.as_mut() {
for &sample in buffer.iter() { for &sample in buffer.iter() {
@ -80,12 +79,12 @@ fn main() -> Result<(), anyhow::Error> {
}, },
_ => (), _ => (),
} }
}); })?;
}); stream.play()?;
// Let recording go for roughly three seconds. // Let recording go for roughly three seconds.
std::thread::sleep(std::time::Duration::from_secs(3)); std::thread::sleep(std::time::Duration::from_secs(3));
recording.store(false, std::sync::atomic::Ordering::Relaxed); drop(stream);
writer.lock().unwrap().take().unwrap().finalize()?; writer.lock().unwrap().take().unwrap().finalize()?;
println!("Recording {} complete!", PATH); println!("Recording {} complete!", PATH);
Ok(()) Ok(())

View File

@ -1,11 +1,14 @@
extern crate alsa_sys as alsa; extern crate alsa_sys as alsa;
extern crate libc; extern crate libc;
pub use self::enumerate::{Devices, default_input_device, default_output_device}; use std::{cmp, ffi, io, mem, ptr};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::vec::IntoIter as VecIntoIter;
use ChannelCount;
use BackendSpecificError; use BackendSpecificError;
use BuildStreamError; use BuildStreamError;
use ChannelCount;
use DefaultFormatError; use DefaultFormatError;
use DeviceNameError; use DeviceNameError;
use DevicesError; use DevicesError;
@ -14,20 +17,15 @@ use PauseStreamError;
use PlayStreamError; use PlayStreamError;
use SampleFormat; use SampleFormat;
use SampleRate; use SampleRate;
use SupportedFormatsError;
use StreamData; use StreamData;
use StreamDataResult; use StreamDataResult;
use StreamError;
use SupportedFormat; use SupportedFormat;
use SupportedFormatsError;
use traits::{DeviceTrait, HostTrait, StreamTrait};
use UnknownTypeInputBuffer; use UnknownTypeInputBuffer;
use UnknownTypeOutputBuffer; use UnknownTypeOutputBuffer;
use traits::{DeviceTrait, EventLoopTrait, HostTrait, StreamIdTrait};
use std::{cmp, ffi, ptr}; pub use self::enumerate::{default_input_device, default_output_device, Devices};
use std::sync::Mutex;
use std::sync::mpsc::{channel, Sender, Receiver};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::vec::IntoIter as VecIntoIter;
pub type SupportedInputFormats = VecIntoIter<SupportedFormat>; pub type SupportedInputFormats = VecIntoIter<SupportedFormat>;
pub type SupportedOutputFormats = VecIntoIter<SupportedFormat>; pub type SupportedOutputFormats = VecIntoIter<SupportedFormat>;
@ -47,7 +45,6 @@ impl Host {
impl HostTrait for Host { impl HostTrait for Host {
type Devices = Devices; type Devices = Devices;
type Device = Device; type Device = Device;
type EventLoop = EventLoop;
fn is_available() -> bool { fn is_available() -> bool {
// Assume ALSA is always available on linux/freebsd. // Assume ALSA is always available on linux/freebsd.
@ -65,15 +62,12 @@ impl HostTrait for Host {
fn default_output_device(&self) -> Option<Self::Device> { fn default_output_device(&self) -> Option<Self::Device> {
default_output_device() default_output_device()
} }
fn event_loop(&self) -> Self::EventLoop {
EventLoop::new()
}
} }
impl DeviceTrait for Device { impl DeviceTrait for Device {
type SupportedInputFormats = SupportedInputFormats; type SupportedInputFormats = SupportedInputFormats;
type SupportedOutputFormats = SupportedOutputFormats; type SupportedOutputFormats = SupportedOutputFormats;
type Stream = Stream;
fn name(&self) -> Result<String, DeviceNameError> { fn name(&self) -> Result<String, DeviceNameError> {
Device::name(self) Device::name(self)
@ -94,95 +88,132 @@ impl DeviceTrait for Device {
fn default_output_format(&self) -> Result<Format, DefaultFormatError> { fn default_output_format(&self) -> Result<Format, DefaultFormatError> {
Device::default_output_format(self) Device::default_output_format(self)
} }
fn build_input_stream<F>(&self, format: &Format, callback: F) -> Result<Self::Stream, BuildStreamError> where F: FnMut(StreamDataResult) + Send + 'static {
Ok(Stream::new(Arc::new(self.build_stream_inner(format, alsa::SND_PCM_STREAM_CAPTURE)?), callback))
} }
impl EventLoopTrait for EventLoop { fn build_output_stream<F>(&self, format: &Format, callback: F) -> Result<Self::Stream, BuildStreamError> where F: FnMut(StreamDataResult) + Send + 'static {
type Device = Device; Ok(Stream::new(Arc::new(self.build_stream_inner(format, alsa::SND_PCM_STREAM_PLAYBACK)?), callback))
type StreamId = StreamId;
fn build_input_stream(
&self,
device: &Self::Device,
format: &Format,
) -> Result<Self::StreamId, BuildStreamError> {
EventLoop::build_input_stream(self, device, format)
}
fn build_output_stream(
&self,
device: &Self::Device,
format: &Format,
) -> Result<Self::StreamId, BuildStreamError> {
EventLoop::build_output_stream(self, device, format)
}
fn play_stream(&self, stream: Self::StreamId) -> Result<(), PlayStreamError> {
EventLoop::play_stream(self, stream)
}
fn pause_stream(&self, stream: Self::StreamId) -> Result<(), PauseStreamError> {
EventLoop::pause_stream(self, stream)
}
fn destroy_stream(&self, stream: Self::StreamId) {
EventLoop::destroy_stream(self, stream)
}
fn run<F>(&self, callback: F) -> !
where
F: FnMut(Self::StreamId, StreamDataResult) + Send,
{
EventLoop::run(self, callback)
} }
} }
impl StreamIdTrait for StreamId {}
struct Trigger { struct TriggerSender(libc::c_int);
// [read fd, write fd]
fds: [libc::c_int; 2],
}
impl Trigger { struct TriggerReceiver(libc::c_int);
fn new() -> Self {
let mut fds = [0, 0]; impl TriggerSender {
match unsafe { libc::pipe(fds.as_mut_ptr()) } {
0 => Trigger { fds: fds },
_ => panic!("Could not create pipe"),
}
}
fn read_fd(&self) -> libc::c_int {
self.fds[0]
}
fn write_fd(&self) -> libc::c_int {
self.fds[1]
}
fn wakeup(&self) { fn wakeup(&self) {
let buf = 1u64; let buf = 1u64;
let ret = unsafe { libc::write(self.write_fd(), &buf as *const u64 as *const _, 8) }; let ret = unsafe { libc::write(self.0, &buf as *const u64 as *const _, 8) };
assert!(ret == 8); assert!(ret == 8);
} }
}
impl TriggerReceiver {
fn clear_pipe(&self) { fn clear_pipe(&self) {
let mut out = 0u64; let mut out = 0u64;
let ret = unsafe { libc::read(self.read_fd(), &mut out as *mut u64 as *mut _, 8) }; let ret = unsafe { libc::read(self.0, &mut out as *mut u64 as *mut _, 8) };
assert_eq!(ret, 8); assert_eq!(ret, 8);
} }
} }
impl Drop for Trigger { fn trigger() -> (TriggerSender, TriggerReceiver) {
let mut fds = [0, 0];
match unsafe { libc::pipe(fds.as_mut_ptr()) } {
0 => (TriggerSender(fds[1]), TriggerReceiver(fds[0])),
_ => panic!("Could not create pipe"),
}
}
impl Drop for TriggerSender {
fn drop(&mut self) { fn drop(&mut self) {
unsafe { unsafe {
libc::close(self.fds[0]); libc::close(self.0);
libc::close(self.fds[1]);
} }
} }
} }
impl Drop for TriggerReceiver {
fn drop(&mut self) {
unsafe {
libc::close(self.0);
}
}
}
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
pub struct Device(String); pub struct Device(String);
impl Device { impl Device {
fn build_stream_inner(&self, format: &Format, stream_type: alsa::snd_pcm_stream_t) -> Result<StreamInner, BuildStreamError> {
let name = ffi::CString::new(self.0.clone()).expect("unable to clone device");
let handle = unsafe {
let mut handle = ptr::null_mut();
match alsa::snd_pcm_open(
&mut handle,
name.as_ptr(),
stream_type,
alsa::SND_PCM_NONBLOCK,
) {
-16 /* determined empirically */ => return Err(BuildStreamError::DeviceNotAvailable),
-22 => return Err(BuildStreamError::InvalidArgument),
e => if let Err(description) = check_errors(e) {
let err = BackendSpecificError { description };
return Err(err.into());
}
}
handle
};
let can_pause = unsafe {
let hw_params = HwParams::alloc();
set_hw_params_from_format(handle, &hw_params, format)
.map_err(|description| BackendSpecificError { description })?;
alsa::snd_pcm_hw_params_can_pause(hw_params.0) == 1
};
let (buffer_len, period_len) = unsafe {
set_sw_params_from_format(handle, format)
.map_err(|description| BackendSpecificError { description })?
};
if let Err(desc) = check_errors(unsafe { alsa::snd_pcm_prepare(handle) }) {
let description = format!("could not get handle: {}", desc);
let err = BackendSpecificError { description };
return Err(err.into());
}
let num_descriptors = {
let num_descriptors = unsafe { alsa::snd_pcm_poll_descriptors_count(handle) };
if num_descriptors == 0 {
let description = "poll descriptor count for stream was 0".to_string();
let err = BackendSpecificError { description };
return Err(err.into());
}
num_descriptors as usize
};
let stream_inner = StreamInner {
channel: handle,
sample_format: format.data_type,
num_descriptors,
num_channels: format.channels as u16,
buffer_len,
period_len,
can_pause,
};
if let Err(desc) = check_errors(unsafe { alsa::snd_pcm_start(handle) }) {
let description = format!("could not start stream: {}", desc);
let err = BackendSpecificError { description };
return Err(err.into());
}
Ok(stream_inner)
}
#[inline] #[inline]
fn name(&self) -> Result<String, DeviceNameError> { fn name(&self) -> Result<String, DeviceNameError> {
Ok(self.0.clone()) Ok(self.0.clone())
@ -450,48 +481,7 @@ impl Device {
} }
} }
pub struct EventLoop {
// Each newly-created stream gets a new ID from this counter. The counter is then incremented.
next_stream_id: AtomicUsize, // TODO: use AtomicU64 when stable?
// A trigger that uses a `pipe()` as backend. Signalled whenever a new command is ready, so
// that `poll()` can wake up and pick the changes.
pending_command_trigger: Trigger,
// This field is locked by the `run()` method.
// The mutex also ensures that only one thread at a time has `run()` running.
run_context: Mutex<RunContext>,
// Commands processed by the `run()` method that is currently running.
commands: Sender<Command>,
}
unsafe impl Send for EventLoop {
}
unsafe impl Sync for EventLoop {
}
enum Command {
NewStream(StreamInner),
PlayStream(StreamId),
PauseStream(StreamId),
DestroyStream(StreamId),
}
struct RunContext {
// Descriptors to wait for. Always contains `pending_command_trigger.read_fd()` as first element.
descriptors: Vec<libc::pollfd>,
// List of streams that are written in `descriptors`.
streams: Vec<StreamInner>,
commands: Receiver<Command>,
}
struct StreamInner { struct StreamInner {
// The id of the stream.
id: StreamId,
// The ALSA channel. // The ALSA channel.
channel: *mut alsa::snd_pcm_t, channel: *mut alsa::snd_pcm_t,
@ -513,209 +503,174 @@ struct StreamInner {
// Whether or not the hardware supports pausing the stream. // Whether or not the hardware supports pausing the stream.
can_pause: bool, can_pause: bool,
// Whether or not the sample stream is currently paused.
is_paused: bool,
// A file descriptor opened with `eventfd`.
// It is used to wait for resume signal.
resume_trigger: Trigger,
// Lazily allocated buffer that is reused inside the loop.
// Zero-allocate a new buffer (the fastest way to have zeroed memory) at the first time this is
// used.
buffer: Vec<u8>,
} }
#[derive(Copy, Debug, Clone, PartialEq, Eq, Hash)] // Assume that the ALSA library is built with thread safe option.
pub struct StreamId(usize); unsafe impl Send for StreamInner {}
unsafe impl Sync for StreamInner {}
enum StreamType { Input, Output } enum StreamType { Input, Output }
pub struct Stream {
/// The high-priority audio processing thread calling callbacks.
/// Option used for moving out in destructor.
thread: Option<JoinHandle<()>>,
impl EventLoop { /// Handle to the underlying stream for playback controls.
#[inline] inner: Arc<StreamInner>,
fn new() -> EventLoop {
let pending_command_trigger = Trigger::new();
let mut initial_descriptors = vec![]; /// Used to signal to stop processing.
reset_descriptors_with_pending_command_trigger( trigger: TriggerSender,
&mut initial_descriptors, }
&pending_command_trigger,
);
let (tx, rx) = channel(); /// The inner body of the audio processing thread. Takes the polymorphic
/// callback to avoid generating too much generic code.
let run_context = Mutex::new(RunContext { fn stream_worker(rx: TriggerReceiver, stream: &StreamInner, callback: &mut (dyn FnMut(StreamDataResult) + Send + 'static)) {
descriptors: initial_descriptors, let mut descriptors = Vec::new();
streams: Vec::new(), let mut buffer = Vec::new();
commands: rx, loop {
descriptors.clear();
// Add the self-pipe for signaling termination.
descriptors.push(libc::pollfd {
fd: rx.0,
events: libc::POLLIN,
revents: 0,
}); });
EventLoop { // Add ALSA polling fds.
next_stream_id: AtomicUsize::new(0), descriptors.reserve(stream.num_descriptors);
pending_command_trigger: pending_command_trigger, let len = descriptors.len();
run_context, let filled = unsafe {
commands: tx, alsa::snd_pcm_poll_descriptors(
} stream.channel,
} descriptors[len..].as_mut_ptr(),
stream.num_descriptors as libc::c_uint,
#[inline] )
fn run<F>(&self, mut callback: F) -> ! };
where F: FnMut(StreamId, StreamDataResult) debug_assert_eq!(filled, stream.num_descriptors as libc::c_int);
{
self.run_inner(&mut callback)
}
fn run_inner(&self, callback: &mut dyn FnMut(StreamId, StreamDataResult)) -> ! {
unsafe { unsafe {
let mut run_context = self.run_context.lock().unwrap(); descriptors.set_len(len + stream.num_descriptors);
let run_context = &mut *run_context;
'stream_loop: loop {
process_commands(run_context);
reset_descriptors_with_pending_command_trigger(
&mut run_context.descriptors,
&self.pending_command_trigger,
);
append_stream_poll_descriptors(run_context);
// At this point, this should include the command `pending_commands_trigger` along
// with the poll descriptors for each stream.
match poll_all_descriptors(&mut run_context.descriptors) {
Ok(true) => (),
Ok(false) => continue,
Err(err) => {
for stream in run_context.streams.iter() {
let result = Err(err.clone().into());
callback(stream.id, result);
}
run_context.streams.clear();
break 'stream_loop;
}
} }
// If the `pending_command_trigger` was signaled, we need to process the comands. let res = unsafe {
if run_context.descriptors[0].revents != 0 { // Don't timeout, wait forever.
run_context.descriptors[0].revents = 0; libc::poll(descriptors.as_mut_ptr(), descriptors.len() as libc::nfds_t, -1)
self.pending_command_trigger.clear_pipe(); };
if res < 0 {
let description = format!("`libc::poll()` failed: {}", io::Error::last_os_error());
callback(Err(BackendSpecificError { description }.into()));
continue;
} else if res == 0 {
let description = String::from("`libc::poll()` spuriously returned");
callback(Err(BackendSpecificError { description }.into()));
continue;
} }
// The set of streams that error within the following loop and should be removed. if descriptors[0].revents != 0 {
let mut streams_to_remove: Vec<(StreamId, StreamError)> = vec![]; // The stream has been requested to be destroyed.
rx.clear_pipe();
return;
}
// Iterate over each individual stream/descriptor. let stream_type = match check_for_pollout_or_pollin(stream, descriptors[1..].as_mut_ptr()) {
let mut i_stream = 0;
let mut i_descriptor = 1;
while (i_descriptor as usize) < run_context.descriptors.len() {
let stream = &mut run_context.streams[i_stream];
let stream_descriptor_ptr = run_context.descriptors.as_mut_ptr().offset(i_descriptor);
// Only go on if this event was a pollout or pollin event.
let stream_type = match check_for_pollout_or_pollin(stream, stream_descriptor_ptr) {
Ok(Some(ty)) => ty, Ok(Some(ty)) => ty,
Ok(None) => { Ok(None) => {
i_descriptor += stream.num_descriptors as isize; // Nothing to process, poll again
i_stream += 1;
continue; continue;
}, },
Err(err) => { Err(err) => {
streams_to_remove.push((stream.id, err.into())); // TODO: signal errors
i_descriptor += stream.num_descriptors as isize;
i_stream += 1;
continue; continue;
} }
}; };
// Get the number of available samples for reading/writing. // Get the number of available samples for reading/writing.
let available_samples = match get_available_samples(stream) { let available_samples = match get_available_samples(stream) {
Ok(n) => n, Ok(n) => n,
Err(err) => { Err(err) => {
streams_to_remove.push((stream.id, err.into())); let description = format!("Failed to query the number of available samples: {}", err);
i_descriptor += stream.num_descriptors as isize; callback(Err(BackendSpecificError { description }.into()));
i_stream += 1;
continue; continue;
} }
}; };
// Only go on if there is at least `stream.period_len` samples. // Only go on if there is at least `stream.period_len` samples.
if available_samples < stream.period_len { if available_samples < stream.period_len {
i_descriptor += stream.num_descriptors as isize;
i_stream += 1;
continue; continue;
} }
// Prepare the data buffer. // Prepare the data buffer.
let buffer_size = stream.sample_format.sample_size() * available_samples; let buffer_size = stream.sample_format.sample_size() * available_samples;
stream.buffer.resize(buffer_size, 0u8); buffer.resize(buffer_size, 0u8);
let available_frames = available_samples / stream.num_channels as usize; let available_frames = available_samples / stream.num_channels as usize;
match stream_type { match stream_type {
StreamType::Input => { StreamType::Input => {
let result = alsa::snd_pcm_readi( let result = unsafe {
alsa::snd_pcm_readi(
stream.channel, stream.channel,
stream.buffer.as_mut_ptr() as *mut _, buffer.as_mut_ptr() as *mut _,
available_frames as alsa::snd_pcm_uframes_t, available_frames as alsa::snd_pcm_uframes_t,
); )
};
if let Err(err) = check_errors(result as _) { if let Err(err) = check_errors(result as _) {
let description = format!("`snd_pcm_readi` failed: {}", err); let description = format!("`snd_pcm_readi` failed: {}", err);
let err = BackendSpecificError { description }; callback(Err(BackendSpecificError { description }.into()));
streams_to_remove.push((stream.id, err.into()));
continue; continue;
} }
let input_buffer = match stream.sample_format { let input_buffer = match stream.sample_format {
SampleFormat::I16 => UnknownTypeInputBuffer::I16(::InputBuffer { SampleFormat::I16 => UnknownTypeInputBuffer::I16(::InputBuffer {
buffer: cast_input_buffer(&mut stream.buffer), buffer: unsafe { cast_input_buffer(&mut buffer) },
}), }),
SampleFormat::U16 => UnknownTypeInputBuffer::U16(::InputBuffer { SampleFormat::U16 => UnknownTypeInputBuffer::U16(::InputBuffer {
buffer: cast_input_buffer(&mut stream.buffer), buffer: unsafe { cast_input_buffer(&mut buffer) },
}), }),
SampleFormat::F32 => UnknownTypeInputBuffer::F32(::InputBuffer { SampleFormat::F32 => UnknownTypeInputBuffer::F32(::InputBuffer {
buffer: cast_input_buffer(&mut stream.buffer), buffer: unsafe { cast_input_buffer(&mut buffer) },
}), }),
}; };
let stream_data = StreamData::Input { let stream_data = StreamData::Input {
buffer: input_buffer, buffer: input_buffer,
}; };
callback(stream.id, Ok(stream_data)); callback(Ok(stream_data));
}, },
StreamType::Output => { StreamType::Output => {
{ {
// We're now sure that we're ready to write data. // We're now sure that we're ready to write data.
let output_buffer = match stream.sample_format { let output_buffer = match stream.sample_format {
SampleFormat::I16 => UnknownTypeOutputBuffer::I16(::OutputBuffer { SampleFormat::I16 => UnknownTypeOutputBuffer::I16(::OutputBuffer {
buffer: cast_output_buffer(&mut stream.buffer), buffer: unsafe { cast_output_buffer(&mut buffer) },
}), }),
SampleFormat::U16 => UnknownTypeOutputBuffer::U16(::OutputBuffer { SampleFormat::U16 => UnknownTypeOutputBuffer::U16(::OutputBuffer {
buffer: cast_output_buffer(&mut stream.buffer), buffer: unsafe { cast_output_buffer(&mut buffer) },
}), }),
SampleFormat::F32 => UnknownTypeOutputBuffer::F32(::OutputBuffer { SampleFormat::F32 => UnknownTypeOutputBuffer::F32(::OutputBuffer {
buffer: cast_output_buffer(&mut stream.buffer), buffer: unsafe { cast_output_buffer(&mut buffer) },
}), }),
}; };
let stream_data = StreamData::Output { let stream_data = StreamData::Output {
buffer: output_buffer, buffer: output_buffer,
}; };
callback(stream.id, Ok(stream_data)); callback(Ok(stream_data));
} }
loop { loop {
let result = alsa::snd_pcm_writei( let result = unsafe {
alsa::snd_pcm_writei(
stream.channel, stream.channel,
stream.buffer.as_ptr() as *const _, buffer.as_ptr() as *const _,
available_frames as alsa::snd_pcm_uframes_t, available_frames as alsa::snd_pcm_uframes_t,
); )
};
if result as i32 == -libc::EPIPE { if result == -libc::EPIPE as i64 {
// buffer underrun // buffer underrun
// TODO: Notify the user of this. // TODO: Notify the user of this.
alsa::snd_pcm_recover(stream.channel, result as i32, 0); unsafe { alsa::snd_pcm_recover(stream.channel, result as i32, 0) };
} else if let Err(err) = check_errors(result as _) { } else if let Err(err) = check_errors(result as _) {
let description = format!("`snd_pcm_writei` failed: {}", err); let description = format!("`snd_pcm_writei` failed: {}", err);
let err = BackendSpecificError { description }; callback(Err(BackendSpecificError { description }.into()));
streams_to_remove.push((stream.id, err.into()));
continue; continue;
} else if result as usize != available_frames { } else if result as usize != available_frames {
let description = format!( let description = format!(
@ -724,8 +679,7 @@ impl EventLoop {
available_frames, available_frames,
result, result,
); );
let err = BackendSpecificError { description }; callback(Err(BackendSpecificError { description }.into()));
streams_to_remove.push((stream.id, err.into()));
continue; continue;
} else { } else {
break; break;
@ -734,280 +688,45 @@ impl EventLoop {
}, },
} }
} }
// Remove any streams that have errored and notify the user.
for (stream_id, err) in streams_to_remove {
run_context.streams.retain(|s| s.id != stream_id);
callback(stream_id, Err(err.into()));
}
}
} }
panic!("`cpal::EventLoop::run` API currently disallows returning"); impl Stream {
} fn new<F>(inner: Arc<StreamInner>, mut callback: F) -> Stream where F: FnMut(StreamDataResult) + Send + 'static {
let (tx, rx) = trigger();
fn build_input_stream( // Clone the handle for passing into worker thread.
&self, let stream = inner.clone();
device: &Device, let thread = thread::spawn(move || {
format: &Format, stream_worker(rx, &*stream, &mut callback);
) -> Result<StreamId, BuildStreamError>
{
unsafe {
let name = ffi::CString::new(device.0.clone()).expect("unable to clone device");
let mut capture_handle = ptr::null_mut();
match alsa::snd_pcm_open(
&mut capture_handle,
name.as_ptr(),
alsa::SND_PCM_STREAM_CAPTURE,
alsa::SND_PCM_NONBLOCK,
) {
-16 /* determined empirically */ => return Err(BuildStreamError::DeviceNotAvailable),
-22 => return Err(BuildStreamError::InvalidArgument),
e => if let Err(description) = check_errors(e) {
let err = BackendSpecificError { description };
return Err(err.into());
}
}
let hw_params = HwParams::alloc();
set_hw_params_from_format(capture_handle, &hw_params, format)
.map_err(|description| BackendSpecificError { description })?;
let can_pause = alsa::snd_pcm_hw_params_can_pause(hw_params.0) == 1;
let (buffer_len, period_len) = set_sw_params_from_format(capture_handle, format)
.map_err(|description| BackendSpecificError { description })?;
if let Err(desc) = check_errors(alsa::snd_pcm_prepare(capture_handle)) {
let description = format!("could not get capture handle: {}", desc);
let err = BackendSpecificError { description };
return Err(err.into());
}
let num_descriptors = {
let num_descriptors = alsa::snd_pcm_poll_descriptors_count(capture_handle);
if num_descriptors == 0 {
let description = "poll descriptor count for capture stream was 0".to_string();
let err = BackendSpecificError { description };
return Err(err.into());
}
num_descriptors as usize
};
let new_stream_id = StreamId(self.next_stream_id.fetch_add(1, Ordering::Relaxed));
if new_stream_id.0 == usize::max_value() {
panic!("number of streams used has overflowed usize");
}
let stream_inner = StreamInner {
id: new_stream_id.clone(),
channel: capture_handle,
sample_format: format.data_type,
num_descriptors: num_descriptors,
num_channels: format.channels as u16,
buffer_len: buffer_len,
period_len: period_len,
can_pause: can_pause,
is_paused: false,
resume_trigger: Trigger::new(),
buffer: vec![],
};
if let Err(desc) = check_errors(alsa::snd_pcm_start(capture_handle)) {
let description = format!("could not start capture stream: {}", desc);
let err = BackendSpecificError { description };
return Err(err.into());
}
self.push_command(Command::NewStream(stream_inner));
Ok(new_stream_id)
}
}
fn build_output_stream(
&self,
device: &Device,
format: &Format,
) -> Result<StreamId, BuildStreamError>
{
unsafe {
let name = ffi::CString::new(device.0.clone()).expect("unable to clone device");
let mut playback_handle = ptr::null_mut();
match alsa::snd_pcm_open(
&mut playback_handle,
name.as_ptr(),
alsa::SND_PCM_STREAM_PLAYBACK,
alsa::SND_PCM_NONBLOCK,
) {
-16 /* determined empirically */ => return Err(BuildStreamError::DeviceNotAvailable),
-22 => return Err(BuildStreamError::InvalidArgument),
e => if let Err(description) = check_errors(e) {
let err = BackendSpecificError { description };
return Err(err.into())
}
}
let hw_params = HwParams::alloc();
set_hw_params_from_format(playback_handle, &hw_params, format)
.map_err(|description| BackendSpecificError { description })?;
let can_pause = alsa::snd_pcm_hw_params_can_pause(hw_params.0) == 1;
let (buffer_len, period_len) = set_sw_params_from_format(playback_handle, format)
.map_err(|description| BackendSpecificError { description })?;
if let Err(desc) = check_errors(alsa::snd_pcm_prepare(playback_handle)) {
let description = format!("could not get playback handle: {}", desc);
let err = BackendSpecificError { description };
return Err(err.into());
}
let num_descriptors = {
let num_descriptors = alsa::snd_pcm_poll_descriptors_count(playback_handle);
if num_descriptors == 0 {
let description = "poll descriptor count for playback stream was 0".to_string();
let err = BackendSpecificError { description };
return Err(err.into());
}
num_descriptors as usize
};
let new_stream_id = StreamId(self.next_stream_id.fetch_add(1, Ordering::Relaxed));
if new_stream_id.0 == usize::max_value() {
return Err(BuildStreamError::StreamIdOverflow);
}
let stream_inner = StreamInner {
id: new_stream_id.clone(),
channel: playback_handle,
sample_format: format.data_type,
num_descriptors: num_descriptors,
num_channels: format.channels as u16,
buffer_len: buffer_len,
period_len: period_len,
can_pause: can_pause,
is_paused: false,
resume_trigger: Trigger::new(),
buffer: vec![],
};
self.push_command(Command::NewStream(stream_inner));
Ok(new_stream_id)
}
}
#[inline]
fn push_command(&self, command: Command) {
// Safe to unwrap: sender outlives receiver.
self.commands.send(command).unwrap();
self.pending_command_trigger.wakeup();
}
#[inline]
fn destroy_stream(&self, stream_id: StreamId) {
self.push_command(Command::DestroyStream(stream_id));
}
#[inline]
fn play_stream(&self, stream_id: StreamId) -> Result<(), PlayStreamError> {
self.push_command(Command::PlayStream(stream_id));
Ok(())
}
#[inline]
fn pause_stream(&self, stream_id: StreamId) -> Result<(), PauseStreamError> {
self.push_command(Command::PauseStream(stream_id));
Ok(())
}
}
// Process any pending `Command`s within the `RunContext`'s queue.
fn process_commands(run_context: &mut RunContext) {
for command in run_context.commands.try_iter() {
match command {
Command::DestroyStream(stream_id) => {
run_context.streams.retain(|s| s.id != stream_id);
},
Command::PlayStream(stream_id) => {
if let Some(stream) = run_context.streams.iter_mut()
.find(|stream| stream.can_pause && stream.id == stream_id)
{
unsafe {
alsa::snd_pcm_pause(stream.channel, 0);
}
stream.is_paused = false;
}
},
Command::PauseStream(stream_id) => {
if let Some(stream) = run_context.streams.iter_mut()
.find(|stream| stream.can_pause && stream.id == stream_id)
{
unsafe {
alsa::snd_pcm_pause(stream.channel, 1);
}
stream.is_paused = true;
}
},
Command::NewStream(stream_inner) => {
run_context.streams.push(stream_inner);
},
}
}
}
// Resets the descriptors so that only `pending_command_trigger.read_fd()` is contained.
fn reset_descriptors_with_pending_command_trigger(
descriptors: &mut Vec<libc::pollfd>,
pending_command_trigger: &Trigger,
) {
descriptors.clear();
descriptors.push(libc::pollfd {
fd: pending_command_trigger.read_fd(),
events: libc::POLLIN,
revents: 0,
}); });
Stream {
thread: Some(thread),
inner,
trigger: tx,
}
}
} }
// Appends the `poll` descriptors for each stream onto the `RunContext`'s descriptor slice, ready impl Drop for Stream {
// for a call to `libc::poll`. fn drop(&mut self) {
fn append_stream_poll_descriptors(run_context: &mut RunContext) { self.trigger.wakeup();
for stream in run_context.streams.iter() { self.thread.take().unwrap().join().unwrap();
run_context.descriptors.reserve(stream.num_descriptors); }
let len = run_context.descriptors.len(); }
let filled = unsafe {
alsa::snd_pcm_poll_descriptors( impl StreamTrait for Stream {
stream.channel, fn play(&self) -> Result<(), PlayStreamError> {
run_context.descriptors.as_mut_ptr().offset(len as isize),
stream.num_descriptors as libc::c_uint,
)
};
debug_assert_eq!(filled, stream.num_descriptors as libc::c_int);
unsafe { unsafe {
run_context.descriptors.set_len(len + stream.num_descriptors); alsa::snd_pcm_pause(self.inner.channel, 0);
} }
// TODO: error handling
Ok(())
} }
fn pause(&self)-> Result<(), PauseStreamError> {
unsafe {
alsa::snd_pcm_pause(self.inner.channel, 1);
} }
// TODO: error handling
// Poll all descriptors within the given set. Ok(())
//
// Returns `Ok(true)` if some event has occurred or `Ok(false)` if no events have
// occurred.
//
// Returns an `Err` if `libc::poll` returns a negative value for some reason.
fn poll_all_descriptors(descriptors: &mut [libc::pollfd]) -> Result<bool, BackendSpecificError> {
let res = unsafe {
// Don't timeout, wait forever.
libc::poll(descriptors.as_mut_ptr(), descriptors.len() as libc::nfds_t, -1)
};
if res < 0 {
let description = format!("`libc::poll()` failed: {}", res);
Err(BackendSpecificError { description })
} else if res == 0 {
Ok(false)
} else {
Ok(true)
} }
} }

View File

@ -10,7 +10,7 @@ use PlayStreamError;
use StreamDataResult; use StreamDataResult;
use SupportedFormatsError; use SupportedFormatsError;
use SupportedFormat; use SupportedFormat;
use traits::{DeviceTrait, EventLoopTrait, HostTrait, StreamIdTrait}; use traits::{DeviceTrait, HostTrait, StreamTrait};
#[derive(Default)] #[derive(Default)]
pub struct Devices; pub struct Devices;
@ -23,7 +23,7 @@ pub struct EventLoop;
pub struct Host; pub struct Host;
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamId; pub struct Stream;
pub struct SupportedInputFormats; pub struct SupportedInputFormats;
pub struct SupportedOutputFormats; pub struct SupportedOutputFormats;
@ -49,6 +49,7 @@ impl EventLoop {
impl DeviceTrait for Device { impl DeviceTrait for Device {
type SupportedInputFormats = SupportedInputFormats; type SupportedInputFormats = SupportedInputFormats;
type SupportedOutputFormats = SupportedOutputFormats; type SupportedOutputFormats = SupportedOutputFormats;
type Stream = Stream;
#[inline] #[inline]
fn name(&self) -> Result<String, DeviceNameError> { fn name(&self) -> Result<String, DeviceNameError> {
@ -74,49 +75,22 @@ impl DeviceTrait for Device {
fn default_output_format(&self) -> Result<Format, DefaultFormatError> { fn default_output_format(&self) -> Result<Format, DefaultFormatError> {
unimplemented!() unimplemented!()
} }
}
impl EventLoopTrait for EventLoop { fn build_input_stream<F>(&self, format: &Format, callback: F) -> Result<Self::Stream, BuildStreamError>
type Device = Device; where F: FnMut(StreamDataResult) + Send + 'static {
type StreamId = StreamId;
#[inline]
fn run<F>(&self, _callback: F) -> !
where F: FnMut(StreamId, StreamDataResult)
{
loop { /* TODO: don't spin */ }
}
#[inline]
fn build_input_stream(&self, _: &Device, _: &Format) -> Result<StreamId, BuildStreamError> {
Err(BuildStreamError::DeviceNotAvailable)
}
#[inline]
fn build_output_stream(&self, _: &Device, _: &Format) -> Result<StreamId, BuildStreamError> {
Err(BuildStreamError::DeviceNotAvailable)
}
#[inline]
fn destroy_stream(&self, _: StreamId) {
unimplemented!() unimplemented!()
} }
#[inline] /// Create an output stream.
fn play_stream(&self, _: StreamId) -> Result<(), PlayStreamError> { fn build_output_stream<F>(&self, format: &Format, callback: F) -> Result<Self::Stream, BuildStreamError>
panic!() where F: FnMut(StreamDataResult) + Send + 'static{
} unimplemented!()
#[inline]
fn pause_stream(&self, _: StreamId) -> Result<(), PauseStreamError> {
panic!()
} }
} }
impl HostTrait for Host { impl HostTrait for Host {
type Device = Device; type Device = Device;
type Devices = Devices; type Devices = Devices;
type EventLoop = EventLoop;
fn is_available() -> bool { fn is_available() -> bool {
false false
@ -133,13 +107,17 @@ impl HostTrait for Host {
fn default_output_device(&self) -> Option<Device> { fn default_output_device(&self) -> Option<Device> {
None None
} }
fn event_loop(&self) -> Self::EventLoop {
EventLoop::new()
}
} }
impl StreamIdTrait for StreamId {} impl StreamTrait for Stream {
fn play(&self) -> Result<(), PlayStreamError> {
unimplemented!()
}
fn pause(&self) -> Result<(), PauseStreamError> {
unimplemented!()
}
}
impl Iterator for Devices { impl Iterator for Devices {
type Item = Device; type Item = Device;

View File

@ -151,11 +151,10 @@ extern crate thiserror;
pub use error::*; pub use error::*;
pub use platform::{ pub use platform::{
ALL_HOSTS, Device, Devices, EventLoop, Host, HostId, SupportedInputFormats, ALL_HOSTS, available_hosts, default_host, Device, Devices, Host, host_from_id,
SupportedOutputFormats, StreamId, available_hosts, default_host, host_from_id, HostId, Stream, SupportedInputFormats, SupportedOutputFormats,
}; };
pub use samples_formats::{Sample, SampleFormat}; pub use samples_formats::{Sample, SampleFormat};
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
mod error; mod error;

View File

@ -58,14 +58,9 @@ macro_rules! impl_platform_host {
/// type. /// type.
pub struct Devices(DevicesInner); pub struct Devices(DevicesInner);
/// The **EventLoop** implementation associated with the platform's dynamically dispatched /// The **Stream** implementation associated with the platform's dynamically dispatched
/// **Host** type. /// **Host** type.
pub struct EventLoop(EventLoopInner); pub struct Stream(StreamInner);
/// The **StreamId** implementation associated with the platform's dynamically dispatched
/// **Host** type.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct StreamId(StreamIdInner);
/// The **SupportedInputFormats** iterator associated with the platform's dynamically /// The **SupportedInputFormats** iterator associated with the platform's dynamically
/// dispatched **Host** type. /// dispatched **Host** type.
@ -95,22 +90,15 @@ macro_rules! impl_platform_host {
)* )*
} }
enum EventLoopInner {
$(
$HostVariant(crate::host::$host_mod::EventLoop),
)*
}
enum HostInner { enum HostInner {
$( $(
$HostVariant(crate::host::$host_mod::Host), $HostVariant(crate::host::$host_mod::Host),
)* )*
} }
#[derive(Clone, Debug, Eq, Hash, PartialEq)] enum StreamInner {
enum StreamIdInner {
$( $(
$HostVariant(crate::host::$host_mod::StreamId), $HostVariant(crate::host::$host_mod::Stream),
)* )*
} }
@ -212,6 +200,7 @@ macro_rules! impl_platform_host {
impl crate::traits::DeviceTrait for Device { impl crate::traits::DeviceTrait for Device {
type SupportedInputFormats = SupportedInputFormats; type SupportedInputFormats = SupportedInputFormats;
type SupportedOutputFormats = SupportedOutputFormats; type SupportedOutputFormats = SupportedOutputFormats;
type Stream = Stream;
fn name(&self) -> Result<String, crate::DeviceNameError> { fn name(&self) -> Result<String, crate::DeviceNameError> {
match self.0 { match self.0 {
@ -260,96 +249,25 @@ macro_rules! impl_platform_host {
)* )*
} }
} }
}
impl crate::traits::EventLoopTrait for EventLoop { fn build_input_stream<F>(&self, format: &crate::Format, callback: F) -> Result<Self::Stream, crate::BuildStreamError>
type StreamId = StreamId; where F: FnMut(crate::StreamDataResult) + Send + 'static {
type Device = Device;
#[allow(unreachable_patterns)]
fn build_input_stream(
&self,
device: &Self::Device,
format: &crate::Format,
) -> Result<Self::StreamId, crate::BuildStreamError> {
match (&self.0, &device.0) {
$(
(&EventLoopInner::$HostVariant(ref e), &DeviceInner::$HostVariant(ref d)) => {
e.build_input_stream(d, format)
.map(StreamIdInner::$HostVariant)
.map(StreamId)
}
)*
_ => panic!("tried to build a stream with a device from another host"),
}
}
#[allow(unreachable_patterns)]
fn build_output_stream(
&self,
device: &Self::Device,
format: &crate::Format,
) -> Result<Self::StreamId, crate::BuildStreamError> {
match (&self.0, &device.0) {
$(
(&EventLoopInner::$HostVariant(ref e), &DeviceInner::$HostVariant(ref d)) => {
e.build_output_stream(d, format)
.map(StreamIdInner::$HostVariant)
.map(StreamId)
}
)*
_ => panic!("tried to build a stream with a device from another host"),
}
}
#[allow(unreachable_patterns)]
fn play_stream(&self, stream: Self::StreamId) -> Result<(), crate::PlayStreamError> {
match (&self.0, stream.0) {
$(
(&EventLoopInner::$HostVariant(ref e), StreamIdInner::$HostVariant(ref s)) => {
e.play_stream(s.clone())
}
)*
_ => panic!("tried to play a stream with an ID associated with another host"),
}
}
#[allow(unreachable_patterns)]
fn pause_stream(&self, stream: Self::StreamId) -> Result<(), crate::PauseStreamError> {
match (&self.0, stream.0) {
$(
(&EventLoopInner::$HostVariant(ref e), StreamIdInner::$HostVariant(ref s)) => {
e.pause_stream(s.clone())
}
)*
_ => panic!("tried to pause a stream with an ID associated with another host"),
}
}
#[allow(unreachable_patterns)]
fn destroy_stream(&self, stream: Self::StreamId) {
match (&self.0, stream.0) {
$(
(&EventLoopInner::$HostVariant(ref e), StreamIdInner::$HostVariant(ref s)) => {
e.destroy_stream(s.clone())
}
)*
_ => panic!("tried to destroy a stream with an ID associated with another host"),
}
}
fn run<F>(&self, mut callback: F) -> !
where
F: FnMut(Self::StreamId, crate::StreamDataResult) + Send
{
match self.0 { match self.0 {
$( $(
EventLoopInner::$HostVariant(ref e) => { DeviceInner::$HostVariant(ref d) => d.build_input_stream(format, callback)
e.run(|id, result| { .map(StreamInner::$HostVariant)
let result = result; .map(Stream),
callback(StreamId(StreamIdInner::$HostVariant(id)), result); )*
}); }
}, }
fn build_output_stream<F>(&self, format: &crate::Format, callback: F) -> Result<Self::Stream, crate::BuildStreamError>
where F: FnMut(crate::StreamDataResult) + Send + 'static {
match self.0 {
$(
DeviceInner::$HostVariant(ref d) => d.build_output_stream(format, callback)
.map(StreamInner::$HostVariant)
.map(Stream),
)* )*
} }
} }
@ -358,7 +276,6 @@ macro_rules! impl_platform_host {
impl crate::traits::HostTrait for Host { impl crate::traits::HostTrait for Host {
type Devices = Devices; type Devices = Devices;
type Device = Device; type Device = Device;
type EventLoop = EventLoop;
fn is_available() -> bool { fn is_available() -> bool {
$( crate::host::$host_mod::Host::is_available() ||)* false $( crate::host::$host_mod::Host::is_available() ||)* false
@ -393,20 +310,30 @@ macro_rules! impl_platform_host {
)* )*
} }
} }
}
fn event_loop(&self) -> Self::EventLoop { impl crate::traits::StreamTrait for Stream {
fn play(&self) -> Result<(), crate::PlayStreamError> {
match self.0 { match self.0 {
$( $(
HostInner::$HostVariant(ref h) => { StreamInner::$HostVariant(ref s) => {
EventLoop(EventLoopInner::$HostVariant(h.event_loop())) s.play()
}
)*
}
}
fn pause(&self) -> Result<(), crate::PauseStreamError> {
match self.0 {
$(
StreamInner::$HostVariant(ref s) => {
s.pause()
} }
)* )*
} }
} }
} }
impl crate::traits::StreamIdTrait for StreamId {}
$( $(
impl From<crate::host::$host_mod::Device> for Device { impl From<crate::host::$host_mod::Device> for Device {
fn from(h: crate::host::$host_mod::Device) -> Self { fn from(h: crate::host::$host_mod::Device) -> Self {
@ -420,21 +347,15 @@ macro_rules! impl_platform_host {
} }
} }
impl From<crate::host::$host_mod::EventLoop> for EventLoop {
fn from(h: crate::host::$host_mod::EventLoop) -> Self {
EventLoop(EventLoopInner::$HostVariant(h))
}
}
impl From<crate::host::$host_mod::Host> for Host { impl From<crate::host::$host_mod::Host> for Host {
fn from(h: crate::host::$host_mod::Host) -> Self { fn from(h: crate::host::$host_mod::Host) -> Self {
Host(HostInner::$HostVariant(h)) Host(HostInner::$HostVariant(h))
} }
} }
impl From<crate::host::$host_mod::StreamId> for StreamId { impl From<crate::host::$host_mod::Stream> for Stream {
fn from(h: crate::host::$host_mod::StreamId) -> Self { fn from(h: crate::host::$host_mod::Stream) -> Self {
StreamId(StreamIdInner::$HostVariant(h)) Stream(StreamInner::$HostVariant(h))
} }
} }
)* )*
@ -471,9 +392,8 @@ mod platform_impl {
pub use crate::host::alsa::{ pub use crate::host::alsa::{
Device as AlsaDevice, Device as AlsaDevice,
Devices as AlsaDevices, Devices as AlsaDevices,
EventLoop as AlsaEventLoop,
Host as AlsaHost, Host as AlsaHost,
StreamId as AlsaStreamId, Stream as AlsaStream,
SupportedInputFormats as AlsaSupportedInputFormats, SupportedInputFormats as AlsaSupportedInputFormats,
SupportedOutputFormats as AlsaSupportedOutputFormats, SupportedOutputFormats as AlsaSupportedOutputFormats,
}; };

View File

@ -39,8 +39,6 @@ pub trait HostTrait {
type Devices: Iterator<Item = Self::Device>; type Devices: Iterator<Item = Self::Device>;
/// The `Device` type yielded by the host. /// The `Device` type yielded by the host.
type Device: DeviceTrait; type Device: DeviceTrait;
/// The event loop type used by the `Host`
type EventLoop: EventLoopTrait<Device = Self::Device>;
/// Whether or not the host is available on the system. /// Whether or not the host is available on the system.
fn is_available() -> bool; fn is_available() -> bool;
@ -60,9 +58,6 @@ pub trait HostTrait {
/// Returns `None` if no output device is available. /// Returns `None` if no output device is available.
fn default_output_device(&self) -> Option<Self::Device>; fn default_output_device(&self) -> Option<Self::Device>;
/// Initialise the event loop, ready for managing audio streams.
fn event_loop(&self) -> Self::EventLoop;
/// An iterator yielding all `Device`s currently available to the system that support one or more /// An iterator yielding all `Device`s currently available to the system that support one or more
/// input stream formats. /// input stream formats.
/// ///
@ -99,6 +94,8 @@ pub trait DeviceTrait {
type SupportedInputFormats: Iterator<Item = SupportedFormat>; type SupportedInputFormats: Iterator<Item = SupportedFormat>;
/// The iterator type yielding supported output stream formats. /// The iterator type yielding supported output stream formats.
type SupportedOutputFormats: Iterator<Item = SupportedFormat>; type SupportedOutputFormats: Iterator<Item = SupportedFormat>;
/// The stream type created by `build_input_stream` and `build_output_stream`.
type Stream: StreamTrait;
/// The human-readable name of the device. /// The human-readable name of the device.
fn name(&self) -> Result<String, DeviceNameError>; fn name(&self) -> Result<String, DeviceNameError>;
@ -118,81 +115,19 @@ pub trait DeviceTrait {
/// The default output stream format for the device. /// The default output stream format for the device.
fn default_output_format(&self) -> Result<Format, DefaultFormatError>; fn default_output_format(&self) -> Result<Format, DefaultFormatError>;
/// Create an input stream.
fn build_input_stream<F>(&self, format: &Format, callback: F) -> Result<Self::Stream, BuildStreamError>
where F: FnMut(StreamDataResult) + Send + 'static;
/// Create an output stream.
fn build_output_stream<F>(&self, format: &Format, callback: F) -> Result<Self::Stream, BuildStreamError>
where F: FnMut(StreamDataResult) + Send + 'static;
} }
/// Collection of streams managed together. /// A stream created from `Device`, with methods to control playback.
/// pub trait StreamTrait {
/// Created with the `Host::event_loop` method. fn play(&self) -> Result<(), PlayStreamError>;
pub trait EventLoopTrait {
/// The `Device` type yielded by the host.
type Device: DeviceTrait;
/// The type used to uniquely distinguish between streams.
type StreamId: StreamIdTrait;
/// Creates a new input stream that will run from the given device and with the given format. fn pause(&self) -> Result<(), PauseStreamError>;
///
/// On success, returns an identifier for the stream.
///
/// Can return an error if the device is no longer valid, or if the input stream format is not
/// supported by the device.
fn build_input_stream(
&self,
device: &Self::Device,
format: &Format,
) -> Result<Self::StreamId, BuildStreamError>;
/// Creates a new output stream that will play on the given device and with the given format.
///
/// On success, returns an identifier for the stream.
///
/// Can return an error if the device is no longer valid, or if the output stream format is not
/// supported by the device.
fn build_output_stream(
&self,
device: &Self::Device,
format: &Format,
) -> Result<Self::StreamId, BuildStreamError>;
/// Instructs the audio device that it should start playing the stream with the given ID.
///
/// Has no effect is the stream was already playing.
///
/// Only call this after you have submitted some data, otherwise you may hear some glitches.
///
/// # Panic
///
/// If the stream does not exist, this function can either panic or be a no-op.
fn play_stream(&self, stream: Self::StreamId) -> Result<(), PlayStreamError>;
/// Instructs the audio device that it should stop playing the stream with the given ID.
///
/// Has no effect is the stream was already paused.
///
/// If you call `play` afterwards, the playback will resume where it was.
///
/// # Panic
///
/// If the stream does not exist, this function can either panic or be a no-op.
fn pause_stream(&self, stream: Self::StreamId) -> Result<(), PauseStreamError>;
/// Destroys an existing stream.
///
/// # Panic
///
/// If the stream does not exist, this function can either panic or be a no-op.
fn destroy_stream(&self, stream: Self::StreamId);
/// Takes control of the current thread and begins the stream processing.
///
/// > **Note**: Since it takes control of the thread, this method is best called on a separate
/// > thread.
///
/// Whenever a stream needs to be fed some data, the closure passed as parameter is called.
/// You can call the other methods of `EventLoop` without getting a deadlock.
fn run<F>(&self, callback: F) -> !
where
F: FnMut(Self::StreamId, StreamDataResult) + Send;
} }
/// The set of required bounds for host `StreamId` types.
pub trait StreamIdTrait: Clone + std::fmt::Debug + std::hash::Hash + PartialEq + Eq {}