From 1d3754fa7d78364fe2acffd3f967fbd142905a01 Mon Sep 17 00:00:00 2001 From: Tatsuyuki Ishi Date: Thu, 13 Jun 2019 15:31:37 +0900 Subject: [PATCH] Use channels instead of mutex --- src/alsa/mod.rs | 105 ++++++++++++++++++++++--------------------- src/wasapi/stream.rs | 51 ++++++++++----------- 2 files changed, 77 insertions(+), 79 deletions(-) diff --git a/src/alsa/mod.rs b/src/alsa/mod.rs index 4d7ebfb..2131aaf 100644 --- a/src/alsa/mod.rs +++ b/src/alsa/mod.rs @@ -17,6 +17,7 @@ use UnknownTypeOutputBuffer; use std::{cmp, ffi, mem, ptr}; use std::sync::Mutex; +use std::sync::mpsc::{channel, Sender, Receiver}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::vec::IntoIter as VecIntoIter; @@ -325,8 +326,7 @@ pub struct EventLoop { run_context: Mutex, // Commands processed by the `run()` method that is currently running. - // TODO: use a lock-free container - commands: Mutex>, + commands: Sender, } unsafe impl Send for EventLoop { @@ -347,6 +347,8 @@ struct RunContext { descriptors: Vec, // List of streams that are written in `descriptors`. streams: Vec, + + commands: Receiver, } struct StreamInner { @@ -404,16 +406,19 @@ impl EventLoop { }, ]; + let (tx, rx) = channel(); + let run_context = Mutex::new(RunContext { descriptors: initial_descriptors, streams: Vec::new(), + commands: rx, }); EventLoop { next_stream_id: AtomicUsize::new(0), pending_trigger: pending_trigger, run_context, - commands: Mutex::new(Vec::new()), + commands: tx, } } @@ -431,56 +436,53 @@ impl EventLoop { loop { { - let mut commands_lock = self.commands.lock().unwrap(); - if !commands_lock.is_empty() { - for command in commands_lock.drain(..) { - match command { - Command::DestroyStream(stream_id) => { - run_context.streams.retain(|s| s.id != stream_id); - }, - Command::PlayStream(stream_id) => { - if let Some(stream) = run_context.streams.iter_mut() - .find(|stream| stream.can_pause && stream.id == stream_id) - { - alsa::snd_pcm_pause(stream.channel, 0); - stream.is_paused = false; - } - }, - Command::PauseStream(stream_id) => { - if let Some(stream) = run_context.streams.iter_mut() - .find(|stream| stream.can_pause && stream.id == stream_id) - { - alsa::snd_pcm_pause(stream.channel, 1); - stream.is_paused = true; - } - }, - Command::NewStream(stream_inner) => { - run_context.streams.push(stream_inner); - }, - } - } - - run_context.descriptors = vec![ - libc::pollfd { - fd: self.pending_trigger.read_fd(), - events: libc::POLLIN, - revents: 0, + for command in run_context.commands.try_iter() { + match command { + Command::DestroyStream(stream_id) => { + run_context.streams.retain(|s| s.id != stream_id); + }, + Command::PlayStream(stream_id) => { + if let Some(stream) = run_context.streams.iter_mut() + .find(|stream| stream.can_pause && stream.id == stream_id) + { + alsa::snd_pcm_pause(stream.channel, 0); + stream.is_paused = false; + } + }, + Command::PauseStream(stream_id) => { + if let Some(stream) = run_context.streams.iter_mut() + .find(|stream| stream.can_pause && stream.id == stream_id) + { + alsa::snd_pcm_pause(stream.channel, 1); + stream.is_paused = true; + } + }, + Command::NewStream(stream_inner) => { + run_context.streams.push(stream_inner); }, - ]; - for stream in run_context.streams.iter() { - run_context.descriptors.reserve(stream.num_descriptors); - let len = run_context.descriptors.len(); - let filled = alsa::snd_pcm_poll_descriptors(stream.channel, - run_context - .descriptors - .as_mut_ptr() - .offset(len as isize), - stream.num_descriptors as - libc::c_uint); - debug_assert_eq!(filled, stream.num_descriptors as libc::c_int); - run_context.descriptors.set_len(len + stream.num_descriptors); } } + + run_context.descriptors = vec![ + libc::pollfd { + fd: self.pending_trigger.read_fd(), + events: libc::POLLIN, + revents: 0, + }, + ]; + for stream in run_context.streams.iter() { + run_context.descriptors.reserve(stream.num_descriptors); + let len = run_context.descriptors.len(); + let filled = alsa::snd_pcm_poll_descriptors(stream.channel, + run_context + .descriptors + .as_mut_ptr() + .offset(len as isize), + stream.num_descriptors as + libc::c_uint); + debug_assert_eq!(filled, stream.num_descriptors as libc::c_int); + run_context.descriptors.set_len(len + stream.num_descriptors); + } } let ret = libc::poll(run_context.descriptors.as_mut_ptr(), @@ -765,7 +767,8 @@ impl EventLoop { #[inline] fn push_command(&self, command: Command) { - self.commands.lock().unwrap().push(command); + // Safe to unwrap: sender outlives receiver. + self.commands.send(command).unwrap(); self.pending_trigger.wakeup(); } diff --git a/src/wasapi/stream.rs b/src/wasapi/stream.rs index 0442008..084c10a 100644 --- a/src/wasapi/stream.rs +++ b/src/wasapi/stream.rs @@ -16,6 +16,7 @@ use std::mem; use std::ptr; use std::slice; use std::sync::Mutex; +use std::sync::mpsc::{channel, Sender, Receiver}; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; @@ -40,8 +41,7 @@ pub struct EventLoop { // Commands processed by the `run()` method that is currently running. // `pending_scheduled_event` must be signalled whenever a command is added here, so that it // will get picked up. - // TODO: use a lock-free container - commands: Mutex>, + commands: Sender, // This event is signalled after a new entry is added to `commands`, so that the `run()` // method can be notified. @@ -55,6 +55,8 @@ struct RunContext { // Handles corresponding to the `event` field of each element of `voices`. Must always be in // sync with `voices`, except that the first element is always `pending_scheduled_event`. handles: Vec, + + commands: Receiver, } enum Command { @@ -94,14 +96,17 @@ impl EventLoop { let pending_scheduled_event = unsafe { synchapi::CreateEventA(ptr::null_mut(), 0, 0, ptr::null()) }; + let (tx, rx) = channel(); + EventLoop { pending_scheduled_event: pending_scheduled_event, run_context: Mutex::new(RunContext { streams: Vec::new(), handles: vec![pending_scheduled_event], + commands: rx, }), next_stream_id: AtomicUsize::new(0), - commands: Mutex::new(Vec::new()), + commands: tx, } } @@ -245,10 +250,7 @@ impl EventLoop { sample_format: format.data_type, }; - self.commands.lock().unwrap().push(Command::NewStream(inner)); - - let result = synchapi::SetEvent(self.pending_scheduled_event); - assert!(result != 0); + self.push_command(Command::NewStream(inner)); }; Ok(new_stream_id) @@ -393,10 +395,7 @@ impl EventLoop { sample_format: format.data_type, }; - self.commands.lock().unwrap().push(Command::NewStream(inner)); - - let result = synchapi::SetEvent(self.pending_scheduled_event); - assert!(result != 0); + self.push_command(Command::NewStream(inner)); }; Ok(new_stream_id) @@ -405,14 +404,7 @@ impl EventLoop { #[inline] pub fn destroy_stream(&self, stream_id: StreamId) { - unsafe { - self.commands - .lock() - .unwrap() - .push(Command::DestroyStream(stream_id)); - let result = synchapi::SetEvent(self.pending_scheduled_event); - assert!(result != 0); - } + self.push_command(Command::DestroyStream(stream_id)); } #[inline] @@ -427,11 +419,13 @@ impl EventLoop { // We keep `run_context` locked forever, which guarantees that two invocations of // `run()` cannot run simultaneously. let mut run_context = self.run_context.lock().unwrap(); + // Force a deref so that borrow checker can operate on each field independently. + // Shadow the name because we don't use (or drop) it otherwise. + let run_context = &mut *run_context; loop { // Process the pending commands. - let mut commands_lock = self.commands.lock().unwrap(); - for command in commands_lock.drain(..) { + for command in run_context.commands.try_iter() { match command { Command::NewStream(stream_inner) => { let event = stream_inner.event; @@ -473,7 +467,6 @@ impl EventLoop { }, } } - drop(commands_lock); // Wait for any of the handles to be signalled, which means that the corresponding // sound needs a buffer. @@ -618,17 +611,19 @@ impl EventLoop { #[inline] pub fn play_stream(&self, stream: StreamId) { - unsafe { - self.commands.lock().unwrap().push(Command::PlayStream(stream)); - let result = synchapi::SetEvent(self.pending_scheduled_event); - assert!(result != 0); - } + self.push_command(Command::PlayStream(stream)); } #[inline] pub fn pause_stream(&self, stream: StreamId) { + self.push_command(Command::PauseStream(stream)); + } + + #[inline] + fn push_command(&self, command: Command) { + // Safe to unwrap: sender outlives receiver. + self.commands.send(command).unwrap(); unsafe { - self.commands.lock().unwrap().push(Command::PauseStream(stream)); let result = synchapi::SetEvent(self.pending_scheduled_event); assert!(result != 0); }