Keep a single stream only

This commit is contained in:
Viktor Lazarev 2019-08-28 15:22:53 +02:00 committed by mitchmindtree
parent cffd2da582
commit 463540f370
1 changed files with 22 additions and 20 deletions

View File

@ -21,6 +21,8 @@ use std::sync::mpsc::{channel, Sender, Receiver};
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::{Arc};
use BackendSpecificError; use BackendSpecificError;
use BuildStreamError; use BuildStreamError;
use Format; use Format;
@ -55,7 +57,7 @@ pub struct EventLoop {
struct RunContext { struct RunContext {
// Streams that have been created in this event loop. // Streams that have been created in this event loop.
streams: Vec<StreamInner>, stream: Arc<StreamInner>,
// Handles corresponding to the `event` field of each element of `voices`. Must always be in // Handles corresponding to the `event` field of each element of `voices`. Must always be in
// sync with `voices`, except that the first element is always `pending_scheduled_event`. // sync with `voices`, except that the first element is always `pending_scheduled_event`.
@ -106,7 +108,7 @@ impl EventLoop {
EventLoop { EventLoop {
pending_scheduled_event: pending_scheduled_event, pending_scheduled_event: pending_scheduled_event,
run_context: Mutex::new(RunContext { run_context: Mutex::new(RunContext {
streams: Vec::new(), stream: Arc::new(),
handles: vec![pending_scheduled_event], handles: vec![pending_scheduled_event],
commands: rx, commands: rx,
}), }),
@ -473,11 +475,11 @@ impl EventLoop {
'stream_loop: loop { 'stream_loop: loop {
// Remove any failed streams. // Remove any failed streams.
for (stream_id, err) in streams_to_remove.drain(..) { for (stream_id, err) in streams_to_remove.drain(..) {
match run_context.streams.iter().position(|s| s.id == stream_id) { match run_context.stream.iter().position(|s| s.id == stream_id) {
None => continue, None => continue,
Some(p) => { Some(p) => {
run_context.handles.remove(p + 1); run_context.handles.remove(p + 1);
run_context.streams.remove(p); run_context.stream.remove(p);
callback(stream_id, Err(err.into())); callback(stream_id, Err(err.into()));
}, },
} }
@ -490,10 +492,10 @@ impl EventLoop {
let handle_idx = match wait_for_handle_signal(&run_context.handles) { let handle_idx = match wait_for_handle_signal(&run_context.handles) {
Ok(idx) => idx, Ok(idx) => idx,
Err(err) => { Err(err) => {
for stream in &run_context.streams { for stream in &run_context.stream {
callback(stream.id.clone(), Err(err.clone().into())); callback(stream.id.clone(), Err(err.clone().into()));
} }
run_context.streams.clear(); run_context.stream.clear();
run_context.handles.truncate(1); run_context.handles.truncate(1);
break 'stream_loop; break 'stream_loop;
} }
@ -506,7 +508,7 @@ impl EventLoop {
} }
let stream_idx = handle_idx - 1; let stream_idx = handle_idx - 1;
let stream = &mut run_context.streams[stream_idx]; let stream = &mut run_context.stream[stream_idx];
let sample_size = stream.sample_format.sample_size(); let sample_size = stream.sample_format.sample_size();
@ -759,34 +761,34 @@ fn process_commands(
match command { match command {
Command::NewStream(stream_inner) => { Command::NewStream(stream_inner) => {
let event = stream_inner.event; let event = stream_inner.event;
run_context.streams.push(stream_inner); run_context.stream.push(stream_inner);
run_context.handles.push(event); run_context.handles.push(event);
}, },
Command::DestroyStream(stream_id) => { Command::DestroyStream(stream_id) => {
match run_context.streams.iter().position(|s| s.id == stream_id) { match run_context.stream.iter().position(|s| s.id == stream_id) {
None => continue, None => continue,
Some(p) => { Some(p) => {
run_context.handles.remove(p + 1); run_context.handles.remove(p + 1);
run_context.streams.remove(p); run_context.stream.remove(p);
}, },
} }
}, },
Command::PlayStream(stream_id) => { Command::PlayStream(stream_id) => {
match run_context.streams.iter().position(|s| s.id == stream_id) { match run_context.stream.iter().position(|s| s.id == stream_id) {
None => continue, None => continue,
Some(p) => { Some(p) => {
if !run_context.streams[p].playing { if !run_context.stream[p].playing {
let hresult = unsafe { let hresult = unsafe {
(*run_context.streams[p].audio_client).Start() (*run_context.stream[p].audio_client).Start()
}; };
match stream_error_from_hresult(hresult) { match stream_error_from_hresult(hresult) {
Ok(()) => { Ok(()) => {
run_context.streams[p].playing = true; run_context.stream[p].playing = true;
} }
Err(err) => { Err(err) => {
callback(stream_id, Err(err.into())); callback(stream_id, Err(err.into()));
run_context.handles.remove(p + 1); run_context.handles.remove(p + 1);
run_context.streams.remove(p); run_context.stream.remove(p);
} }
} }
} }
@ -794,21 +796,21 @@ fn process_commands(
} }
}, },
Command::PauseStream(stream_id) => { Command::PauseStream(stream_id) => {
match run_context.streams.iter().position(|s| s.id == stream_id) { match run_context.stream.iter().position(|s| s.id == stream_id) {
None => continue, None => continue,
Some(p) => { Some(p) => {
if run_context.streams[p].playing { if run_context.stream[p].playing {
let hresult = unsafe { let hresult = unsafe {
(*run_context.streams[p].audio_client).Stop() (*run_context.stream[p].audio_client).Stop()
}; };
match stream_error_from_hresult(hresult) { match stream_error_from_hresult(hresult) {
Ok(()) => { Ok(()) => {
run_context.streams[p].playing = false; run_context.stream[p].playing = false;
} }
Err(err) => { Err(err) => {
callback(stream_id, Err(err.into())); callback(stream_id, Err(err.into()));
run_context.handles.remove(p + 1); run_context.handles.remove(p + 1);
run_context.streams.remove(p); run_context.stream.remove(p);
} }
} }
} }