diff --git a/src/host/alsa/mod.rs b/src/host/alsa/mod.rs index c1bfc3f..db01eb2 100644 --- a/src/host/alsa/mod.rs +++ b/src/host/alsa/mod.rs @@ -8,6 +8,7 @@ use crate::{ PlayStreamError, SampleFormat, SampleRate, StreamConfig, StreamError, SupportedStreamConfig, SupportedStreamConfigRange, SupportedStreamConfigsError, }; +use std::convert::TryInto; use std::sync::Arc; use std::thread::{self, JoinHandle}; use std::vec::IntoIter as VecIntoIter; @@ -202,16 +203,25 @@ impl Device { num_descriptors }; + // Check to see if we can retrieve valid timestamps from the device. + // Related: https://bugs.freedesktop.org/show_bug.cgi?id=88503 + let ts = handle.status()?.get_htstamp(); + let creation_instant = match (ts.tv_sec, ts.tv_nsec) { + (0, 0) => Some(std::time::Instant::now()), + _ => None, + }; + handle.start()?; let stream_inner = StreamInner { channel: handle, sample_format, num_descriptors, - num_channels: conf.channels as u16, + conf: conf.clone(), buffer_len, period_len, can_pause, + creation_instant, }; Ok(stream_inner) @@ -421,8 +431,8 @@ struct StreamInner { // Format of the samples. sample_format: SampleFormat, - // Number of channels, ie. number of samples per frame. - num_channels: u16, + // The configuration used to open this stream. + conf: StreamConfig, // Number of samples that can fit in the buffer. buffer_len: usize, @@ -432,6 +442,16 @@ struct StreamInner { // Whether or not the hardware supports pausing the stream. can_pause: bool, + + // In the case that the device does not return valid timestamps via `get_htstamp`, this field + // will be `Some` and will contain an `Instant` representing the moment the stream was created. + // + // If this field is `Some`, then the stream will use the duration since this instant as a + // source for timestamps. + // + // If this field is `None` then the elapsed duration between `get_trigger_htstamp` and + // `get_htstamp` is used. + creation_instant: Option, } // Assume that the ALSA library is built with thread safe option. @@ -479,7 +499,8 @@ fn input_stream_worker( PollDescriptorsFlow::Continue => continue, PollDescriptorsFlow::Return => return, PollDescriptorsFlow::Ready { - available_frames: _, + avail_frames: _, + delay_frames, stream_type, } => { assert_eq!( @@ -487,10 +508,8 @@ fn input_stream_worker( StreamType::Input, "expected input stream, but polling descriptors indicated output", ); - report_error( - process_input(stream, &mut ctxt.buffer, data_callback), - error_callback, - ); + let res = process_input(stream, &mut ctxt.buffer, delay_frames, data_callback); + report_error(res, error_callback); } } } @@ -514,7 +533,8 @@ fn output_stream_worker( PollDescriptorsFlow::Continue => continue, PollDescriptorsFlow::Return => return, PollDescriptorsFlow::Ready { - available_frames, + avail_frames, + delay_frames, stream_type, } => { assert_eq!( @@ -522,13 +542,15 @@ fn output_stream_worker( StreamType::Output, "expected output stream, but polling descriptors indicated input", ); - process_output( + let res = process_output( stream, &mut ctxt.buffer, - available_frames, + avail_frames, + delay_frames, data_callback, error_callback, ); + report_error(res, error_callback); } } } @@ -555,7 +577,8 @@ enum PollDescriptorsFlow { Return, Ready { stream_type: StreamType, - available_frames: usize, + avail_frames: usize, + delay_frames: usize, }, } @@ -614,7 +637,8 @@ fn poll_descriptors_and_prepare_buffer( } }; // Get the number of available samples for reading/writing. - let available_samples = get_available_samples(stream)?; + let (avail_frames, delay_frames) = get_avail_delay(stream)?; + let available_samples = avail_frames * stream.conf.channels as usize; // Only go on if there is at least `stream.period_len` samples. if available_samples < stream.period_len { @@ -624,11 +648,11 @@ fn poll_descriptors_and_prepare_buffer( // Prepare the data buffer. let buffer_size = stream.sample_format.sample_size() * available_samples; buffer.resize(buffer_size, 0u8); - let available_frames = available_samples / stream.num_channels as usize; Ok(PollDescriptorsFlow::Ready { stream_type, - available_frames, + avail_frames, + delay_frames, }) } @@ -636,6 +660,7 @@ fn poll_descriptors_and_prepare_buffer( fn process_input( stream: &StreamInner, buffer: &mut [u8], + delay_frames: usize, data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static), ) -> Result<(), BackendSpecificError> { stream.channel.io().readi(buffer)?; @@ -643,7 +668,13 @@ fn process_input( let data = buffer.as_mut_ptr() as *mut (); let len = buffer.len() / sample_format.sample_size(); let data = unsafe { Data::from_parts(data, len, sample_format) }; - let info = crate::InputCallbackInfo {}; + let callback = stream_timestamp(stream)?; + let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate); + let capture = callback + .sub(delay_duration) + .expect("`capture` is earlier than representation supported by `StreamInstant`"); + let timestamp = crate::InputStreamTimestamp { callback, capture }; + let info = crate::InputCallbackInfo { timestamp }; data_callback(&data, &info); Ok(()) @@ -656,16 +687,23 @@ fn process_output( stream: &StreamInner, buffer: &mut [u8], available_frames: usize, + delay_frames: usize, data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static), error_callback: &mut dyn FnMut(StreamError), -) { +) -> Result<(), BackendSpecificError> { { // We're now sure that we're ready to write data. let sample_format = stream.sample_format; let data = buffer.as_mut_ptr() as *mut (); let len = buffer.len() / sample_format.sample_size(); let mut data = unsafe { Data::from_parts(data, len, sample_format) }; - let info = crate::OutputCallbackInfo {}; + let callback = stream_timestamp(stream)?; + let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate); + let playback = callback + .add(delay_duration) + .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + let timestamp = crate::OutputStreamTimestamp { callback, playback }; + let info = crate::OutputCallbackInfo { timestamp }; data_callback(&mut data, &info); } loop { @@ -693,6 +731,56 @@ fn process_output( } } } + Ok(()) +} + +// Use the elapsed duration since the start of the stream. +// +// This ensures positive values that are compatible with our `StreamInstant` representation. +fn stream_timestamp(stream: &StreamInner) -> Result { + match stream.creation_instant { + None => { + let status = stream.channel.status()?; + let trigger_ts = status.get_trigger_htstamp(); + let ts = status.get_htstamp(); + let nanos = timespec_diff_nanos(ts, trigger_ts) + .try_into() + .unwrap_or_else(|_| { + panic!( + "get_htstamp `{:?}` was earlier than get_trigger_htstamp `{:?}`", + ts, trigger_ts + ); + }); + Ok(crate::StreamInstant::from_nanos(nanos)) + } + Some(creation) => { + let now = std::time::Instant::now(); + let duration = now.duration_since(creation); + let instant = crate::StreamInstant::from_nanos_i128(duration.as_nanos() as i128) + .expect("stream duration has exceeded `StreamInstant` representation"); + Ok(instant) + } + } +} + +// Adapted from `timestamp2ns` here: +// https://fossies.org/linux/alsa-lib/test/audio_time.c +fn timespec_to_nanos(ts: libc::timespec) -> i64 { + ts.tv_sec as i64 * 1_000_000_000 + ts.tv_nsec +} + +// Adapted from `timediff` here: +// https://fossies.org/linux/alsa-lib/test/audio_time.c +fn timespec_diff_nanos(a: libc::timespec, b: libc::timespec) -> i64 { + timespec_to_nanos(a) - timespec_to_nanos(b) +} + +// Convert the given duration in frames at the given sample rate to a `std::time::Duration`. +fn frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration { + let secsf = frames as f64 / rate.0 as f64; + let secs = secsf as u64; + let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32; + std::time::Duration::new(secs, nanos) } impl Stream { @@ -759,16 +847,16 @@ impl StreamTrait for Stream { } } -// Determine the number of samples that are available to read/write. -fn get_available_samples(stream: &StreamInner) -> Result { - match stream.channel.avail_update() { +// Determine the number of frames that are available to read/write along with the latency. +fn get_avail_delay(stream: &StreamInner) -> Result<(usize, usize), BackendSpecificError> { + match stream.channel.avail_delay() { Err(err) if err.errno() == Some(nix::errno::Errno::EPIPE) => { // buffer underrun // TODO: Notify the user some how. - Ok(stream.buffer_len) + Ok((stream.buffer_len, 0)) } Err(err) => Err(err.into()), - Ok(available) => Ok(available as usize * stream.num_channels as usize), + Ok((avail, delay)) => Ok((avail as usize, delay as usize)), } } @@ -830,6 +918,11 @@ fn set_sw_params_from_format( (buffer, period) }; + sw_params.set_tstamp_mode(true)?; + // TODO: + // Pending new version of alsa-sys and alsa-rs getting published. + // sw_params.set_tstamp_type(alsa::pcm::TstampType::MonotonicRaw)?; + pcm_handle.sw_params(&sw_params)?; Ok((buffer_len, period_len))