Use channels instead of mutex

This commit is contained in:
Tatsuyuki Ishi 2019-06-13 15:31:37 +09:00
parent 755eefedff
commit 1d3754fa7d
2 changed files with 77 additions and 79 deletions

View File

@ -17,6 +17,7 @@ use UnknownTypeOutputBuffer;
use std::{cmp, ffi, mem, ptr};
use std::sync::Mutex;
use std::sync::mpsc::{channel, Sender, Receiver};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::vec::IntoIter as VecIntoIter;
@ -325,8 +326,7 @@ pub struct EventLoop {
run_context: Mutex<RunContext>,
// Commands processed by the `run()` method that is currently running.
// TODO: use a lock-free container
commands: Mutex<Vec<Command>>,
commands: Sender<Command>,
}
unsafe impl Send for EventLoop {
@ -347,6 +347,8 @@ struct RunContext {
descriptors: Vec<libc::pollfd>,
// List of streams that are written in `descriptors`.
streams: Vec<StreamInner>,
commands: Receiver<Command>,
}
struct StreamInner {
@ -404,16 +406,19 @@ impl EventLoop {
},
];
let (tx, rx) = channel();
let run_context = Mutex::new(RunContext {
descriptors: initial_descriptors,
streams: Vec::new(),
commands: rx,
});
EventLoop {
next_stream_id: AtomicUsize::new(0),
pending_trigger: pending_trigger,
run_context,
commands: Mutex::new(Vec::new()),
commands: tx,
}
}
@ -431,56 +436,53 @@ impl EventLoop {
loop {
{
let mut commands_lock = self.commands.lock().unwrap();
if !commands_lock.is_empty() {
for command in commands_lock.drain(..) {
match command {
Command::DestroyStream(stream_id) => {
run_context.streams.retain(|s| s.id != stream_id);
},
Command::PlayStream(stream_id) => {
if let Some(stream) = run_context.streams.iter_mut()
.find(|stream| stream.can_pause && stream.id == stream_id)
{
alsa::snd_pcm_pause(stream.channel, 0);
stream.is_paused = false;
}
},
Command::PauseStream(stream_id) => {
if let Some(stream) = run_context.streams.iter_mut()
.find(|stream| stream.can_pause && stream.id == stream_id)
{
alsa::snd_pcm_pause(stream.channel, 1);
stream.is_paused = true;
}
},
Command::NewStream(stream_inner) => {
run_context.streams.push(stream_inner);
},
}
}
run_context.descriptors = vec![
libc::pollfd {
fd: self.pending_trigger.read_fd(),
events: libc::POLLIN,
revents: 0,
for command in run_context.commands.try_iter() {
match command {
Command::DestroyStream(stream_id) => {
run_context.streams.retain(|s| s.id != stream_id);
},
Command::PlayStream(stream_id) => {
if let Some(stream) = run_context.streams.iter_mut()
.find(|stream| stream.can_pause && stream.id == stream_id)
{
alsa::snd_pcm_pause(stream.channel, 0);
stream.is_paused = false;
}
},
Command::PauseStream(stream_id) => {
if let Some(stream) = run_context.streams.iter_mut()
.find(|stream| stream.can_pause && stream.id == stream_id)
{
alsa::snd_pcm_pause(stream.channel, 1);
stream.is_paused = true;
}
},
Command::NewStream(stream_inner) => {
run_context.streams.push(stream_inner);
},
];
for stream in run_context.streams.iter() {
run_context.descriptors.reserve(stream.num_descriptors);
let len = run_context.descriptors.len();
let filled = alsa::snd_pcm_poll_descriptors(stream.channel,
run_context
.descriptors
.as_mut_ptr()
.offset(len as isize),
stream.num_descriptors as
libc::c_uint);
debug_assert_eq!(filled, stream.num_descriptors as libc::c_int);
run_context.descriptors.set_len(len + stream.num_descriptors);
}
}
run_context.descriptors = vec![
libc::pollfd {
fd: self.pending_trigger.read_fd(),
events: libc::POLLIN,
revents: 0,
},
];
for stream in run_context.streams.iter() {
run_context.descriptors.reserve(stream.num_descriptors);
let len = run_context.descriptors.len();
let filled = alsa::snd_pcm_poll_descriptors(stream.channel,
run_context
.descriptors
.as_mut_ptr()
.offset(len as isize),
stream.num_descriptors as
libc::c_uint);
debug_assert_eq!(filled, stream.num_descriptors as libc::c_int);
run_context.descriptors.set_len(len + stream.num_descriptors);
}
}
let ret = libc::poll(run_context.descriptors.as_mut_ptr(),
@ -765,7 +767,8 @@ impl EventLoop {
#[inline]
fn push_command(&self, command: Command) {
self.commands.lock().unwrap().push(command);
// Safe to unwrap: sender outlives receiver.
self.commands.send(command).unwrap();
self.pending_trigger.wakeup();
}

View File

@ -16,6 +16,7 @@ use std::mem;
use std::ptr;
use std::slice;
use std::sync::Mutex;
use std::sync::mpsc::{channel, Sender, Receiver};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
@ -40,8 +41,7 @@ pub struct EventLoop {
// Commands processed by the `run()` method that is currently running.
// `pending_scheduled_event` must be signalled whenever a command is added here, so that it
// will get picked up.
// TODO: use a lock-free container
commands: Mutex<Vec<Command>>,
commands: Sender<Command>,
// This event is signalled after a new entry is added to `commands`, so that the `run()`
// method can be notified.
@ -55,6 +55,8 @@ struct RunContext {
// Handles corresponding to the `event` field of each element of `voices`. Must always be in
// sync with `voices`, except that the first element is always `pending_scheduled_event`.
handles: Vec<winnt::HANDLE>,
commands: Receiver<Command>,
}
enum Command {
@ -94,14 +96,17 @@ impl EventLoop {
let pending_scheduled_event =
unsafe { synchapi::CreateEventA(ptr::null_mut(), 0, 0, ptr::null()) };
let (tx, rx) = channel();
EventLoop {
pending_scheduled_event: pending_scheduled_event,
run_context: Mutex::new(RunContext {
streams: Vec::new(),
handles: vec![pending_scheduled_event],
commands: rx,
}),
next_stream_id: AtomicUsize::new(0),
commands: Mutex::new(Vec::new()),
commands: tx,
}
}
@ -245,10 +250,7 @@ impl EventLoop {
sample_format: format.data_type,
};
self.commands.lock().unwrap().push(Command::NewStream(inner));
let result = synchapi::SetEvent(self.pending_scheduled_event);
assert!(result != 0);
self.push_command(Command::NewStream(inner));
};
Ok(new_stream_id)
@ -393,10 +395,7 @@ impl EventLoop {
sample_format: format.data_type,
};
self.commands.lock().unwrap().push(Command::NewStream(inner));
let result = synchapi::SetEvent(self.pending_scheduled_event);
assert!(result != 0);
self.push_command(Command::NewStream(inner));
};
Ok(new_stream_id)
@ -405,14 +404,7 @@ impl EventLoop {
#[inline]
pub fn destroy_stream(&self, stream_id: StreamId) {
unsafe {
self.commands
.lock()
.unwrap()
.push(Command::DestroyStream(stream_id));
let result = synchapi::SetEvent(self.pending_scheduled_event);
assert!(result != 0);
}
self.push_command(Command::DestroyStream(stream_id));
}
#[inline]
@ -427,11 +419,13 @@ impl EventLoop {
// We keep `run_context` locked forever, which guarantees that two invocations of
// `run()` cannot run simultaneously.
let mut run_context = self.run_context.lock().unwrap();
// Force a deref so that borrow checker can operate on each field independently.
// Shadow the name because we don't use (or drop) it otherwise.
let run_context = &mut *run_context;
loop {
// Process the pending commands.
let mut commands_lock = self.commands.lock().unwrap();
for command in commands_lock.drain(..) {
for command in run_context.commands.try_iter() {
match command {
Command::NewStream(stream_inner) => {
let event = stream_inner.event;
@ -473,7 +467,6 @@ impl EventLoop {
},
}
}
drop(commands_lock);
// Wait for any of the handles to be signalled, which means that the corresponding
// sound needs a buffer.
@ -618,17 +611,19 @@ impl EventLoop {
#[inline]
pub fn play_stream(&self, stream: StreamId) {
unsafe {
self.commands.lock().unwrap().push(Command::PlayStream(stream));
let result = synchapi::SetEvent(self.pending_scheduled_event);
assert!(result != 0);
}
self.push_command(Command::PlayStream(stream));
}
#[inline]
pub fn pause_stream(&self, stream: StreamId) {
self.push_command(Command::PauseStream(stream));
}
#[inline]
fn push_command(&self, command: Command) {
// Safe to unwrap: sender outlives receiver.
self.commands.send(command).unwrap();
unsafe {
self.commands.lock().unwrap().push(Command::PauseStream(stream));
let result = synchapi::SetEvent(self.pending_scheduled_event);
assert!(result != 0);
}