Merge pull request #283 from ishitatsuyuki/no-mutex

Use channels instead of mutex
This commit is contained in:
mitchmindtree 2019-06-13 13:03:38 +02:00 committed by GitHub
commit e6f61b3a82
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 77 additions and 79 deletions

View File

@ -17,6 +17,7 @@ use UnknownTypeOutputBuffer;
use std::{cmp, ffi, mem, ptr}; use std::{cmp, ffi, mem, ptr};
use std::sync::Mutex; use std::sync::Mutex;
use std::sync::mpsc::{channel, Sender, Receiver};
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::vec::IntoIter as VecIntoIter; use std::vec::IntoIter as VecIntoIter;
@ -325,8 +326,7 @@ pub struct EventLoop {
run_context: Mutex<RunContext>, run_context: Mutex<RunContext>,
// Commands processed by the `run()` method that is currently running. // Commands processed by the `run()` method that is currently running.
// TODO: use a lock-free container commands: Sender<Command>,
commands: Mutex<Vec<Command>>,
} }
unsafe impl Send for EventLoop { unsafe impl Send for EventLoop {
@ -347,6 +347,8 @@ struct RunContext {
descriptors: Vec<libc::pollfd>, descriptors: Vec<libc::pollfd>,
// List of streams that are written in `descriptors`. // List of streams that are written in `descriptors`.
streams: Vec<StreamInner>, streams: Vec<StreamInner>,
commands: Receiver<Command>,
} }
struct StreamInner { struct StreamInner {
@ -404,16 +406,19 @@ impl EventLoop {
}, },
]; ];
let (tx, rx) = channel();
let run_context = Mutex::new(RunContext { let run_context = Mutex::new(RunContext {
descriptors: initial_descriptors, descriptors: initial_descriptors,
streams: Vec::new(), streams: Vec::new(),
commands: rx,
}); });
EventLoop { EventLoop {
next_stream_id: AtomicUsize::new(0), next_stream_id: AtomicUsize::new(0),
pending_trigger: pending_trigger, pending_trigger: pending_trigger,
run_context, run_context,
commands: Mutex::new(Vec::new()), commands: tx,
} }
} }
@ -431,56 +436,53 @@ impl EventLoop {
loop { loop {
{ {
let mut commands_lock = self.commands.lock().unwrap(); for command in run_context.commands.try_iter() {
if !commands_lock.is_empty() { match command {
for command in commands_lock.drain(..) { Command::DestroyStream(stream_id) => {
match command { run_context.streams.retain(|s| s.id != stream_id);
Command::DestroyStream(stream_id) => { },
run_context.streams.retain(|s| s.id != stream_id); Command::PlayStream(stream_id) => {
}, if let Some(stream) = run_context.streams.iter_mut()
Command::PlayStream(stream_id) => { .find(|stream| stream.can_pause && stream.id == stream_id)
if let Some(stream) = run_context.streams.iter_mut() {
.find(|stream| stream.can_pause && stream.id == stream_id) alsa::snd_pcm_pause(stream.channel, 0);
{ stream.is_paused = false;
alsa::snd_pcm_pause(stream.channel, 0); }
stream.is_paused = false; },
} Command::PauseStream(stream_id) => {
}, if let Some(stream) = run_context.streams.iter_mut()
Command::PauseStream(stream_id) => { .find(|stream| stream.can_pause && stream.id == stream_id)
if let Some(stream) = run_context.streams.iter_mut() {
.find(|stream| stream.can_pause && stream.id == stream_id) alsa::snd_pcm_pause(stream.channel, 1);
{ stream.is_paused = true;
alsa::snd_pcm_pause(stream.channel, 1); }
stream.is_paused = true; },
} Command::NewStream(stream_inner) => {
}, run_context.streams.push(stream_inner);
Command::NewStream(stream_inner) => {
run_context.streams.push(stream_inner);
},
}
}
run_context.descriptors = vec![
libc::pollfd {
fd: self.pending_trigger.read_fd(),
events: libc::POLLIN,
revents: 0,
}, },
];
for stream in run_context.streams.iter() {
run_context.descriptors.reserve(stream.num_descriptors);
let len = run_context.descriptors.len();
let filled = alsa::snd_pcm_poll_descriptors(stream.channel,
run_context
.descriptors
.as_mut_ptr()
.offset(len as isize),
stream.num_descriptors as
libc::c_uint);
debug_assert_eq!(filled, stream.num_descriptors as libc::c_int);
run_context.descriptors.set_len(len + stream.num_descriptors);
} }
} }
run_context.descriptors = vec![
libc::pollfd {
fd: self.pending_trigger.read_fd(),
events: libc::POLLIN,
revents: 0,
},
];
for stream in run_context.streams.iter() {
run_context.descriptors.reserve(stream.num_descriptors);
let len = run_context.descriptors.len();
let filled = alsa::snd_pcm_poll_descriptors(stream.channel,
run_context
.descriptors
.as_mut_ptr()
.offset(len as isize),
stream.num_descriptors as
libc::c_uint);
debug_assert_eq!(filled, stream.num_descriptors as libc::c_int);
run_context.descriptors.set_len(len + stream.num_descriptors);
}
} }
let ret = libc::poll(run_context.descriptors.as_mut_ptr(), let ret = libc::poll(run_context.descriptors.as_mut_ptr(),
@ -765,7 +767,8 @@ impl EventLoop {
#[inline] #[inline]
fn push_command(&self, command: Command) { fn push_command(&self, command: Command) {
self.commands.lock().unwrap().push(command); // Safe to unwrap: sender outlives receiver.
self.commands.send(command).unwrap();
self.pending_trigger.wakeup(); self.pending_trigger.wakeup();
} }

View File

@ -16,6 +16,7 @@ use std::mem;
use std::ptr; use std::ptr;
use std::slice; use std::slice;
use std::sync::Mutex; use std::sync::Mutex;
use std::sync::mpsc::{channel, Sender, Receiver};
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
@ -40,8 +41,7 @@ pub struct EventLoop {
// Commands processed by the `run()` method that is currently running. // Commands processed by the `run()` method that is currently running.
// `pending_scheduled_event` must be signalled whenever a command is added here, so that it // `pending_scheduled_event` must be signalled whenever a command is added here, so that it
// will get picked up. // will get picked up.
// TODO: use a lock-free container commands: Sender<Command>,
commands: Mutex<Vec<Command>>,
// This event is signalled after a new entry is added to `commands`, so that the `run()` // This event is signalled after a new entry is added to `commands`, so that the `run()`
// method can be notified. // method can be notified.
@ -55,6 +55,8 @@ struct RunContext {
// Handles corresponding to the `event` field of each element of `voices`. Must always be in // Handles corresponding to the `event` field of each element of `voices`. Must always be in
// sync with `voices`, except that the first element is always `pending_scheduled_event`. // sync with `voices`, except that the first element is always `pending_scheduled_event`.
handles: Vec<winnt::HANDLE>, handles: Vec<winnt::HANDLE>,
commands: Receiver<Command>,
} }
enum Command { enum Command {
@ -94,14 +96,17 @@ impl EventLoop {
let pending_scheduled_event = let pending_scheduled_event =
unsafe { synchapi::CreateEventA(ptr::null_mut(), 0, 0, ptr::null()) }; unsafe { synchapi::CreateEventA(ptr::null_mut(), 0, 0, ptr::null()) };
let (tx, rx) = channel();
EventLoop { EventLoop {
pending_scheduled_event: pending_scheduled_event, pending_scheduled_event: pending_scheduled_event,
run_context: Mutex::new(RunContext { run_context: Mutex::new(RunContext {
streams: Vec::new(), streams: Vec::new(),
handles: vec![pending_scheduled_event], handles: vec![pending_scheduled_event],
commands: rx,
}), }),
next_stream_id: AtomicUsize::new(0), next_stream_id: AtomicUsize::new(0),
commands: Mutex::new(Vec::new()), commands: tx,
} }
} }
@ -245,10 +250,7 @@ impl EventLoop {
sample_format: format.data_type, sample_format: format.data_type,
}; };
self.commands.lock().unwrap().push(Command::NewStream(inner)); self.push_command(Command::NewStream(inner));
let result = synchapi::SetEvent(self.pending_scheduled_event);
assert!(result != 0);
}; };
Ok(new_stream_id) Ok(new_stream_id)
@ -393,10 +395,7 @@ impl EventLoop {
sample_format: format.data_type, sample_format: format.data_type,
}; };
self.commands.lock().unwrap().push(Command::NewStream(inner)); self.push_command(Command::NewStream(inner));
let result = synchapi::SetEvent(self.pending_scheduled_event);
assert!(result != 0);
}; };
Ok(new_stream_id) Ok(new_stream_id)
@ -405,14 +404,7 @@ impl EventLoop {
#[inline] #[inline]
pub fn destroy_stream(&self, stream_id: StreamId) { pub fn destroy_stream(&self, stream_id: StreamId) {
unsafe { self.push_command(Command::DestroyStream(stream_id));
self.commands
.lock()
.unwrap()
.push(Command::DestroyStream(stream_id));
let result = synchapi::SetEvent(self.pending_scheduled_event);
assert!(result != 0);
}
} }
#[inline] #[inline]
@ -427,11 +419,13 @@ impl EventLoop {
// We keep `run_context` locked forever, which guarantees that two invocations of // We keep `run_context` locked forever, which guarantees that two invocations of
// `run()` cannot run simultaneously. // `run()` cannot run simultaneously.
let mut run_context = self.run_context.lock().unwrap(); let mut run_context = self.run_context.lock().unwrap();
// Force a deref so that borrow checker can operate on each field independently.
// Shadow the name because we don't use (or drop) it otherwise.
let run_context = &mut *run_context;
loop { loop {
// Process the pending commands. // Process the pending commands.
let mut commands_lock = self.commands.lock().unwrap(); for command in run_context.commands.try_iter() {
for command in commands_lock.drain(..) {
match command { match command {
Command::NewStream(stream_inner) => { Command::NewStream(stream_inner) => {
let event = stream_inner.event; let event = stream_inner.event;
@ -473,7 +467,6 @@ impl EventLoop {
}, },
} }
} }
drop(commands_lock);
// Wait for any of the handles to be signalled, which means that the corresponding // Wait for any of the handles to be signalled, which means that the corresponding
// sound needs a buffer. // sound needs a buffer.
@ -618,17 +611,19 @@ impl EventLoop {
#[inline] #[inline]
pub fn play_stream(&self, stream: StreamId) { pub fn play_stream(&self, stream: StreamId) {
unsafe { self.push_command(Command::PlayStream(stream));
self.commands.lock().unwrap().push(Command::PlayStream(stream));
let result = synchapi::SetEvent(self.pending_scheduled_event);
assert!(result != 0);
}
} }
#[inline] #[inline]
pub fn pause_stream(&self, stream: StreamId) { pub fn pause_stream(&self, stream: StreamId) {
self.push_command(Command::PauseStream(stream));
}
#[inline]
fn push_command(&self, command: Command) {
// Safe to unwrap: sender outlives receiver.
self.commands.send(command).unwrap();
unsafe { unsafe {
self.commands.lock().unwrap().push(Command::PauseStream(stream));
let result = synchapi::SetEvent(self.pending_scheduled_event); let result = synchapi::SetEvent(self.pending_scheduled_event);
assert!(result != 0); assert!(result != 0);
} }