From a49297ec2c0617a33ab5e496aeeb51317001392c Mon Sep 17 00:00:00 2001 From: mitchmindtree Date: Mon, 27 Apr 2020 18:00:40 +0200 Subject: [PATCH] WIP - Add ALSA timestamp implementation. I think this is mostly complete, however there are two major TODOs: - There seems to be a bug where timestamps are not reported when the target device is `pulse`. This causes a `panic!` as the trigger timestamp should always precede the timestamp when retrieved while the stream is running. - We need to specify the timestamp type as MONOTONIC_RAW, however the `alsa` and `alsa-sys` crates do not yet expose the necessary `set_tstamp_type` function. --- src/host/alsa/mod.rs | 110 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 87 insertions(+), 23 deletions(-) diff --git a/src/host/alsa/mod.rs b/src/host/alsa/mod.rs index c1bfc3f..7b229e5 100644 --- a/src/host/alsa/mod.rs +++ b/src/host/alsa/mod.rs @@ -8,6 +8,7 @@ use crate::{ PlayStreamError, SampleFormat, SampleRate, StreamConfig, StreamError, SupportedStreamConfig, SupportedStreamConfigRange, SupportedStreamConfigsError, }; +use std::convert::TryInto; use std::sync::Arc; use std::thread::{self, JoinHandle}; use std::vec::IntoIter as VecIntoIter; @@ -208,7 +209,7 @@ impl Device { channel: handle, sample_format, num_descriptors, - num_channels: conf.channels as u16, + conf: conf.clone(), buffer_len, period_len, can_pause, @@ -421,8 +422,8 @@ struct StreamInner { // Format of the samples. sample_format: SampleFormat, - // Number of channels, ie. number of samples per frame. - num_channels: u16, + // The configuration used to open this stream. + conf: StreamConfig, // Number of samples that can fit in the buffer. buffer_len: usize, @@ -479,7 +480,8 @@ fn input_stream_worker( PollDescriptorsFlow::Continue => continue, PollDescriptorsFlow::Return => return, PollDescriptorsFlow::Ready { - available_frames: _, + avail_frames: _, + delay_frames, stream_type, } => { assert_eq!( @@ -487,10 +489,8 @@ fn input_stream_worker( StreamType::Input, "expected input stream, but polling descriptors indicated output", ); - report_error( - process_input(stream, &mut ctxt.buffer, data_callback), - error_callback, - ); + let res = process_input(stream, &mut ctxt.buffer, delay_frames, data_callback); + report_error(res, error_callback); } } } @@ -514,7 +514,8 @@ fn output_stream_worker( PollDescriptorsFlow::Continue => continue, PollDescriptorsFlow::Return => return, PollDescriptorsFlow::Ready { - available_frames, + avail_frames, + delay_frames, stream_type, } => { assert_eq!( @@ -522,13 +523,15 @@ fn output_stream_worker( StreamType::Output, "expected output stream, but polling descriptors indicated input", ); - process_output( + let res = process_output( stream, &mut ctxt.buffer, - available_frames, + avail_frames, + delay_frames, data_callback, error_callback, ); + report_error(res, error_callback); } } } @@ -555,7 +558,8 @@ enum PollDescriptorsFlow { Return, Ready { stream_type: StreamType, - available_frames: usize, + avail_frames: usize, + delay_frames: usize, }, } @@ -614,7 +618,8 @@ fn poll_descriptors_and_prepare_buffer( } }; // Get the number of available samples for reading/writing. - let available_samples = get_available_samples(stream)?; + let (avail_frames, delay_frames) = get_avail_delay(stream)?; + let available_samples = avail_frames * stream.conf.channels as usize; // Only go on if there is at least `stream.period_len` samples. if available_samples < stream.period_len { @@ -624,11 +629,11 @@ fn poll_descriptors_and_prepare_buffer( // Prepare the data buffer. let buffer_size = stream.sample_format.sample_size() * available_samples; buffer.resize(buffer_size, 0u8); - let available_frames = available_samples / stream.num_channels as usize; Ok(PollDescriptorsFlow::Ready { stream_type, - available_frames, + avail_frames, + delay_frames, }) } @@ -636,6 +641,7 @@ fn poll_descriptors_and_prepare_buffer( fn process_input( stream: &StreamInner, buffer: &mut [u8], + delay_frames: usize, data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static), ) -> Result<(), BackendSpecificError> { stream.channel.io().readi(buffer)?; @@ -643,7 +649,13 @@ fn process_input( let data = buffer.as_mut_ptr() as *mut (); let len = buffer.len() / sample_format.sample_size(); let data = unsafe { Data::from_parts(data, len, sample_format) }; - let info = crate::InputCallbackInfo {}; + let callback = stream_timestamp(stream)?; + let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate); + let capture = callback + .sub(delay_duration) + .expect("`capture` occurs before origin of alsa `StreamInstant`"); + let timestamp = crate::InputStreamTimestamp { callback, capture }; + let info = crate::InputCallbackInfo { timestamp }; data_callback(&data, &info); Ok(()) @@ -656,16 +668,23 @@ fn process_output( stream: &StreamInner, buffer: &mut [u8], available_frames: usize, + delay_frames: usize, data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static), error_callback: &mut dyn FnMut(StreamError), -) { +) -> Result<(), BackendSpecificError> { { // We're now sure that we're ready to write data. let sample_format = stream.sample_format; let data = buffer.as_mut_ptr() as *mut (); let len = buffer.len() / sample_format.sample_size(); let mut data = unsafe { Data::from_parts(data, len, sample_format) }; - let info = crate::OutputCallbackInfo {}; + let callback = stream_timestamp(stream)?; + let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate); + let playback = callback + .add(delay_duration) + .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + let timestamp = crate::OutputStreamTimestamp { callback, playback }; + let info = crate::OutputCallbackInfo { timestamp }; data_callback(&mut data, &info); } loop { @@ -693,6 +712,47 @@ fn process_output( } } } + Ok(()) +} + +// Use the duration since the start of the stream. +// +// This ensures positive values that are compatible with our `StreamInstant` representation. +fn stream_timestamp(stream: &StreamInner) -> Result { + let status = stream.channel.status()?; + let trigger_ts = status.get_trigger_htstamp(); + // TODO: This is returning `0` on ALSA where default device forwards to pulse. + // Possibly related: https://bugs.freedesktop.org/show_bug.cgi?id=88503 + let ts = status.get_htstamp(); + let nanos = timespec_diff_nanos(ts, trigger_ts) + .try_into() + .unwrap_or_else(|_| { + panic!( + "get_htstamp `{:?}` was earlier than get_trigger_htstamp `{:?}`", + ts, trigger_ts + ); + }); + Ok(crate::StreamInstant::from_nanos(nanos)) +} + +// Adapted from `timestamp2ns` here: +// https://fossies.org/linux/alsa-lib/test/audio_time.c +fn timespec_to_nanos(ts: libc::timespec) -> i64 { + ts.tv_sec as i64 * 1_000_000_000 + ts.tv_nsec +} + +// Adapted from `timediff` here: +// https://fossies.org/linux/alsa-lib/test/audio_time.c +fn timespec_diff_nanos(a: libc::timespec, b: libc::timespec) -> i64 { + timespec_to_nanos(a) - timespec_to_nanos(b) +} + +// Convert the given duration in frames at the given sample rate to a `std::time::Duration`. +fn frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration { + let secsf = frames as f64 / rate.0 as f64; + let secs = secsf as u64; + let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32; + std::time::Duration::new(secs, nanos) } impl Stream { @@ -759,16 +819,16 @@ impl StreamTrait for Stream { } } -// Determine the number of samples that are available to read/write. -fn get_available_samples(stream: &StreamInner) -> Result { - match stream.channel.avail_update() { +// Determine the number of frames that are available to read/write along with the latency. +fn get_avail_delay(stream: &StreamInner) -> Result<(usize, usize), BackendSpecificError> { + match stream.channel.avail_delay() { Err(err) if err.errno() == Some(nix::errno::Errno::EPIPE) => { // buffer underrun // TODO: Notify the user some how. - Ok(stream.buffer_len) + Ok((stream.buffer_len, 0)) } Err(err) => Err(err.into()), - Ok(available) => Ok(available as usize * stream.num_channels as usize), + Ok((avail, delay)) => Ok((avail as usize, delay as usize)), } } @@ -830,6 +890,10 @@ fn set_sw_params_from_format( (buffer, period) }; + sw_params.set_tstamp_mode(true)?; + // TODO: `sw_params.set_tstamp_type(CLOCK_MONOTONIC_RAW)` + // Pending addressing https://github.com/diwic/alsa-sys/issues/6 + pcm_handle.sw_params(&sw_params)?; Ok((buffer_len, period_len))