diff --git a/src/host/wasapi/stream.rs b/src/host/wasapi/stream.rs index a4cc166..fc8902b 100644 --- a/src/host/wasapi/stream.rs +++ b/src/host/wasapi/stream.rs @@ -702,43 +702,21 @@ fn stream_error_from_hresult(hresult: winnt::HRESULT) -> Result<(), StreamError> fn run_inner(run_context: RunContext, data_callback: &mut dyn FnMut(StreamData), error_callback: &mut dyn FnMut(StreamError)) -> () { unsafe { - // We keep `run_context` locked forever, which guarantees that two invocations of - // `run()` cannot run simultaneously. - let mut run_context = self.run_context.lock().unwrap(); - // Force a deref so that borrow checker can operate on each field independently. - // Shadow the name because we don't use (or drop) it otherwise. - let run_context = &mut *run_context; - - // Keep track of the set of streams that should be removed due to some error occurring. - // - // Checked at the start of each loop. - let mut streams_to_remove: Vec<(StreamId, StreamError)> = vec![]; - 'stream_loop: loop { - // Remove any failed streams. - for (stream_id, err) in streams_to_remove.drain(..) { - match run_context.stream.iter().position(|s| s.id == stream_id) { - None => continue, - Some(p) => { - run_context.handles.remove(p + 1); - run_context.stream.remove(p); - callback(stream_id, Err(err.into())); - }, - } - } - // Process queued commands. - process_commands(run_context, callback); + match process_commands(run_context, error_callback) { + Ok(()) => (), + Err(err) => { + error_callback(err); + break 'stream_loop; + } + }; // Wait for any of the handles to be signalled. let handle_idx = match wait_for_handle_signal(&run_context.handles) { Ok(idx) => idx, Err(err) => { - for stream in &run_context.stream { - callback(stream.id.clone(), Err(err.clone().into())); - } - run_context.stream.clear(); - run_context.handles.truncate(1); + error_callback(err); break 'stream_loop; } }; @@ -749,9 +727,7 @@ fn run_inner(run_context: RunContext, data_callback: &mut dyn FnMut(StreamData), continue; } - let stream_idx = handle_idx - 1; - let stream = &mut run_context.stream[stream_idx]; - + let stream = run_context.stream; let sample_size = stream.sample_format.sample_size(); // Obtaining a pointer to the buffer. @@ -765,8 +741,8 @@ fn run_inner(run_context: RunContext, data_callback: &mut dyn FnMut(StreamData), loop { let hresult = (*capture_client).GetNextPacketSize(&mut frames_available); if let Err(err) = stream_error_from_hresult(hresult) { - streams_to_remove.push((stream.id.clone(), err)); - break; // Identical to continuing the outer loop + error_callback(err); + break 'stream_loop; } if frames_available == 0 { break; @@ -783,8 +759,8 @@ fn run_inner(run_context: RunContext, data_callback: &mut dyn FnMut(StreamData), if hresult == AUDCLNT_S_BUFFER_EMPTY { continue; } else if let Err(err) = stream_error_from_hresult(hresult) { - streams_to_remove.push((stream.id.clone(), err)); - break; // Identical to continuing the outer loop + error_callback(err); + break 'stream_loop; } debug_assert!(!buffer.is_null()); @@ -801,12 +777,12 @@ fn run_inner(run_context: RunContext, data_callback: &mut dyn FnMut(StreamData), buffer: slice, }); let data = StreamData::Input { buffer: unknown_buffer }; - callback(stream.id.clone(), Ok(data)); + data_callback(stream.id.clone(), Ok(data)); // Release the buffer. let hresult = (*capture_client).ReleaseBuffer(frames_available); if let Err(err) = stream_error_from_hresult(hresult) { - streams_to_remove.push((stream.id.clone(), err)); - continue; + error_callback(err); + break 'stream_loop; } }}; } @@ -825,8 +801,8 @@ fn run_inner(run_context: RunContext, data_callback: &mut dyn FnMut(StreamData), Ok(0) => continue, // TODO: Can this happen? Ok(n) => n, Err(err) => { - streams_to_remove.push((stream.id.clone(), err)); - continue; + error_callback(err); + break 'stream_loop; } }; @@ -837,8 +813,8 @@ fn run_inner(run_context: RunContext, data_callback: &mut dyn FnMut(StreamData), ); if let Err(err) = stream_error_from_hresult(hresult) { - streams_to_remove.push((stream.id.clone(), err)); - continue; + error_callback(err); + break 'stream_loop; } debug_assert!(!buffer.is_null()); @@ -854,12 +830,12 @@ fn run_inner(run_context: RunContext, data_callback: &mut dyn FnMut(StreamData), buffer: slice }); let data = StreamData::Output { buffer: unknown_buffer }; - callback(stream.id.clone(), Ok(data)); + data_callback(stream.id.clone(), Ok(data)); let hresult = (*render_client) .ReleaseBuffer(frames_available as u32, 0); if let Err(err) = stream_error_from_hresult(hresult) { - streams_to_remove.push((stream.id.clone(), err)); - continue; + error_callback(err); + break 'stream_loop; } }} } @@ -873,6 +849,4 @@ fn run_inner(run_context: RunContext, data_callback: &mut dyn FnMut(StreamData), } } } - - panic!("`cpal::EventLoop::run` API currently disallows returning"); }