From b00630bdcc9c675a2fd403dbe69b8da7bbed63f4 Mon Sep 17 00:00:00 2001 From: Viktor Lazarev Date: Wed, 28 Aug 2019 15:39:50 +0200 Subject: [PATCH] Extract "run_inner" as a standalone function --- src/host/wasapi/stream.rs | 354 +++++++++++++++++++------------------- 1 file changed, 177 insertions(+), 177 deletions(-) diff --git a/src/host/wasapi/stream.rs b/src/host/wasapi/stream.rs index 2599fa8..a4cc166 100644 --- a/src/host/wasapi/stream.rs +++ b/src/host/wasapi/stream.rs @@ -458,183 +458,6 @@ impl EventLoop { self.run_inner(&mut callback); } - fn run_inner(&self, callback: &mut dyn FnMut(StreamId, StreamData)) -> ! { - unsafe { - // We keep `run_context` locked forever, which guarantees that two invocations of - // `run()` cannot run simultaneously. - let mut run_context = self.run_context.lock().unwrap(); - // Force a deref so that borrow checker can operate on each field independently. - // Shadow the name because we don't use (or drop) it otherwise. - let run_context = &mut *run_context; - - // Keep track of the set of streams that should be removed due to some error occurring. - // - // Checked at the start of each loop. - let mut streams_to_remove: Vec<(StreamId, StreamError)> = vec![]; - - 'stream_loop: loop { - // Remove any failed streams. - for (stream_id, err) in streams_to_remove.drain(..) { - match run_context.stream.iter().position(|s| s.id == stream_id) { - None => continue, - Some(p) => { - run_context.handles.remove(p + 1); - run_context.stream.remove(p); - callback(stream_id, Err(err.into())); - }, - } - } - - // Process queued commands. - process_commands(run_context, callback); - - // Wait for any of the handles to be signalled. - let handle_idx = match wait_for_handle_signal(&run_context.handles) { - Ok(idx) => idx, - Err(err) => { - for stream in &run_context.stream { - callback(stream.id.clone(), Err(err.clone().into())); - } - run_context.stream.clear(); - run_context.handles.truncate(1); - break 'stream_loop; - } - }; - - // If `handle_idx` is 0, then it's `pending_scheduled_event` that was signalled in - // order for us to pick up the pending commands. Otherwise, a stream needs data. - if handle_idx == 0 { - continue; - } - - let stream_idx = handle_idx - 1; - let stream = &mut run_context.stream[stream_idx]; - - let sample_size = stream.sample_format.sample_size(); - - // Obtaining a pointer to the buffer. - match stream.client_flow { - - AudioClientFlow::Capture { capture_client } => { - let mut frames_available = 0; - // Get the available data in the shared buffer. - let mut buffer: *mut BYTE = mem::uninitialized(); - let mut flags = mem::uninitialized(); - loop { - let hresult = (*capture_client).GetNextPacketSize(&mut frames_available); - if let Err(err) = stream_error_from_hresult(hresult) { - streams_to_remove.push((stream.id.clone(), err)); - break; // Identical to continuing the outer loop - } - if frames_available == 0 { - break; - } - let hresult = (*capture_client).GetBuffer( - &mut buffer, - &mut frames_available, - &mut flags, - ptr::null_mut(), - ptr::null_mut(), - ); - - // TODO: Can this happen? - if hresult == AUDCLNT_S_BUFFER_EMPTY { - continue; - } else if let Err(err) = stream_error_from_hresult(hresult) { - streams_to_remove.push((stream.id.clone(), err)); - break; // Identical to continuing the outer loop - } - - debug_assert!(!buffer.is_null()); - - let buffer_len = frames_available as usize - * stream.bytes_per_frame as usize / sample_size; - - // Simplify the capture callback sample format branches. - macro_rules! capture_callback { - ($T:ty, $Variant:ident) => {{ - let buffer_data = buffer as *mut _ as *const $T; - let slice = slice::from_raw_parts(buffer_data, buffer_len); - let unknown_buffer = UnknownTypeInputBuffer::$Variant(::InputBuffer { - buffer: slice, - }); - let data = StreamData::Input { buffer: unknown_buffer }; - callback(stream.id.clone(), Ok(data)); - // Release the buffer. - let hresult = (*capture_client).ReleaseBuffer(frames_available); - if let Err(err) = stream_error_from_hresult(hresult) { - streams_to_remove.push((stream.id.clone(), err)); - continue; - } - }}; - } - - match stream.sample_format { - SampleFormat::F32 => capture_callback!(f32, F32), - SampleFormat::I16 => capture_callback!(i16, I16), - SampleFormat::U16 => capture_callback!(u16, U16), - } - } - }, - - AudioClientFlow::Render { render_client } => { - // The number of frames available for writing. - let frames_available = match get_available_frames(stream) { - Ok(0) => continue, // TODO: Can this happen? - Ok(n) => n, - Err(err) => { - streams_to_remove.push((stream.id.clone(), err)); - continue; - } - }; - - let mut buffer: *mut BYTE = mem::uninitialized(); - let hresult = (*render_client).GetBuffer( - frames_available, - &mut buffer as *mut *mut _, - ); - - if let Err(err) = stream_error_from_hresult(hresult) { - streams_to_remove.push((stream.id.clone(), err)); - continue; - } - - debug_assert!(!buffer.is_null()); - let buffer_len = frames_available as usize - * stream.bytes_per_frame as usize / sample_size; - - // Simplify the render callback sample format branches. - macro_rules! render_callback { - ($T:ty, $Variant:ident) => {{ - let buffer_data = buffer as *mut $T; - let slice = slice::from_raw_parts_mut(buffer_data, buffer_len); - let unknown_buffer = UnknownTypeOutputBuffer::$Variant(::OutputBuffer { - buffer: slice - }); - let data = StreamData::Output { buffer: unknown_buffer }; - callback(stream.id.clone(), Ok(data)); - let hresult = (*render_client) - .ReleaseBuffer(frames_available as u32, 0); - if let Err(err) = stream_error_from_hresult(hresult) { - streams_to_remove.push((stream.id.clone(), err)); - continue; - } - }} - } - - match stream.sample_format { - SampleFormat::F32 => render_callback!(f32, F32), - SampleFormat::I16 => render_callback!(i16, I16), - SampleFormat::U16 => render_callback!(u16, U16), - } - }, - } - } - } - - panic!("`cpal::EventLoop::run` API currently disallows returning"); - } - #[inline] pub(crate) fn play_stream(&self, stream: StreamId) -> Result<(), PlayStreamError> { self.push_command(Command::PlayStream(stream)); @@ -876,3 +699,180 @@ fn stream_error_from_hresult(hresult: winnt::HRESULT) -> Result<(), StreamError> } Ok(()) } + +fn run_inner(run_context: RunContext, data_callback: &mut dyn FnMut(StreamData), error_callback: &mut dyn FnMut(StreamError)) -> () { + unsafe { + // We keep `run_context` locked forever, which guarantees that two invocations of + // `run()` cannot run simultaneously. + let mut run_context = self.run_context.lock().unwrap(); + // Force a deref so that borrow checker can operate on each field independently. + // Shadow the name because we don't use (or drop) it otherwise. + let run_context = &mut *run_context; + + // Keep track of the set of streams that should be removed due to some error occurring. + // + // Checked at the start of each loop. + let mut streams_to_remove: Vec<(StreamId, StreamError)> = vec![]; + + 'stream_loop: loop { + // Remove any failed streams. + for (stream_id, err) in streams_to_remove.drain(..) { + match run_context.stream.iter().position(|s| s.id == stream_id) { + None => continue, + Some(p) => { + run_context.handles.remove(p + 1); + run_context.stream.remove(p); + callback(stream_id, Err(err.into())); + }, + } + } + + // Process queued commands. + process_commands(run_context, callback); + + // Wait for any of the handles to be signalled. + let handle_idx = match wait_for_handle_signal(&run_context.handles) { + Ok(idx) => idx, + Err(err) => { + for stream in &run_context.stream { + callback(stream.id.clone(), Err(err.clone().into())); + } + run_context.stream.clear(); + run_context.handles.truncate(1); + break 'stream_loop; + } + }; + + // If `handle_idx` is 0, then it's `pending_scheduled_event` that was signalled in + // order for us to pick up the pending commands. Otherwise, a stream needs data. + if handle_idx == 0 { + continue; + } + + let stream_idx = handle_idx - 1; + let stream = &mut run_context.stream[stream_idx]; + + let sample_size = stream.sample_format.sample_size(); + + // Obtaining a pointer to the buffer. + match stream.client_flow { + + AudioClientFlow::Capture { capture_client } => { + let mut frames_available = 0; + // Get the available data in the shared buffer. + let mut buffer: *mut BYTE = mem::uninitialized(); + let mut flags = mem::uninitialized(); + loop { + let hresult = (*capture_client).GetNextPacketSize(&mut frames_available); + if let Err(err) = stream_error_from_hresult(hresult) { + streams_to_remove.push((stream.id.clone(), err)); + break; // Identical to continuing the outer loop + } + if frames_available == 0 { + break; + } + let hresult = (*capture_client).GetBuffer( + &mut buffer, + &mut frames_available, + &mut flags, + ptr::null_mut(), + ptr::null_mut(), + ); + + // TODO: Can this happen? + if hresult == AUDCLNT_S_BUFFER_EMPTY { + continue; + } else if let Err(err) = stream_error_from_hresult(hresult) { + streams_to_remove.push((stream.id.clone(), err)); + break; // Identical to continuing the outer loop + } + + debug_assert!(!buffer.is_null()); + + let buffer_len = frames_available as usize + * stream.bytes_per_frame as usize / sample_size; + + // Simplify the capture callback sample format branches. + macro_rules! capture_callback { + ($T:ty, $Variant:ident) => {{ + let buffer_data = buffer as *mut _ as *const $T; + let slice = slice::from_raw_parts(buffer_data, buffer_len); + let unknown_buffer = UnknownTypeInputBuffer::$Variant(::InputBuffer { + buffer: slice, + }); + let data = StreamData::Input { buffer: unknown_buffer }; + callback(stream.id.clone(), Ok(data)); + // Release the buffer. + let hresult = (*capture_client).ReleaseBuffer(frames_available); + if let Err(err) = stream_error_from_hresult(hresult) { + streams_to_remove.push((stream.id.clone(), err)); + continue; + } + }}; + } + + match stream.sample_format { + SampleFormat::F32 => capture_callback!(f32, F32), + SampleFormat::I16 => capture_callback!(i16, I16), + SampleFormat::U16 => capture_callback!(u16, U16), + } + } + }, + + AudioClientFlow::Render { render_client } => { + // The number of frames available for writing. + let frames_available = match get_available_frames(stream) { + Ok(0) => continue, // TODO: Can this happen? + Ok(n) => n, + Err(err) => { + streams_to_remove.push((stream.id.clone(), err)); + continue; + } + }; + + let mut buffer: *mut BYTE = mem::uninitialized(); + let hresult = (*render_client).GetBuffer( + frames_available, + &mut buffer as *mut *mut _, + ); + + if let Err(err) = stream_error_from_hresult(hresult) { + streams_to_remove.push((stream.id.clone(), err)); + continue; + } + + debug_assert!(!buffer.is_null()); + let buffer_len = frames_available as usize + * stream.bytes_per_frame as usize / sample_size; + + // Simplify the render callback sample format branches. + macro_rules! render_callback { + ($T:ty, $Variant:ident) => {{ + let buffer_data = buffer as *mut $T; + let slice = slice::from_raw_parts_mut(buffer_data, buffer_len); + let unknown_buffer = UnknownTypeOutputBuffer::$Variant(::OutputBuffer { + buffer: slice + }); + let data = StreamData::Output { buffer: unknown_buffer }; + callback(stream.id.clone(), Ok(data)); + let hresult = (*render_client) + .ReleaseBuffer(frames_available as u32, 0); + if let Err(err) = stream_error_from_hresult(hresult) { + streams_to_remove.push((stream.id.clone(), err)); + continue; + } + }} + } + + match stream.sample_format { + SampleFormat::F32 => render_callback!(f32, F32), + SampleFormat::I16 => render_callback!(i16, I16), + SampleFormat::U16 => render_callback!(u16, U16), + } + }, + } + } + } + + panic!("`cpal::EventLoop::run` API currently disallows returning"); +}