diff --git a/Cargo.toml b/Cargo.toml index 85b7d26..65db108 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ hound = "3.4" ringbuf = "0.1.6" [target.'cfg(target_os = "windows")'.dependencies] -winapi = { version = "0.3", features = ["audiosessiontypes", "audioclient", "coml2api", "combaseapi", "debug", "devpkey", "handleapi", "ksmedia", "mmdeviceapi", "objbase", "std", "synchapi", "winbase", "winuser"] } +winapi = { version = "0.3", features = ["audiosessiontypes", "audioclient", "coml2api", "combaseapi", "debug", "devpkey", "handleapi", "ksmedia", "mmdeviceapi", "objbase", "profileapi", "std", "synchapi", "winbase", "winuser"] } asio-sys = { version = "0.1", path = "asio-sys", optional = true } parking_lot = "0.9" @@ -34,6 +34,7 @@ libc = "0.2.65" [target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies] coreaudio-rs = { version = "0.9.1", default-features = false, features = ["audio_unit", "core_audio"] } core-foundation-sys = "0.6.2" # For linking to CoreFoundation.framework and handling device name `CFString`s. +mach = "0.3" # For access to mach_timebase type. [target.'cfg(target_os = "emscripten")'.dependencies] stdweb = { version = "0.1.3", default-features = false } diff --git a/asio-sys/src/bindings/mod.rs b/asio-sys/src/bindings/mod.rs index 0964685..f0ba084 100644 --- a/asio-sys/src/bindings/mod.rs +++ b/asio-sys/src/bindings/mod.rs @@ -84,8 +84,15 @@ pub struct SampleRate { pub rate: u32, } +/// Information provided to the BufferCallback. +#[derive(Debug)] +pub struct CallbackInfo { + pub buffer_index: i32, + pub system_time: ai::ASIOTimeStamp, +} + /// Holds the pointer to the callbacks that come from cpal -struct BufferCallback(Box); +struct BufferCallback(Box); /// Input and Output streams. /// @@ -355,9 +362,9 @@ impl Asio { impl BufferCallback { /// Calls the inner callback. - fn run(&mut self, index: i32) { + fn run(&mut self, callback_info: &CallbackInfo) { let cb = &mut self.0; - cb(index); + cb(callback_info); } } @@ -610,7 +617,7 @@ impl Driver { /// Returns an ID uniquely associated with the given callback so that it may be removed later. pub fn add_callback(&self, callback: F) -> CallbackId where - F: 'static + FnMut(i32) + Send, + F: 'static + FnMut(&CallbackInfo) + Send, { let mut bc = BUFFER_CALLBACK.lock().unwrap(); let id = bc @@ -889,7 +896,7 @@ extern "C" fn asio_message( // Informs the driver whether the application is interested in time code info. If an // application does not need to know about time code, the driver has less work to do. // TODO: Provide an option for this? - 0 + 1 } _ => 0, // Unknown/unhandled message type. @@ -909,8 +916,13 @@ extern "C" fn buffer_switch_time_info( ) -> *mut ai::ASIOTime { // This lock is probably unavoidable, but locks in the audio stream are not great. let mut bcs = BUFFER_CALLBACK.lock().unwrap(); + let asio_time: &mut AsioTime = unsafe { &mut *(time as *mut AsioTime) }; + let callback_info = CallbackInfo { + buffer_index: double_buffer_index, + system_time: asio_time.time_info.system_time, + }; for &mut (_, ref mut bc) in bcs.iter_mut() { - bc.run(double_buffer_index); + bc.run(&callback_info); } time } @@ -952,6 +964,10 @@ fn check_type_sizes() { std::mem::size_of::(), std::mem::size_of::() ); + assert_eq!( + std::mem::size_of::(), + std::mem::size_of::(), + ); assert_eq!( std::mem::size_of::(), std::mem::size_of::() diff --git a/src/host/alsa/mod.rs b/src/host/alsa/mod.rs index c1bfc3f..6e59f91 100644 --- a/src/host/alsa/mod.rs +++ b/src/host/alsa/mod.rs @@ -8,6 +8,7 @@ use crate::{ PlayStreamError, SampleFormat, SampleRate, StreamConfig, StreamError, SupportedStreamConfig, SupportedStreamConfigRange, SupportedStreamConfigsError, }; +use std::convert::TryInto; use std::sync::Arc; use std::thread::{self, JoinHandle}; use std::vec::IntoIter as VecIntoIter; @@ -188,7 +189,7 @@ impl Device { let hw_params = set_hw_params_from_format(&handle, conf, sample_format)?; hw_params.can_pause() }; - let (buffer_len, period_len) = set_sw_params_from_format(&handle, conf)?; + let (_buffer_len, period_len) = set_sw_params_from_format(&handle, conf)?; handle.prepare()?; @@ -202,16 +203,24 @@ impl Device { num_descriptors }; + // Check to see if we can retrieve valid timestamps from the device. + // Related: https://bugs.freedesktop.org/show_bug.cgi?id=88503 + let ts = handle.status()?.get_htstamp(); + let creation_instant = match (ts.tv_sec, ts.tv_nsec) { + (0, 0) => Some(std::time::Instant::now()), + _ => None, + }; + handle.start()?; let stream_inner = StreamInner { channel: handle, sample_format, num_descriptors, - num_channels: conf.channels as u16, - buffer_len, + conf: conf.clone(), period_len, can_pause, + creation_instant, }; Ok(stream_inner) @@ -421,17 +430,24 @@ struct StreamInner { // Format of the samples. sample_format: SampleFormat, - // Number of channels, ie. number of samples per frame. - num_channels: u16, - - // Number of samples that can fit in the buffer. - buffer_len: usize, + // The configuration used to open this stream. + conf: StreamConfig, // Minimum number of samples to put in the buffer. period_len: usize, // Whether or not the hardware supports pausing the stream. can_pause: bool, + + // In the case that the device does not return valid timestamps via `get_htstamp`, this field + // will be `Some` and will contain an `Instant` representing the moment the stream was created. + // + // If this field is `Some`, then the stream will use the duration since this instant as a + // source for timestamps. + // + // If this field is `None` then the elapsed duration between `get_trigger_htstamp` and + // `get_htstamp` is used. + creation_instant: Option, } // Assume that the ALSA library is built with thread safe option. @@ -479,7 +495,9 @@ fn input_stream_worker( PollDescriptorsFlow::Continue => continue, PollDescriptorsFlow::Return => return, PollDescriptorsFlow::Ready { - available_frames: _, + status, + avail_frames: _, + delay_frames, stream_type, } => { assert_eq!( @@ -487,10 +505,14 @@ fn input_stream_worker( StreamType::Input, "expected input stream, but polling descriptors indicated output", ); - report_error( - process_input(stream, &mut ctxt.buffer, data_callback), - error_callback, + let res = process_input( + stream, + &mut ctxt.buffer, + status, + delay_frames, + data_callback, ); + report_error(res, error_callback); } } } @@ -514,7 +536,9 @@ fn output_stream_worker( PollDescriptorsFlow::Continue => continue, PollDescriptorsFlow::Return => return, PollDescriptorsFlow::Ready { - available_frames, + status, + avail_frames, + delay_frames, stream_type, } => { assert_eq!( @@ -522,13 +546,16 @@ fn output_stream_worker( StreamType::Output, "expected output stream, but polling descriptors indicated input", ); - process_output( + let res = process_output( stream, &mut ctxt.buffer, - available_frames, + status, + avail_frames, + delay_frames, data_callback, error_callback, ); + report_error(res, error_callback); } } } @@ -555,7 +582,9 @@ enum PollDescriptorsFlow { Return, Ready { stream_type: StreamType, - available_frames: usize, + status: alsa::pcm::Status, + avail_frames: usize, + delay_frames: usize, }, } @@ -613,8 +642,15 @@ fn poll_descriptors_and_prepare_buffer( return Ok(PollDescriptorsFlow::Continue); } }; - // Get the number of available samples for reading/writing. - let available_samples = get_available_samples(stream)?; + + let status = stream.channel.status()?; + let avail_frames = status.get_avail() as usize; + let delay_frames = match status.get_delay() { + // Buffer underrun. TODO: Notify the user. + d if d < 0 => 0, + d => d as usize, + }; + let available_samples = avail_frames * stream.conf.channels as usize; // Only go on if there is at least `stream.period_len` samples. if available_samples < stream.period_len { @@ -624,11 +660,12 @@ fn poll_descriptors_and_prepare_buffer( // Prepare the data buffer. let buffer_size = stream.sample_format.sample_size() * available_samples; buffer.resize(buffer_size, 0u8); - let available_frames = available_samples / stream.num_channels as usize; Ok(PollDescriptorsFlow::Ready { stream_type, - available_frames, + status, + avail_frames, + delay_frames, }) } @@ -636,6 +673,8 @@ fn poll_descriptors_and_prepare_buffer( fn process_input( stream: &StreamInner, buffer: &mut [u8], + status: alsa::pcm::Status, + delay_frames: usize, data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static), ) -> Result<(), BackendSpecificError> { stream.channel.io().readi(buffer)?; @@ -643,7 +682,13 @@ fn process_input( let data = buffer.as_mut_ptr() as *mut (); let len = buffer.len() / sample_format.sample_size(); let data = unsafe { Data::from_parts(data, len, sample_format) }; - let info = crate::InputCallbackInfo {}; + let callback = stream_timestamp(&status, stream.creation_instant)?; + let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate); + let capture = callback + .sub(delay_duration) + .expect("`capture` is earlier than representation supported by `StreamInstant`"); + let timestamp = crate::InputStreamTimestamp { callback, capture }; + let info = crate::InputCallbackInfo { timestamp }; data_callback(&data, &info); Ok(()) @@ -655,17 +700,25 @@ fn process_input( fn process_output( stream: &StreamInner, buffer: &mut [u8], + status: alsa::pcm::Status, available_frames: usize, + delay_frames: usize, data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static), error_callback: &mut dyn FnMut(StreamError), -) { +) -> Result<(), BackendSpecificError> { { // We're now sure that we're ready to write data. let sample_format = stream.sample_format; let data = buffer.as_mut_ptr() as *mut (); let len = buffer.len() / sample_format.sample_size(); let mut data = unsafe { Data::from_parts(data, len, sample_format) }; - let info = crate::OutputCallbackInfo {}; + let callback = stream_timestamp(&status, stream.creation_instant)?; + let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate); + let playback = callback + .add(delay_duration) + .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + let timestamp = crate::OutputStreamTimestamp { callback, playback }; + let info = crate::OutputCallbackInfo { timestamp }; data_callback(&mut data, &info); } loop { @@ -693,6 +746,58 @@ fn process_output( } } } + Ok(()) +} + +// Use the elapsed duration since the start of the stream. +// +// This ensures positive values that are compatible with our `StreamInstant` representation. +fn stream_timestamp( + status: &alsa::pcm::Status, + creation_instant: Option, +) -> Result { + match creation_instant { + None => { + let trigger_ts = status.get_trigger_htstamp(); + let ts = status.get_htstamp(); + let nanos = timespec_diff_nanos(ts, trigger_ts) + .try_into() + .unwrap_or_else(|_| { + panic!( + "get_htstamp `{:?}` was earlier than get_trigger_htstamp `{:?}`", + ts, trigger_ts + ); + }); + Ok(crate::StreamInstant::from_nanos(nanos)) + } + Some(creation) => { + let now = std::time::Instant::now(); + let duration = now.duration_since(creation); + let instant = crate::StreamInstant::from_nanos_i128(duration.as_nanos() as i128) + .expect("stream duration has exceeded `StreamInstant` representation"); + Ok(instant) + } + } +} + +// Adapted from `timestamp2ns` here: +// https://fossies.org/linux/alsa-lib/test/audio_time.c +fn timespec_to_nanos(ts: libc::timespec) -> i64 { + ts.tv_sec as i64 * 1_000_000_000 + ts.tv_nsec +} + +// Adapted from `timediff` here: +// https://fossies.org/linux/alsa-lib/test/audio_time.c +fn timespec_diff_nanos(a: libc::timespec, b: libc::timespec) -> i64 { + timespec_to_nanos(a) - timespec_to_nanos(b) +} + +// Convert the given duration in frames at the given sample rate to a `std::time::Duration`. +fn frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration { + let secsf = frames as f64 / rate.0 as f64; + let secs = secsf as u64; + let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32; + std::time::Duration::new(secs, nanos) } impl Stream { @@ -759,19 +864,6 @@ impl StreamTrait for Stream { } } -// Determine the number of samples that are available to read/write. -fn get_available_samples(stream: &StreamInner) -> Result { - match stream.channel.avail_update() { - Err(err) if err.errno() == Some(nix::errno::Errno::EPIPE) => { - // buffer underrun - // TODO: Notify the user some how. - Ok(stream.buffer_len) - } - Err(err) => Err(err.into()), - Ok(available) => Ok(available as usize * stream.num_channels as usize), - } -} - fn set_hw_params_from_format<'a>( pcm_handle: &'a alsa::pcm::PCM, config: &StreamConfig, @@ -830,6 +922,11 @@ fn set_sw_params_from_format( (buffer, period) }; + sw_params.set_tstamp_mode(true)?; + // TODO: + // Pending new version of alsa-sys and alsa-rs getting published. + // sw_params.set_tstamp_type(alsa::pcm::TstampType::MonotonicRaw)?; + pcm_handle.sw_params(&sw_params)?; Ok((buffer_len, period_len)) diff --git a/src/host/asio/stream.rs b/src/host/asio/stream.rs index 766e9d2..e8fbc2d 100644 --- a/src/host/asio/stream.rs +++ b/src/host/asio/stream.rs @@ -87,7 +87,8 @@ impl Device { // Set the input callback. // This is most performance critical part of the ASIO bindings. - let callback_id = self.driver.add_callback(move |buffer_index| unsafe { + let config = config.clone(); + let callback_id = self.driver.add_callback(move |callback_info| unsafe { // If not playing return early. if !playing.load(Ordering::SeqCst) { return; @@ -103,10 +104,11 @@ impl Device { /// 1. Write from the ASIO buffer to the interleaved CPAL buffer. /// 2. Deliver the CPAL buffer to the user callback. unsafe fn process_input_callback( - callback: &mut D, + data_callback: &mut D, interleaved: &mut [u8], asio_stream: &sys::AsioStream, - buffer_index: usize, + asio_info: &sys::CallbackInfo, + sample_rate: crate::SampleRate, from_endianness: F, ) where A: AsioSample, @@ -116,7 +118,9 @@ impl Device { { // 1. Write the ASIO channels to the CPAL buffer. let interleaved: &mut [B] = cast_slice_mut(interleaved); - let n_channels = interleaved.len() / asio_stream.buffer_size as usize; + let n_frames = asio_stream.buffer_size as usize; + let n_channels = interleaved.len() / n_frames; + let buffer_index = asio_info.buffer_index as usize; for ch_ix in 0..n_channels { let asio_channel = asio_channel_slice::(asio_stream, buffer_index, ch_ix); for (frame, s_asio) in interleaved.chunks_mut(n_channels).zip(asio_channel) { @@ -128,8 +132,14 @@ impl Device { let data = interleaved.as_mut_ptr() as *mut (); let len = interleaved.len(); let data = Data::from_parts(data, len, B::FORMAT); - let info = InputCallbackInfo {}; - callback(&data, &info); + let callback = system_time_to_stream_instant(asio_info.system_time); + let delay = frames_to_duration(n_frames, sample_rate); + let capture = callback + .sub(delay) + .expect("`capture` occurs before origin of alsa `StreamInstant`"); + let timestamp = crate::InputStreamTimestamp { callback, capture }; + let info = InputCallbackInfo { timestamp }; + data_callback(&data, &info); } match (&stream_type, sample_format) { @@ -138,7 +148,8 @@ impl Device { &mut data_callback, &mut interleaved, asio_stream, - buffer_index as usize, + callback_info, + config.sample_rate, from_le, ); } @@ -147,7 +158,8 @@ impl Device { &mut data_callback, &mut interleaved, asio_stream, - buffer_index as usize, + callback_info, + config.sample_rate, from_be, ); } @@ -160,7 +172,8 @@ impl Device { &mut data_callback, &mut interleaved, asio_stream, - buffer_index as usize, + callback_info, + config.sample_rate, std::convert::identity::, ); } @@ -173,7 +186,8 @@ impl Device { &mut data_callback, &mut interleaved, asio_stream, - buffer_index as usize, + callback_info, + config.sample_rate, from_le, ); } @@ -182,7 +196,8 @@ impl Device { &mut data_callback, &mut interleaved, asio_stream, - buffer_index as usize, + callback_info, + config.sample_rate, from_be, ); } @@ -194,7 +209,8 @@ impl Device { &mut data_callback, &mut interleaved, asio_stream, - buffer_index as usize, + callback_info, + config.sample_rate, std::convert::identity::, ); } @@ -254,7 +270,8 @@ impl Device { let playing = Arc::clone(&stream_playing); let asio_streams = self.asio_streams.clone(); - let callback_id = self.driver.add_callback(move |buffer_index| unsafe { + let config = config.clone(); + let callback_id = self.driver.add_callback(move |callback_info| unsafe { // If not playing, return early. if !playing.load(Ordering::SeqCst) { return; @@ -273,7 +290,7 @@ impl Device { // the current `buffer_index`. // // If not, we will silence it and set the opposite buffer half to unsilenced. - let silence = match buffer_index { + let silence = match callback_info.buffer_index { 0 if !silence_asio_buffer.first => { silence_asio_buffer.first = true; silence_asio_buffer.second = false; @@ -294,11 +311,12 @@ impl Device { /// 3. Finally, write the interleaved data to the non-interleaved ASIO buffer, /// performing endianness conversions as necessary. unsafe fn process_output_callback( - callback: &mut D, + data_callback: &mut D, interleaved: &mut [u8], silence_asio_buffer: bool, asio_stream: &sys::AsioStream, - buffer_index: usize, + asio_info: &sys::CallbackInfo, + sample_rate: crate::SampleRate, to_endianness: F, ) where A: Sample, @@ -311,11 +329,19 @@ impl Device { let data = interleaved.as_mut_ptr() as *mut (); let len = interleaved.len(); let mut data = Data::from_parts(data, len, A::FORMAT); - let info = OutputCallbackInfo {}; - callback(&mut data, &info); + let callback = system_time_to_stream_instant(asio_info.system_time); + let n_frames = asio_stream.buffer_size as usize; + let delay = frames_to_duration(n_frames, sample_rate); + let playback = callback + .add(delay) + .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + let timestamp = crate::OutputStreamTimestamp { callback, playback }; + let info = OutputCallbackInfo { timestamp }; + data_callback(&mut data, &info); // 2. Silence ASIO channels if necessary. - let n_channels = interleaved.len() / asio_stream.buffer_size as usize; + let n_channels = interleaved.len() / n_frames; + let buffer_index = asio_info.buffer_index as usize; if silence_asio_buffer { for ch_ix in 0..n_channels { let asio_channel = @@ -343,7 +369,8 @@ impl Device { &mut interleaved, silence, asio_stream, - buffer_index as usize, + callback_info, + config.sample_rate, to_le, ); } @@ -353,7 +380,8 @@ impl Device { &mut interleaved, silence, asio_stream, - buffer_index as usize, + callback_info, + config.sample_rate, to_be, ); } @@ -367,7 +395,8 @@ impl Device { &mut interleaved, silence, asio_stream, - buffer_index as usize, + callback_info, + config.sample_rate, std::convert::identity::, ); } @@ -381,7 +410,8 @@ impl Device { &mut interleaved, silence, asio_stream, - buffer_index as usize, + callback_info, + config.sample_rate, to_le, ); } @@ -391,7 +421,8 @@ impl Device { &mut interleaved, silence, asio_stream, - buffer_index as usize, + callback_info, + config.sample_rate, to_be, ); } @@ -404,7 +435,8 @@ impl Device { &mut interleaved, silence, asio_stream, - buffer_index as usize, + callback_info, + config.sample_rate, std::convert::identity::, ); } @@ -578,6 +610,29 @@ impl AsioSample for f64 { } } +fn asio_ns_to_double(val: sys::bindings::asio_import::ASIOTimeStamp) -> f64 { + let two_raised_to_32 = 4294967296.0; + val.lo as f64 + val.hi as f64 * two_raised_to_32 +} + +/// Asio retrieves system time via `timeGetTime` which returns the time in milliseconds. +fn system_time_to_stream_instant( + system_time: sys::bindings::asio_import::ASIOTimeStamp, +) -> crate::StreamInstant { + let systime_ns = asio_ns_to_double(system_time); + let secs = systime_ns as i64 / 1_000_000_000; + let nanos = (systime_ns as i64 - secs * 1_000_000_000) as u32; + crate::StreamInstant::new(secs, nanos) +} + +/// Convert the given duration in frames at the given sample rate to a `std::time::Duration`. +fn frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration { + let secsf = frames as f64 / rate.0 as f64; + let secs = secsf as u64; + let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32; + std::time::Duration::new(secs, nanos) +} + /// Check whether or not the desired config is supported by the stream. /// /// Checks sample rate, data type and then finally the number of channels. diff --git a/src/host/coreaudio/mod.rs b/src/host/coreaudio/mod.rs index 4559690..7100058 100644 --- a/src/host/coreaudio/mod.rs +++ b/src/host/coreaudio/mod.rs @@ -504,7 +504,7 @@ impl Device { config: &StreamConfig, sample_format: SampleFormat, mut data_callback: D, - _error_callback: E, + mut error_callback: E, ) -> Result where D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, @@ -658,6 +658,7 @@ impl Device { // Register the callback that is being called by coreaudio whenever it needs data to be // fed to the audio buffer. let bytes_per_channel = sample_format.sample_size(); + let sample_rate = config.sample_rate; type Args = render_callback::Args; audio_unit.set_input_callback(move |args: Args| unsafe { let ptr = (*args.data.data).mBuffers.as_ptr() as *const AudioBuffer; @@ -666,7 +667,7 @@ impl Device { // TODO: Perhaps loop over all buffers instead? let AudioBuffer { - mNumberChannels: _num_channels, + mNumberChannels: channels, mDataByteSize: data_byte_size, mData: data, } = buffers[0]; @@ -674,7 +675,23 @@ impl Device { let data = data as *mut (); let len = (data_byte_size as usize / bytes_per_channel) as usize; let data = Data::from_parts(data, len, sample_format); - let info = InputCallbackInfo {}; + + // TODO: Need a better way to get delay, for now we assume a double-buffer offset. + let callback = match host_time_to_stream_instant(args.time_stamp.mHostTime) { + Err(err) => { + error_callback(err.into()); + return Err(()); + } + Ok(cb) => cb, + }; + let buffer_frames = len / channels as usize; + let delay = frames_to_duration(buffer_frames, sample_rate); + let capture = callback + .sub(delay) + .expect("`capture` occurs before origin of alsa `StreamInstant`"); + let timestamp = crate::InputStreamTimestamp { callback, capture }; + + let info = InputCallbackInfo { timestamp }; data_callback(&data, &info); Ok(()) })?; @@ -693,7 +710,7 @@ impl Device { config: &StreamConfig, sample_format: SampleFormat, mut data_callback: D, - _error_callback: E, + mut error_callback: E, ) -> Result where D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, @@ -712,13 +729,14 @@ impl Device { // Register the callback that is being called by coreaudio whenever it needs data to be // fed to the audio buffer. let bytes_per_channel = sample_format.sample_size(); + let sample_rate = config.sample_rate; type Args = render_callback::Args; audio_unit.set_render_callback(move |args: Args| unsafe { // If `run()` is currently running, then a callback will be available from this list. // Otherwise, we just fill the buffer with zeroes and return. let AudioBuffer { - mNumberChannels: _num_channels, + mNumberChannels: channels, mDataByteSize: data_byte_size, mData: data, } = (*args.data.data).mBuffers[0]; @@ -726,7 +744,23 @@ impl Device { let data = data as *mut (); let len = (data_byte_size as usize / bytes_per_channel) as usize; let mut data = Data::from_parts(data, len, sample_format); - let info = OutputCallbackInfo {}; + + let callback = match host_time_to_stream_instant(args.time_stamp.mHostTime) { + Err(err) => { + error_callback(err.into()); + return Err(()); + } + Ok(cb) => cb, + }; + // TODO: Need a better way to get delay, for now we assume a double-buffer offset. + let buffer_frames = len / channels as usize; + let delay = frames_to_duration(buffer_frames, sample_rate); + let playback = callback + .add(delay) + .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + let timestamp = crate::OutputStreamTimestamp { callback, playback }; + + let info = OutputCallbackInfo { timestamp }; data_callback(&mut data, &info); Ok(()) })?; @@ -741,6 +775,26 @@ impl Device { } } +fn host_time_to_stream_instant( + m_host_time: u64, +) -> Result { + let mut info: mach::mach_time::mach_timebase_info = Default::default(); + let res = unsafe { mach::mach_time::mach_timebase_info(&mut info) }; + check_os_status(res)?; + let nanos = m_host_time * info.numer as u64 / info.denom as u64; + let secs = nanos / 1_000_000_000; + let subsec_nanos = nanos - secs * 1_000_000_000; + Ok(crate::StreamInstant::new(secs as i64, subsec_nanos as u32)) +} + +// Convert the given duration in frames at the given sample rate to a `std::time::Duration`. +fn frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration { + let secsf = frames as f64 / rate.0 as f64; + let secs = secsf as u64; + let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32; + std::time::Duration::new(secs, nanos) +} + pub struct Stream { inner: RefCell, } diff --git a/src/host/emscripten/mod.rs b/src/host/emscripten/mod.rs index fba0a04..d3a31bd 100644 --- a/src/host/emscripten/mod.rs +++ b/src/host/emscripten/mod.rs @@ -228,6 +228,8 @@ where D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static, { + const SAMPLE_RATE: usize = 44100; + unsafe { let user_data_ptr2 = user_data_ptr as *mut (&Stream, D, E); let user_data = &mut *user_data_ptr2; @@ -235,14 +237,32 @@ where let audio_ctxt = &stream.audio_ctxt_ref; // TODO: We should be re-using a buffer. - let mut temporary_buffer = vec![0.0; 44100 * 2 / 3]; + let mut temporary_buffer = vec![0.0; SAMPLE_RATE * 2 / 3]; { let len = temporary_buffer.len(); let data = temporary_buffer.as_mut_ptr() as *mut (); let sample_format = SampleFormat::F32; let mut data = Data::from_parts(data, len, sample_format); - let info = OutputCallbackInfo {}; + + let now_secs: f64 = js!(@{audio_ctxt}.getOutputTimestamp().currentTime) + .try_into() + .expect("failed to retrieve Value as f64"); + let callback = { + let secs = now_secs as i64; + let nanos = ((now_secs * 1_000_000_000.0) - secs as f64 * 1_000_000_000.0) as u32; + crate::StreamInstant::new(secs, nanos) + }; + // TODO: Use proper latency instead. Currently unsupported on most browsers though so + // we estimate based on buffer size instead. Probably should use this, but it's only + // supported by firefox (2020-04-28). + // let latency_secs: f64 = js!(@{audio_ctxt}.outputLatency).try_into().unwrap(); + let buffer_duration = frames_to_duration(len, SAMPLE_RATE); + let playback = callback + .add(buffer_duration) + .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + let timestamp = crate::OutputStreamTimestamp { callback, playback }; + let info = OutputCallbackInfo { timestamp }; data_cb(&mut data, &info); } @@ -331,3 +351,11 @@ fn is_webaudio_available() -> bool { .try_into() .unwrap() } + +// Convert the given duration in frames at the given sample rate to a `std::time::Duration`. +fn frames_to_duration(frames: usize, rate: usize) -> std::time::Duration { + let secsf = frames as f64 / rate as f64; + let secs = secsf as u64; + let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32; + std::time::Duration::new(secs, nanos) +} diff --git a/src/host/wasapi/device.rs b/src/host/wasapi/device.rs index 1106bb7..83a9c77 100644 --- a/src/host/wasapi/device.rs +++ b/src/host/wasapi/device.rs @@ -42,8 +42,7 @@ use super::winapi::um::mmdeviceapi::{ eAll, eCapture, eConsole, eRender, CLSID_MMDeviceEnumerator, EDataFlow, IMMDevice, IMMDeviceCollection, IMMDeviceEnumerator, IMMEndpoint, DEVICE_STATE_ACTIVE, }; -use super::winapi::um::winnt::LPWSTR; -use super::winapi::um::winnt::WCHAR; +use super::winapi::um::winnt::{LPWSTR, WCHAR}; use super::{ stream::{AudioClientFlow, Stream, StreamInner}, @@ -750,13 +749,20 @@ impl Device { // `run()` method and added to the `RunContext`. let client_flow = AudioClientFlow::Capture { capture_client }; + let audio_clock = get_audio_clock(audio_client).map_err(|err| { + (*audio_client).Release(); + err + })?; + Ok(StreamInner { audio_client, + audio_clock, client_flow, event, playing: false, max_frames_in_buffer, bytes_per_frame: waveformatex.nBlockAlign, + config: config.clone(), sample_format, }) } @@ -895,13 +901,20 @@ impl Device { // `run()` method and added to the `RunContext`. let client_flow = AudioClientFlow::Render { render_client }; + let audio_clock = get_audio_clock(audio_client).map_err(|err| { + (*audio_client).Release(); + err + })?; + Ok(StreamInner { audio_client, + audio_clock, client_flow, event, playing: false, max_frames_in_buffer, bytes_per_frame: waveformatex.nBlockAlign, + config: config.clone(), sample_format, }) } @@ -1147,6 +1160,29 @@ pub fn default_output_device() -> Option { default_device(eRender) } +/// Get the audio clock used to produce `StreamInstant`s. +unsafe fn get_audio_clock( + audio_client: *mut audioclient::IAudioClient, +) -> Result<*mut audioclient::IAudioClock, BuildStreamError> { + let mut audio_clock: *mut audioclient::IAudioClock = ptr::null_mut(); + let hresult = (*audio_client).GetService( + &audioclient::IID_IAudioClock, + &mut audio_clock as *mut *mut audioclient::IAudioClock as *mut _, + ); + match check_result(hresult) { + Err(ref e) if e.raw_os_error() == Some(AUDCLNT_E_DEVICE_INVALIDATED) => { + return Err(BuildStreamError::DeviceNotAvailable); + } + Err(e) => { + let description = format!("failed to build audio clock: {}", e); + let err = BackendSpecificError { description }; + return Err(err.into()); + } + Ok(()) => (), + }; + Ok(audio_clock) +} + // Turns a `Format` into a `WAVEFORMATEXTENSIBLE`. // // Returns `None` if the WAVEFORMATEXTENSIBLE does not support the given format. diff --git a/src/host/wasapi/stream.rs b/src/host/wasapi/stream.rs index 5719cbc..8e2429e 100644 --- a/src/host/wasapi/stream.rs +++ b/src/host/wasapi/stream.rs @@ -1,5 +1,5 @@ use super::check_result; -use super::winapi::shared::basetsd::UINT32; +use super::winapi::shared::basetsd::{UINT32, UINT64}; use super::winapi::shared::minwindef::{BYTE, FALSE, WORD}; use super::winapi::um::audioclient::{self, AUDCLNT_E_DEVICE_INVALIDATED, AUDCLNT_S_BUFFER_EMPTY}; use super::winapi::um::handleapi; @@ -64,6 +64,7 @@ pub enum AudioClientFlow { pub struct StreamInner { pub audio_client: *mut audioclient::IAudioClient, + pub audio_clock: *mut audioclient::IAudioClock, pub client_flow: AudioClientFlow, // Event that is signalled by WASAPI whenever audio data must be written. pub event: winnt::HANDLE, @@ -73,6 +74,8 @@ pub struct StreamInner { pub max_frames_in_buffer: UINT32, // Number of bytes that each frame occupies. pub bytes_per_frame: WORD, + // The configuration with which the stream was created. + pub config: crate::StreamConfig, // The sample format with which the stream was created. pub sample_format: SampleFormat, } @@ -185,6 +188,7 @@ impl Drop for StreamInner { fn drop(&mut self) { unsafe { (*self.audio_client).Release(); + (*self.audio_clock).Release(); handleapi::CloseHandle(self.event); } } @@ -388,12 +392,13 @@ fn process_input( if frames_available == 0 { return ControlFlow::Continue; } + let mut qpc_position: UINT64 = 0; let hresult = (*capture_client).GetBuffer( &mut buffer, &mut frames_available, flags.as_mut_ptr(), ptr::null_mut(), - ptr::null_mut(), + &mut qpc_position, ); // TODO: Can this happen? @@ -410,7 +415,16 @@ fn process_input( let len = frames_available as usize * stream.bytes_per_frame as usize / stream.sample_format.sample_size(); let data = Data::from_parts(data, len, stream.sample_format); - let info = InputCallbackInfo {}; + + // The `qpc_position` is in 100 nanosecond units. Convert it to nanoseconds. + let timestamp = match input_timestamp(stream, qpc_position) { + Ok(ts) => ts, + Err(err) => { + error_callback(err); + return ControlFlow::Break; + } + }; + let info = InputCallbackInfo { timestamp }; data_callback(&data, &info); // Release the buffer. @@ -455,7 +469,15 @@ fn process_output( let len = frames_available as usize * stream.bytes_per_frame as usize / stream.sample_format.sample_size(); let mut data = Data::from_parts(data, len, stream.sample_format); - let info = OutputCallbackInfo {}; + let sample_rate = stream.config.sample_rate; + let timestamp = match output_timestamp(stream, frames_available, sample_rate) { + Ok(ts) => ts, + Err(err) => { + error_callback(err); + return ControlFlow::Break; + } + }; + let info = OutputCallbackInfo { timestamp }; data_callback(&mut data, &info); let hresult = (*render_client).ReleaseBuffer(frames_available as u32, 0); @@ -467,3 +489,66 @@ fn process_output( ControlFlow::Continue } + +/// Convert the given duration in frames at the given sample rate to a `std::time::Duration`. +fn frames_to_duration(frames: u32, rate: crate::SampleRate) -> std::time::Duration { + let secsf = frames as f64 / rate.0 as f64; + let secs = secsf as u64; + let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32; + std::time::Duration::new(secs, nanos) +} + +/// Use the stream's `IAudioClock` to produce the current stream instant. +/// +/// Uses the QPC position produced via the `GetPosition` method. +fn stream_instant(stream: &StreamInner) -> Result { + let mut position: UINT64 = 0; + let mut qpc_position: UINT64 = 0; + let res = unsafe { (*stream.audio_clock).GetPosition(&mut position, &mut qpc_position) }; + stream_error_from_hresult(res)?; + // The `qpc_position` is in 100 nanosecond units. Convert it to nanoseconds. + let qpc_nanos = qpc_position as i128 * 100; + let instant = crate::StreamInstant::from_nanos_i128(qpc_nanos) + .expect("performance counter out of range of `StreamInstant` representation"); + Ok(instant) +} + +/// Produce the input stream timestamp. +/// +/// `buffer_qpc_position` is the `qpc_position` returned via the `GetBuffer` call on the capture +/// client. It represents the instant at which the first sample of the retrieved buffer was +/// captured. +fn input_timestamp( + stream: &StreamInner, + buffer_qpc_position: UINT64, +) -> Result { + // The `qpc_position` is in 100 nanosecond units. Convert it to nanoseconds. + let qpc_nanos = buffer_qpc_position as i128 * 100; + let capture = crate::StreamInstant::from_nanos_i128(qpc_nanos) + .expect("performance counter out of range of `StreamInstant` representation"); + let callback = stream_instant(stream)?; + Ok(crate::InputStreamTimestamp { capture, callback }) +} + +/// Produce the output stream timestamp. +/// +/// `frames_available` is the number of frames available for writing as reported by subtracting the +/// result of `GetCurrentPadding` from the maximum buffer size. +/// +/// `sample_rate` is the rate at which audio frames are processed by the device. +/// +/// TODO: The returned `playback` is an estimate that assumes audio is delivered immediately after +/// `frames_available` are consumed. The reality is that there is likely a tiny amount of latency +/// after this, but not sure how to determine this. +fn output_timestamp( + stream: &StreamInner, + frames_available: u32, + sample_rate: crate::SampleRate, +) -> Result { + let callback = stream_instant(stream)?; + let buffer_duration = frames_to_duration(frames_available, sample_rate); + let playback = callback + .add(buffer_duration) + .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + Ok(crate::OutputStreamTimestamp { callback, playback }) +} diff --git a/src/lib.rs b/src/lib.rs index 31a6b45..79cefe7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -158,6 +158,8 @@ pub use platform::{ SupportedInputConfigs, SupportedOutputConfigs, ALL_HOSTS, }; pub use samples_formats::{Sample, SampleFormat}; +use std::convert::TryInto; +use std::time::Duration; mod error; mod host; @@ -220,13 +222,63 @@ pub struct Data { sample_format: SampleFormat, } -/// Information relevant to a single call to the user's output stream data callback. -#[derive(Debug, Clone, PartialEq)] -pub struct OutputCallbackInfo {} +/// A monotonic time instance associated with a stream, retrieved from either: +/// +/// 1. A timestamp provided to the stream's underlying audio data callback or +/// 2. The same time source used to generate timestamps for a stream's underlying audio data +/// callback. +/// +/// **StreamInstant** represents a duration since some unspecified origin occurring either before +/// or equal to the moment the stream from which it was created begins. +/// +/// ## Host `StreamInstant` Sources +/// +/// | Host | Source | +/// | ---- | ------ | +/// | alsa | `snd_pcm_status_get_htstamp` | +/// | coreaudio | `mach_absolute_time` | +/// | wasapi | `QueryPerformanceCounter` | +/// | asio | `timeGetTime` | +/// | emscripten | `AudioContext.getOutputTimestamp` | +#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] +pub struct StreamInstant { + secs: i64, + nanos: u32, +} + +/// A timestamp associated with a call to an input stream's data callback. +#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] +pub struct InputStreamTimestamp { + /// The instant the stream's data callback was invoked. + pub callback: StreamInstant, + /// The instant that data was captured from the device. + /// + /// E.g. The instant data was read from an ADC. + pub capture: StreamInstant, +} + +/// A timestamp associated with a call to an output stream's data callback. +#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] +pub struct OutputStreamTimestamp { + /// The instant the stream's data callback was invoked. + pub callback: StreamInstant, + /// The predicted instant that data written will be delivered to the device for playback. + /// + /// E.g. The instant data will be played by a DAC. + pub playback: StreamInstant, +} /// Information relevant to a single call to the user's input stream data callback. #[derive(Debug, Clone, PartialEq)] -pub struct InputCallbackInfo {} +pub struct InputCallbackInfo { + timestamp: InputStreamTimestamp, +} + +/// Information relevant to a single call to the user's output stream data callback. +#[derive(Debug, Clone, PartialEq)] +pub struct OutputCallbackInfo { + timestamp: OutputStreamTimestamp, +} impl SupportedStreamConfig { pub fn channels(&self) -> ChannelCount { @@ -249,6 +301,82 @@ impl SupportedStreamConfig { } } +impl StreamInstant { + /// The amount of time elapsed from another instant to this one. + /// + /// Returns `None` if `earlier` is later than self. + pub fn duration_since(&self, earlier: &Self) -> Option { + if self < earlier { + None + } else { + (self.as_nanos() - earlier.as_nanos()) + .try_into() + .ok() + .map(Duration::from_nanos) + } + } + + /// Returns the instant in time after the given duration has passed. + /// + /// Returns `None` if the resulting instant would exceed the bounds of the underlying data + /// structure. + pub fn add(&self, duration: Duration) -> Option { + self.as_nanos() + .checked_add(duration.as_nanos() as i128) + .and_then(Self::from_nanos_i128) + } + + /// Returns the instant in time one `duration` ago. + /// + /// Returns `None` if the resulting instant would underflow. As a result, it is important to + /// consider that on some platforms the `StreamInstant` may begin at `0` from the moment the + /// source stream is created. + pub fn sub(&self, duration: Duration) -> Option { + self.as_nanos() + .checked_sub(duration.as_nanos() as i128) + .and_then(Self::from_nanos_i128) + } + + fn as_nanos(&self) -> i128 { + (self.secs as i128 * 1_000_000_000) + self.nanos as i128 + } + + fn from_nanos(nanos: i64) -> Self { + let secs = nanos / 1_000_000_000; + let subsec_nanos = nanos - secs * 1_000_000_000; + Self::new(secs as i64, subsec_nanos as u32) + } + + fn from_nanos_i128(nanos: i128) -> Option { + let secs = nanos / 1_000_000_000; + if secs > std::i64::MAX as i128 || secs < std::i64::MIN as i128 { + None + } else { + let subsec_nanos = nanos - secs * 1_000_000_000; + debug_assert!(subsec_nanos < std::u32::MAX as i128); + Some(Self::new(secs as i64, subsec_nanos as u32)) + } + } + + fn new(secs: i64, nanos: u32) -> Self { + StreamInstant { secs, nanos } + } +} + +impl InputCallbackInfo { + /// The timestamp associated with the call to an input stream's data callback. + pub fn timestamp(&self) -> InputStreamTimestamp { + self.timestamp + } +} + +impl OutputCallbackInfo { + /// The timestamp associated with the call to an output stream's data callback. + pub fn timestamp(&self) -> OutputStreamTimestamp { + self.timestamp + } +} + impl Data { // Internal constructor for host implementations to use. // @@ -491,3 +619,37 @@ const COMMON_SAMPLE_RATES: &'static [SampleRate] = &[ SampleRate(176400), SampleRate(192000), ]; + +#[test] +fn test_stream_instant() { + let a = StreamInstant::new(2, 0); + let b = StreamInstant::new(-2, 0); + let min = StreamInstant::new(std::i64::MIN, 0); + let max = StreamInstant::new(std::i64::MAX, 0); + assert_eq!( + a.sub(Duration::from_secs(1)), + Some(StreamInstant::new(1, 0)) + ); + assert_eq!( + a.sub(Duration::from_secs(2)), + Some(StreamInstant::new(0, 0)) + ); + assert_eq!( + a.sub(Duration::from_secs(3)), + Some(StreamInstant::new(-1, 0)) + ); + assert_eq!(min.sub(Duration::from_secs(1)), None); + assert_eq!( + b.add(Duration::from_secs(1)), + Some(StreamInstant::new(-1, 0)) + ); + assert_eq!( + b.add(Duration::from_secs(2)), + Some(StreamInstant::new(0, 0)) + ); + assert_eq!( + b.add(Duration::from_secs(3)), + Some(StreamInstant::new(1, 0)) + ); + assert_eq!(max.add(Duration::from_secs(1)), None); +}