WIP - Add ALSA timestamp implementation.
I think this is mostly complete, however there are two major TODOs: - There seems to be a bug where timestamps are not reported when the target device is `pulse`. This causes a `panic!` as the trigger timestamp should always precede the timestamp when retrieved while the stream is running. - We need to specify the timestamp type as MONOTONIC_RAW, however the `alsa` and `alsa-sys` crates do not yet expose the necessary `set_tstamp_type` function.
This commit is contained in:
parent
777a6b2bd1
commit
a49297ec2c
|
@ -8,6 +8,7 @@ use crate::{
|
||||||
PlayStreamError, SampleFormat, SampleRate, StreamConfig, StreamError, SupportedStreamConfig,
|
PlayStreamError, SampleFormat, SampleRate, StreamConfig, StreamError, SupportedStreamConfig,
|
||||||
SupportedStreamConfigRange, SupportedStreamConfigsError,
|
SupportedStreamConfigRange, SupportedStreamConfigsError,
|
||||||
};
|
};
|
||||||
|
use std::convert::TryInto;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::thread::{self, JoinHandle};
|
use std::thread::{self, JoinHandle};
|
||||||
use std::vec::IntoIter as VecIntoIter;
|
use std::vec::IntoIter as VecIntoIter;
|
||||||
|
@ -208,7 +209,7 @@ impl Device {
|
||||||
channel: handle,
|
channel: handle,
|
||||||
sample_format,
|
sample_format,
|
||||||
num_descriptors,
|
num_descriptors,
|
||||||
num_channels: conf.channels as u16,
|
conf: conf.clone(),
|
||||||
buffer_len,
|
buffer_len,
|
||||||
period_len,
|
period_len,
|
||||||
can_pause,
|
can_pause,
|
||||||
|
@ -421,8 +422,8 @@ struct StreamInner {
|
||||||
// Format of the samples.
|
// Format of the samples.
|
||||||
sample_format: SampleFormat,
|
sample_format: SampleFormat,
|
||||||
|
|
||||||
// Number of channels, ie. number of samples per frame.
|
// The configuration used to open this stream.
|
||||||
num_channels: u16,
|
conf: StreamConfig,
|
||||||
|
|
||||||
// Number of samples that can fit in the buffer.
|
// Number of samples that can fit in the buffer.
|
||||||
buffer_len: usize,
|
buffer_len: usize,
|
||||||
|
@ -479,7 +480,8 @@ fn input_stream_worker(
|
||||||
PollDescriptorsFlow::Continue => continue,
|
PollDescriptorsFlow::Continue => continue,
|
||||||
PollDescriptorsFlow::Return => return,
|
PollDescriptorsFlow::Return => return,
|
||||||
PollDescriptorsFlow::Ready {
|
PollDescriptorsFlow::Ready {
|
||||||
available_frames: _,
|
avail_frames: _,
|
||||||
|
delay_frames,
|
||||||
stream_type,
|
stream_type,
|
||||||
} => {
|
} => {
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
|
@ -487,10 +489,8 @@ fn input_stream_worker(
|
||||||
StreamType::Input,
|
StreamType::Input,
|
||||||
"expected input stream, but polling descriptors indicated output",
|
"expected input stream, but polling descriptors indicated output",
|
||||||
);
|
);
|
||||||
report_error(
|
let res = process_input(stream, &mut ctxt.buffer, delay_frames, data_callback);
|
||||||
process_input(stream, &mut ctxt.buffer, data_callback),
|
report_error(res, error_callback);
|
||||||
error_callback,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -514,7 +514,8 @@ fn output_stream_worker(
|
||||||
PollDescriptorsFlow::Continue => continue,
|
PollDescriptorsFlow::Continue => continue,
|
||||||
PollDescriptorsFlow::Return => return,
|
PollDescriptorsFlow::Return => return,
|
||||||
PollDescriptorsFlow::Ready {
|
PollDescriptorsFlow::Ready {
|
||||||
available_frames,
|
avail_frames,
|
||||||
|
delay_frames,
|
||||||
stream_type,
|
stream_type,
|
||||||
} => {
|
} => {
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
|
@ -522,13 +523,15 @@ fn output_stream_worker(
|
||||||
StreamType::Output,
|
StreamType::Output,
|
||||||
"expected output stream, but polling descriptors indicated input",
|
"expected output stream, but polling descriptors indicated input",
|
||||||
);
|
);
|
||||||
process_output(
|
let res = process_output(
|
||||||
stream,
|
stream,
|
||||||
&mut ctxt.buffer,
|
&mut ctxt.buffer,
|
||||||
available_frames,
|
avail_frames,
|
||||||
|
delay_frames,
|
||||||
data_callback,
|
data_callback,
|
||||||
error_callback,
|
error_callback,
|
||||||
);
|
);
|
||||||
|
report_error(res, error_callback);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -555,7 +558,8 @@ enum PollDescriptorsFlow {
|
||||||
Return,
|
Return,
|
||||||
Ready {
|
Ready {
|
||||||
stream_type: StreamType,
|
stream_type: StreamType,
|
||||||
available_frames: usize,
|
avail_frames: usize,
|
||||||
|
delay_frames: usize,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -614,7 +618,8 @@ fn poll_descriptors_and_prepare_buffer(
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
// Get the number of available samples for reading/writing.
|
// Get the number of available samples for reading/writing.
|
||||||
let available_samples = get_available_samples(stream)?;
|
let (avail_frames, delay_frames) = get_avail_delay(stream)?;
|
||||||
|
let available_samples = avail_frames * stream.conf.channels as usize;
|
||||||
|
|
||||||
// Only go on if there is at least `stream.period_len` samples.
|
// Only go on if there is at least `stream.period_len` samples.
|
||||||
if available_samples < stream.period_len {
|
if available_samples < stream.period_len {
|
||||||
|
@ -624,11 +629,11 @@ fn poll_descriptors_and_prepare_buffer(
|
||||||
// Prepare the data buffer.
|
// Prepare the data buffer.
|
||||||
let buffer_size = stream.sample_format.sample_size() * available_samples;
|
let buffer_size = stream.sample_format.sample_size() * available_samples;
|
||||||
buffer.resize(buffer_size, 0u8);
|
buffer.resize(buffer_size, 0u8);
|
||||||
let available_frames = available_samples / stream.num_channels as usize;
|
|
||||||
|
|
||||||
Ok(PollDescriptorsFlow::Ready {
|
Ok(PollDescriptorsFlow::Ready {
|
||||||
stream_type,
|
stream_type,
|
||||||
available_frames,
|
avail_frames,
|
||||||
|
delay_frames,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -636,6 +641,7 @@ fn poll_descriptors_and_prepare_buffer(
|
||||||
fn process_input(
|
fn process_input(
|
||||||
stream: &StreamInner,
|
stream: &StreamInner,
|
||||||
buffer: &mut [u8],
|
buffer: &mut [u8],
|
||||||
|
delay_frames: usize,
|
||||||
data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static),
|
data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static),
|
||||||
) -> Result<(), BackendSpecificError> {
|
) -> Result<(), BackendSpecificError> {
|
||||||
stream.channel.io().readi(buffer)?;
|
stream.channel.io().readi(buffer)?;
|
||||||
|
@ -643,7 +649,13 @@ fn process_input(
|
||||||
let data = buffer.as_mut_ptr() as *mut ();
|
let data = buffer.as_mut_ptr() as *mut ();
|
||||||
let len = buffer.len() / sample_format.sample_size();
|
let len = buffer.len() / sample_format.sample_size();
|
||||||
let data = unsafe { Data::from_parts(data, len, sample_format) };
|
let data = unsafe { Data::from_parts(data, len, sample_format) };
|
||||||
let info = crate::InputCallbackInfo {};
|
let callback = stream_timestamp(stream)?;
|
||||||
|
let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate);
|
||||||
|
let capture = callback
|
||||||
|
.sub(delay_duration)
|
||||||
|
.expect("`capture` occurs before origin of alsa `StreamInstant`");
|
||||||
|
let timestamp = crate::InputStreamTimestamp { callback, capture };
|
||||||
|
let info = crate::InputCallbackInfo { timestamp };
|
||||||
data_callback(&data, &info);
|
data_callback(&data, &info);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -656,16 +668,23 @@ fn process_output(
|
||||||
stream: &StreamInner,
|
stream: &StreamInner,
|
||||||
buffer: &mut [u8],
|
buffer: &mut [u8],
|
||||||
available_frames: usize,
|
available_frames: usize,
|
||||||
|
delay_frames: usize,
|
||||||
data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static),
|
data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static),
|
||||||
error_callback: &mut dyn FnMut(StreamError),
|
error_callback: &mut dyn FnMut(StreamError),
|
||||||
) {
|
) -> Result<(), BackendSpecificError> {
|
||||||
{
|
{
|
||||||
// We're now sure that we're ready to write data.
|
// We're now sure that we're ready to write data.
|
||||||
let sample_format = stream.sample_format;
|
let sample_format = stream.sample_format;
|
||||||
let data = buffer.as_mut_ptr() as *mut ();
|
let data = buffer.as_mut_ptr() as *mut ();
|
||||||
let len = buffer.len() / sample_format.sample_size();
|
let len = buffer.len() / sample_format.sample_size();
|
||||||
let mut data = unsafe { Data::from_parts(data, len, sample_format) };
|
let mut data = unsafe { Data::from_parts(data, len, sample_format) };
|
||||||
let info = crate::OutputCallbackInfo {};
|
let callback = stream_timestamp(stream)?;
|
||||||
|
let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate);
|
||||||
|
let playback = callback
|
||||||
|
.add(delay_duration)
|
||||||
|
.expect("`playback` occurs beyond representation supported by `StreamInstant`");
|
||||||
|
let timestamp = crate::OutputStreamTimestamp { callback, playback };
|
||||||
|
let info = crate::OutputCallbackInfo { timestamp };
|
||||||
data_callback(&mut data, &info);
|
data_callback(&mut data, &info);
|
||||||
}
|
}
|
||||||
loop {
|
loop {
|
||||||
|
@ -693,6 +712,47 @@ fn process_output(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use the duration since the start of the stream.
|
||||||
|
//
|
||||||
|
// This ensures positive values that are compatible with our `StreamInstant` representation.
|
||||||
|
fn stream_timestamp(stream: &StreamInner) -> Result<crate::StreamInstant, BackendSpecificError> {
|
||||||
|
let status = stream.channel.status()?;
|
||||||
|
let trigger_ts = status.get_trigger_htstamp();
|
||||||
|
// TODO: This is returning `0` on ALSA where default device forwards to pulse.
|
||||||
|
// Possibly related: https://bugs.freedesktop.org/show_bug.cgi?id=88503
|
||||||
|
let ts = status.get_htstamp();
|
||||||
|
let nanos = timespec_diff_nanos(ts, trigger_ts)
|
||||||
|
.try_into()
|
||||||
|
.unwrap_or_else(|_| {
|
||||||
|
panic!(
|
||||||
|
"get_htstamp `{:?}` was earlier than get_trigger_htstamp `{:?}`",
|
||||||
|
ts, trigger_ts
|
||||||
|
);
|
||||||
|
});
|
||||||
|
Ok(crate::StreamInstant::from_nanos(nanos))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Adapted from `timestamp2ns` here:
|
||||||
|
// https://fossies.org/linux/alsa-lib/test/audio_time.c
|
||||||
|
fn timespec_to_nanos(ts: libc::timespec) -> i64 {
|
||||||
|
ts.tv_sec as i64 * 1_000_000_000 + ts.tv_nsec
|
||||||
|
}
|
||||||
|
|
||||||
|
// Adapted from `timediff` here:
|
||||||
|
// https://fossies.org/linux/alsa-lib/test/audio_time.c
|
||||||
|
fn timespec_diff_nanos(a: libc::timespec, b: libc::timespec) -> i64 {
|
||||||
|
timespec_to_nanos(a) - timespec_to_nanos(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert the given duration in frames at the given sample rate to a `std::time::Duration`.
|
||||||
|
fn frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration {
|
||||||
|
let secsf = frames as f64 / rate.0 as f64;
|
||||||
|
let secs = secsf as u64;
|
||||||
|
let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32;
|
||||||
|
std::time::Duration::new(secs, nanos)
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Stream {
|
impl Stream {
|
||||||
|
@ -759,16 +819,16 @@ impl StreamTrait for Stream {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Determine the number of samples that are available to read/write.
|
// Determine the number of frames that are available to read/write along with the latency.
|
||||||
fn get_available_samples(stream: &StreamInner) -> Result<usize, BackendSpecificError> {
|
fn get_avail_delay(stream: &StreamInner) -> Result<(usize, usize), BackendSpecificError> {
|
||||||
match stream.channel.avail_update() {
|
match stream.channel.avail_delay() {
|
||||||
Err(err) if err.errno() == Some(nix::errno::Errno::EPIPE) => {
|
Err(err) if err.errno() == Some(nix::errno::Errno::EPIPE) => {
|
||||||
// buffer underrun
|
// buffer underrun
|
||||||
// TODO: Notify the user some how.
|
// TODO: Notify the user some how.
|
||||||
Ok(stream.buffer_len)
|
Ok((stream.buffer_len, 0))
|
||||||
}
|
}
|
||||||
Err(err) => Err(err.into()),
|
Err(err) => Err(err.into()),
|
||||||
Ok(available) => Ok(available as usize * stream.num_channels as usize),
|
Ok((avail, delay)) => Ok((avail as usize, delay as usize)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -830,6 +890,10 @@ fn set_sw_params_from_format(
|
||||||
(buffer, period)
|
(buffer, period)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
sw_params.set_tstamp_mode(true)?;
|
||||||
|
// TODO: `sw_params.set_tstamp_type(CLOCK_MONOTONIC_RAW)`
|
||||||
|
// Pending addressing https://github.com/diwic/alsa-sys/issues/6
|
||||||
|
|
||||||
pcm_handle.sw_params(&sw_params)?;
|
pcm_handle.sw_params(&sw_params)?;
|
||||||
|
|
||||||
Ok((buffer_len, period_len))
|
Ok((buffer_len, period_len))
|
||||||
|
|
Loading…
Reference in New Issue