From 4d3fe57fe384e53971fa40ee939080347172a236 Mon Sep 17 00:00:00 2001 From: Tatsuyuki Ishi Date: Tue, 30 Apr 2019 15:43:47 +0900 Subject: [PATCH] Improved buffer management - ALSA backend: reuse the buffers - Make `InputBuffer` and `OutputBuffer` types just a wrapper of slice * Buffer is now submitted at the end of callback --- src/alsa/mod.rs | 198 +++++++++++++++++++----------------------- src/coreaudio/mod.rs | 45 +--------- src/emscripten/mod.rs | 120 +++++++++---------------- src/lib.rs | 44 +++------- src/null/mod.rs | 29 +------ src/wasapi/mod.rs | 2 +- src/wasapi/stream.rs | 84 ++++-------------- 7 files changed, 156 insertions(+), 366 deletions(-) diff --git a/src/alsa/mod.rs b/src/alsa/mod.rs index a541c79..7dfb467 100644 --- a/src/alsa/mod.rs +++ b/src/alsa/mod.rs @@ -15,7 +15,7 @@ use SupportedFormat; use UnknownTypeInputBuffer; use UnknownTypeOutputBuffer; -use std::{cmp, ffi, iter, mem, ptr}; +use std::{cmp, ffi, mem, ptr}; use std::sync::Mutex; use std::sync::atomic::{AtomicUsize, Ordering}; use std::vec::IntoIter as VecIntoIter; @@ -370,6 +370,11 @@ struct StreamInner { // A file descriptor opened with `eventfd`. // It is used to wait for resume signal. resume_trigger: Trigger, + + // Lazily allocated buffer that is reused inside the loop. + // Zero-allocate a new buffer (the fastest way to have zeroed memory) at the first time this is + // used. + buffer: Option>, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -543,67 +548,78 @@ impl EventLoop { let available_frames = available_samples / stream_inner.num_channels as usize; + let buffer_size = stream_inner.sample_format.sample_size() * available_samples; + // Could be written with a match with improved borrow checking + if stream_inner.buffer.is_none() { + stream_inner.buffer = Some(vec![0u8; buffer_size]); + } else { + stream_inner.buffer.as_mut().unwrap().resize(buffer_size, 0u8); + } + let buffer = stream_inner.buffer.as_mut().unwrap(); + match stream_type { StreamType::Input => { - // Simplify shared logic across the sample format branches. - macro_rules! read_buffer { - ($T:ty, $Variant:ident) => {{ - // The buffer to read into. - let mut buffer: Vec<$T> = vec![Default::default(); available_samples]; - let err = alsa::snd_pcm_readi( - stream_inner.channel, - buffer.as_mut_ptr() as *mut _, - available_frames as alsa::snd_pcm_uframes_t, - ); - check_errors(err as _).expect("snd_pcm_readi error"); - let input_buffer = InputBuffer { - buffer: &buffer, - }; - let buffer = UnknownTypeInputBuffer::$Variant(::InputBuffer { - buffer: Some(input_buffer), - }); - let stream_data = StreamData::Input { buffer: buffer }; - callback(stream_id, stream_data); - }}; - } + let err = alsa::snd_pcm_readi( + stream_inner.channel, + buffer.as_mut_ptr() as *mut _, + available_frames as alsa::snd_pcm_uframes_t, + ); + check_errors(err as _).expect("snd_pcm_readi error"); - match stream_inner.sample_format { - SampleFormat::I16 => read_buffer!(i16, I16), - SampleFormat::U16 => read_buffer!(u16, U16), - SampleFormat::F32 => read_buffer!(f32, F32), - } + let input_buffer = match stream_inner.sample_format { + SampleFormat::I16 => UnknownTypeInputBuffer::I16(::InputBuffer { + buffer: cast_input_buffer(buffer), + }), + SampleFormat::U16 => UnknownTypeInputBuffer::U16(::InputBuffer { + buffer: cast_input_buffer(buffer), + }), + SampleFormat::F32 => UnknownTypeInputBuffer::F32(::InputBuffer { + buffer: cast_input_buffer(buffer), + }), + }; + let stream_data = StreamData::Input { + buffer: input_buffer, + }; + callback(stream_id, stream_data); }, StreamType::Output => { - // We're now sure that we're ready to write data. - let buffer = match stream_inner.sample_format { - SampleFormat::I16 => { - let buffer = OutputBuffer { - stream_inner: stream_inner, - buffer: vec![Default::default(); available_samples], - }; + { + // We're now sure that we're ready to write data. + let output_buffer = match stream_inner.sample_format { + SampleFormat::I16 => UnknownTypeOutputBuffer::I16(::OutputBuffer { + buffer: cast_output_buffer(buffer), + }), + SampleFormat::U16 => UnknownTypeOutputBuffer::U16(::OutputBuffer { + buffer: cast_output_buffer(buffer), + }), + SampleFormat::F32 => UnknownTypeOutputBuffer::F32(::OutputBuffer { + buffer: cast_output_buffer(buffer), + }), + }; - UnknownTypeOutputBuffer::I16(::OutputBuffer { target: Some(buffer) }) - }, - SampleFormat::U16 => { - let buffer = OutputBuffer { - stream_inner: stream_inner, - buffer: vec![Default::default(); available_samples], - }; + let stream_data = StreamData::Output { + buffer: output_buffer, + }; + callback(stream_id, stream_data); + } + loop { + let result = alsa::snd_pcm_writei( + stream_inner.channel, + buffer.as_ptr() as *const _, + available_frames as alsa::snd_pcm_uframes_t, + ); - UnknownTypeOutputBuffer::U16(::OutputBuffer { target: Some(buffer) }) - }, - SampleFormat::F32 => { - let buffer = OutputBuffer { - stream_inner: stream_inner, - buffer: vec![Default::default(); available_samples] - }; - - UnknownTypeOutputBuffer::F32(::OutputBuffer { target: Some(buffer) }) - }, - }; - - let stream_data = StreamData::Output { buffer: buffer }; - callback(stream_id, stream_data); + if result == -32 { + // buffer underrun + alsa::snd_pcm_prepare(stream_inner.channel); + } else if result < 0 { + check_errors(result as libc::c_int) + .expect("could not write pcm"); + } else { + assert_eq!(result as usize, available_frames); + break; + } + } }, } } @@ -661,6 +677,7 @@ impl EventLoop { can_pause: can_pause, is_paused: false, resume_trigger: Trigger::new(), + buffer: None, }; check_errors(alsa::snd_pcm_start(capture_handle)) @@ -721,6 +738,7 @@ impl EventLoop { can_pause: can_pause, is_paused: false, resume_trigger: Trigger::new(), + buffer: None, }; self.push_command(Command::NewStream(stream_inner)); @@ -834,15 +852,6 @@ unsafe fn set_sw_params_from_format( (buffer_len, period_len) } -pub struct InputBuffer<'a, T: 'a> { - buffer: &'a [T], -} - -pub struct OutputBuffer<'a, T: 'a> { - stream_inner: &'a mut StreamInner, - buffer: Vec, -} - /// Wrapper around `hw_params`. struct HwParams(*mut alsa::snd_pcm_hw_params_t); @@ -874,53 +883,6 @@ impl Drop for StreamInner { } } -impl<'a, T> InputBuffer<'a, T> { - #[inline] - pub fn buffer(&self) -> &[T] { - &self.buffer - } - - #[inline] - pub fn finish(self) { - // Nothing to be done. - } -} - -impl<'a, T> OutputBuffer<'a, T> { - #[inline] - pub fn buffer(&mut self) -> &mut [T] { - &mut self.buffer - } - - #[inline] - pub fn len(&self) -> usize { - self.buffer.len() - } - - pub fn finish(self) { - let to_write = (self.buffer.len() / self.stream_inner.num_channels as usize) as - alsa::snd_pcm_uframes_t; - - unsafe { - loop { - let result = alsa::snd_pcm_writei(self.stream_inner.channel, - self.buffer.as_ptr() as *const _, - to_write); - - if result == -32 { - // buffer underrun - alsa::snd_pcm_prepare(self.stream_inner.channel); - } else if result < 0 { - check_errors(result as libc::c_int).expect("could not write pcm"); - } else { - assert_eq!(result as alsa::snd_pcm_uframes_t, to_write); - break; - } - } - } - } -} - #[inline] fn check_errors(err: libc::c_int) -> Result<(), String> { use std::ffi; @@ -937,3 +899,17 @@ fn check_errors(err: libc::c_int) -> Result<(), String> { Ok(()) } + +/// Cast a byte slice into a (immutable) slice of desired type. +/// Safety: it's up to the caller to ensure that the input slice has valid bit representations. +unsafe fn cast_input_buffer(v: &[u8]) -> &[T] { + debug_assert!(v.len() % std::mem::size_of::() == 0); + std::slice::from_raw_parts(v.as_ptr() as *const T, v.len() / std::mem::size_of::()) +} + +/// Cast a byte slice into a mutable slice of desired type. +/// Safety: it's up to the caller to ensure that the input slice has valid bit representations. +unsafe fn cast_output_buffer(v: &mut [u8]) -> &mut [T] { + debug_assert!(v.len() % std::mem::size_of::() == 0); + std::slice::from_raw_parts_mut(v.as_mut_ptr() as *mut T, v.len() / std::mem::size_of::()) +} \ No newline at end of file diff --git a/src/coreaudio/mod.rs b/src/coreaudio/mod.rs index 11e2861..c363464 100644 --- a/src/coreaudio/mod.rs +++ b/src/coreaudio/mod.rs @@ -671,8 +671,7 @@ impl EventLoop { Some(cb) => cb, None => return Ok(()), }; - let buffer = InputBuffer { buffer: data_slice }; - let unknown_type_buffer = UnknownTypeInputBuffer::$SampleFormat(::InputBuffer { buffer: Some(buffer) }); + let unknown_type_buffer = UnknownTypeInputBuffer::$SampleFormat(::InputBuffer { buffer: data_slice }); let stream_data = StreamData::Input { buffer: unknown_type_buffer }; callback(StreamId(stream_id), stream_data); }}; @@ -748,8 +747,7 @@ impl EventLoop { return Ok(()); } }; - let buffer = OutputBuffer { buffer: data_slice }; - let unknown_type_buffer = UnknownTypeOutputBuffer::$SampleFormat(::OutputBuffer { target: Some(buffer) }); + let unknown_type_buffer = UnknownTypeOutputBuffer::$SampleFormat(::OutputBuffer { buffer: data_slice }); let stream_data = StreamData::Output { buffer: unknown_type_buffer }; callback(StreamId(stream_id), stream_data); }}; @@ -798,42 +796,3 @@ impl EventLoop { } } } - -pub struct InputBuffer<'a, T: 'a> { - buffer: &'a [T], -} - -pub struct OutputBuffer<'a, T: 'a> { - buffer: &'a mut [T], -} - -impl<'a, T> InputBuffer<'a, T> { - #[inline] - pub fn buffer(&self) -> &[T] { - &self.buffer - } - - #[inline] - pub fn finish(self) { - // Nothing to be done. - } -} - -impl<'a, T> OutputBuffer<'a, T> - where T: Sample -{ - #[inline] - pub fn buffer(&mut self) -> &mut [T] { - &mut self.buffer - } - - #[inline] - pub fn len(&self) -> usize { - self.buffer.len() - } - - #[inline] - pub fn finish(self) { - // Do nothing. We wrote directly to the buffer. - } -} diff --git a/src/emscripten/mod.rs b/src/emscripten/mod.rs index 9f435ea..b95a5df 100644 --- a/src/emscripten/mod.rs +++ b/src/emscripten/mod.rs @@ -12,7 +12,6 @@ use CreationError; use DefaultFormatError; use Format; use FormatsEnumerationError; -use Sample; use StreamData; use SupportedFormat; use UnknownTypeOutputBuffer; @@ -63,14 +62,47 @@ impl EventLoop { None => continue, }; - let buffer = OutputBuffer { - temporary_buffer: vec![0.0; 44100 * 2 / 3], - stream: &stream, + let mut temporary_buffer = vec![0.0; 44100 * 2 / 3]; + + { + let buffer = UnknownTypeOutputBuffer::F32(::OutputBuffer { buffer: &mut temporary_buffer }); + let data = StreamData::Output { buffer: buffer }; + user_cb(StreamId(stream_id), data); + // TODO: directly use a TypedArray once this is supported by stdweb + } + + let typed_array = { + let f32_slice = temporary_buffer.as_slice(); + let u8_slice: &[u8] = unsafe { + from_raw_parts(f32_slice.as_ptr() as *const _, + f32_slice.len() * mem::size_of::()) + }; + let typed_array: TypedArray = u8_slice.into(); + typed_array }; - let buffer = UnknownTypeOutputBuffer::F32(::OutputBuffer { target: Some(buffer) }); - let data = StreamData::Output { buffer: buffer }; - user_cb(StreamId(stream_id), data); + let num_channels = 2u32; // TODO: correct value + debug_assert_eq!(temporary_buffer.len() % num_channels as usize, 0); + + js!( + var src_buffer = new Float32Array(@{typed_array}.buffer); + var context = @{stream}; + var buf_len = @{temporary_buffer.len() as u32}; + var num_channels = @{num_channels}; + + var buffer = context.createBuffer(num_channels, buf_len / num_channels, 44100); + for (var channel = 0; channel < num_channels; ++channel) { + var buffer_content = buffer.getChannelData(channel); + for (var i = 0; i < buf_len / num_channels; ++i) { + buffer_content[i] = src_buffer[i * num_channels + channel]; + } + } + + var node = context.createBufferSource(); + node.buffer = buffer; + node.connect(context.destination); + node.start(); + ); } set_timeout(|| callback_fn::(user_data_ptr), 330); @@ -235,77 +267,3 @@ impl Device { pub type SupportedInputFormats = ::std::vec::IntoIter; pub type SupportedOutputFormats = ::std::vec::IntoIter; - -pub struct InputBuffer<'a, T: 'a> { - marker: ::std::marker::PhantomData<&'a T>, -} - -pub struct OutputBuffer<'a, T: 'a> - where T: Sample -{ - temporary_buffer: Vec, - stream: &'a Reference, -} - -impl<'a, T> InputBuffer<'a, T> { - #[inline] - pub fn buffer(&self) -> &[T] { - unimplemented!() - } - - #[inline] - pub fn finish(self) { - } -} - -impl<'a, T> OutputBuffer<'a, T> - where T: Sample -{ - #[inline] - pub fn buffer(&mut self) -> &mut [T] { - &mut self.temporary_buffer - } - - #[inline] - pub fn len(&self) -> usize { - self.temporary_buffer.len() - } - - #[inline] - pub fn finish(self) { - // TODO: directly use a TypedArray once this is supported by stdweb - - let typed_array = { - let t_slice: &[T] = self.temporary_buffer.as_slice(); - let u8_slice: &[u8] = unsafe { - from_raw_parts(t_slice.as_ptr() as *const _, - t_slice.len() * mem::size_of::()) - }; - let typed_array: TypedArray = u8_slice.into(); - typed_array - }; - - let num_channels = 2u32; // TODO: correct value - debug_assert_eq!(self.temporary_buffer.len() % num_channels as usize, 0); - - js!( - var src_buffer = new Float32Array(@{typed_array}.buffer); - var context = @{self.stream}; - var buf_len = @{self.temporary_buffer.len() as u32}; - var num_channels = @{num_channels}; - - var buffer = context.createBuffer(num_channels, buf_len / num_channels, 44100); - for (var channel = 0; channel < num_channels; ++channel) { - var buffer_content = buffer.getChannelData(channel); - for (var i = 0; i < buf_len / num_channels; ++i) { - buffer_content[i] = src_buffer[i * num_channels + channel]; - } - } - - var node = context.createBufferSource(); - node.buffer = buffer; - node.connect(context.destination); - node.start(); - ); - } -} diff --git a/src/lib.rs b/src/lib.rs index fcabc86..b38fc15 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -211,31 +211,27 @@ pub enum StreamData<'a> { /// This struct implements the `Deref` trait targeting `[T]`. Therefore this buffer can be read the /// same way as reading from a `Vec` or any other kind of Rust array. // TODO: explain audio stuff in general +// TODO: remove the wrapper and just use slices in next major version pub struct InputBuffer<'a, T: 'a> where T: Sample, { - // Always contains something, taken by `Drop` - // TODO: change that - buffer: Option>, + buffer: &'a [T], } -/// Represents a buffer that must be filled with audio data. -/// -/// You should destroy this object as soon as possible. Data is only sent to the audio device when -/// this object is destroyed. +/// Represents a buffer that must be filled with audio data. The buffer in unfilled state may +/// contain garbage values. /// /// This struct implements the `Deref` and `DerefMut` traits to `[T]`. Therefore writing to this /// buffer is done in the same way as writing to a `Vec` or any other kind of Rust array. // TODO: explain audio stuff in general +// TODO: remove the wrapper and just use slices #[must_use] pub struct OutputBuffer<'a, T: 'a> where T: Sample, { - // Always contains something, taken by `Drop` - // TODO: change that - target: Option>, + buffer: &'a mut [T], } /// This is the struct that is provided to you by cpal when you want to read samples from a buffer. @@ -586,16 +582,7 @@ impl<'a, T> Deref for InputBuffer<'a, T> #[inline] fn deref(&self) -> &[T] { - self.buffer.as_ref().unwrap().buffer() - } -} - -impl<'a, T> Drop for InputBuffer<'a, T> - where T: Sample -{ - #[inline] - fn drop(&mut self) { - self.buffer.take().unwrap().finish(); + self.buffer } } @@ -615,16 +602,7 @@ impl<'a, T> DerefMut for OutputBuffer<'a, T> { #[inline] fn deref_mut(&mut self) -> &mut [T] { - self.target.as_mut().unwrap().buffer() - } -} - -impl<'a, T> Drop for OutputBuffer<'a, T> - where T: Sample -{ - #[inline] - fn drop(&mut self) { - self.target.take().unwrap().finish(); + self.buffer } } @@ -645,9 +623,9 @@ impl<'a> UnknownTypeOutputBuffer<'a> { #[inline] pub fn len(&self) -> usize { match self { - &UnknownTypeOutputBuffer::U16(ref buf) => buf.target.as_ref().unwrap().len(), - &UnknownTypeOutputBuffer::I16(ref buf) => buf.target.as_ref().unwrap().len(), - &UnknownTypeOutputBuffer::F32(ref buf) => buf.target.as_ref().unwrap().len(), + &UnknownTypeOutputBuffer::U16(ref buf) => buf.len(), + &UnknownTypeOutputBuffer::I16(ref buf) => buf.len(), + &UnknownTypeOutputBuffer::F32(ref buf) => buf.len(), } } } diff --git a/src/null/mod.rs b/src/null/mod.rs index d74506c..244f086 100644 --- a/src/null/mod.rs +++ b/src/null/mod.rs @@ -132,31 +132,4 @@ pub struct InputBuffer<'a, T: 'a> { pub struct OutputBuffer<'a, T: 'a> { marker: PhantomData<&'a mut T>, -} - -impl<'a, T> InputBuffer<'a, T> { - #[inline] - pub fn buffer(&self) -> &[T] { - unimplemented!() - } - - #[inline] - pub fn finish(self) { - } -} - -impl<'a, T> OutputBuffer<'a, T> { - #[inline] - pub fn buffer(&mut self) -> &mut [T] { - unimplemented!() - } - - #[inline] - pub fn len(&self) -> usize { - 0 - } - - #[inline] - pub fn finish(self) { - } -} +} \ No newline at end of file diff --git a/src/wasapi/mod.rs b/src/wasapi/mod.rs index 016c787..82026e7 100644 --- a/src/wasapi/mod.rs +++ b/src/wasapi/mod.rs @@ -3,7 +3,7 @@ extern crate winapi; use std::io::Error as IoError; pub use self::device::{Device, Devices, SupportedInputFormats, SupportedOutputFormats, default_input_device, default_output_device}; -pub use self::stream::{InputBuffer, OutputBuffer, EventLoop, StreamId}; +pub use self::stream::{EventLoop, StreamId}; use self::winapi::um::winnt::HRESULT; mod com; diff --git a/src/wasapi/stream.rs b/src/wasapi/stream.rs index 49d154c..2d7f9cd 100644 --- a/src/wasapi/stream.rs +++ b/src/wasapi/stream.rs @@ -535,9 +535,8 @@ impl EventLoop { ($T:ty, $Variant:ident) => {{ let buffer_data = buffer as *mut _ as *const $T; let slice = slice::from_raw_parts(buffer_data, buffer_len); - let input_buffer = InputBuffer { buffer: slice }; let unknown_buffer = UnknownTypeInputBuffer::$Variant(::InputBuffer { - buffer: Some(input_buffer), + buffer: slice, }); let data = StreamData::Input { buffer: unknown_buffer }; callback(stream_id, data); @@ -576,19 +575,24 @@ impl EventLoop { macro_rules! render_callback { ($T:ty, $Variant:ident) => {{ let buffer_data = buffer as *mut $T; - let output_buffer = OutputBuffer { - stream: stream, - buffer_data: buffer_data, - buffer_len: buffer_len, - frames: frames_available, - marker: PhantomData, - }; + let slice = slice::from_raw_parts_mut(buffer_data, buffer_len); let unknown_buffer = UnknownTypeOutputBuffer::$Variant(::OutputBuffer { - target: Some(output_buffer) + buffer: slice }); let data = StreamData::Output { buffer: unknown_buffer }; callback(stream_id, data); - }}; + let hresult = match stream.client_flow { + AudioClientFlow::Render { render_client } => { + (*render_client).ReleaseBuffer(frames_available as u32, 0) + }, + _ => unreachable!(), + }; + match check_result(hresult) { + // Ignoring the error that is produced if the device has been disconnected. + Err(ref e) if e.raw_os_error() == Some(AUDCLNT_E_DEVICE_INVALIDATED) => (), + e => e.unwrap(), + }; + }} } match stream.sample_format { @@ -661,64 +665,6 @@ impl Drop for StreamInner { } } -pub struct InputBuffer<'a, T: 'a> { - buffer: &'a [T], -} - -pub struct OutputBuffer<'a, T: 'a> { - stream: &'a mut StreamInner, - - buffer_data: *mut T, - buffer_len: usize, - frames: UINT32, - - marker: PhantomData<&'a mut [T]>, -} - -unsafe impl<'a, T> Send for OutputBuffer<'a, T> { -} - -impl<'a, T> InputBuffer<'a, T> { - #[inline] - pub fn buffer(&self) -> &[T] { - &self.buffer - } - - #[inline] - pub fn finish(self) { - // Nothing to be done. - } -} - -impl<'a, T> OutputBuffer<'a, T> { - #[inline] - pub fn buffer(&mut self) -> &mut [T] { - unsafe { slice::from_raw_parts_mut(self.buffer_data, self.buffer_len) } - } - - #[inline] - pub fn len(&self) -> usize { - self.buffer_len - } - - #[inline] - pub fn finish(self) { - unsafe { - let hresult = match self.stream.client_flow { - AudioClientFlow::Render { render_client } => { - (*render_client).ReleaseBuffer(self.frames as u32, 0) - }, - _ => unreachable!(), - }; - match check_result(hresult) { - // Ignoring the error that is produced if the device has been disconnected. - Err(ref e) if e.raw_os_error() == Some(AUDCLNT_E_DEVICE_INVALIDATED) => (), - e => e.unwrap(), - }; - } - } -} - // Turns a `Format` into a `WAVEFORMATEXTENSIBLE`. // // Returns `None` if the WAVEFORMATEXTENSIBLE does not support the given format.