Skip to content

Commit

Permalink
Allow more than 1 worker per thread pool set by set-thread-affinity
Browse files Browse the repository at this point in the history
  • Loading branch information
damster101 authored and shssoichiro committed Sep 25, 2024
1 parent ae7c151 commit 46cd62c
Showing 1 changed file with 21 additions and 28 deletions.
49 changes: 21 additions & 28 deletions av1an-core/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl Display for EncoderCrash {

impl<'a> Broker<'a> {
/// Main encoding loop. set_thread_affinity may be ignored if the value is invalid.
pub fn encoding_loop(self, tx: Sender<()>, mut set_thread_affinity: Option<usize>) {
pub fn encoding_loop(self, tx: Sender<()>, set_thread_affinity: Option<usize>) {
assert!(!self.chunk_queue.is_empty());

if !self.chunk_queue.is_empty() {
Expand All @@ -107,26 +107,6 @@ impl<'a> Broker<'a> {
}
drop(sender);

cfg_if! {
if #[cfg(any(target_os = "linux", target_os = "windows"))] {
if let Some(threads) = set_thread_affinity {
let available_threads = available_parallelism().expect("Unrecoverable: Failed to get thread count").get();
let requested_threads = threads.saturating_mul(self.project.args.workers);
if requested_threads > available_threads {
warn!(
"ignoring set_thread_affinity: requested more threads than available ({}/{})",
requested_threads, available_threads
);
set_thread_affinity = None;
} else if requested_threads == 0 {
warn!("ignoring set_thread_affinity: requested 0 threads");

set_thread_affinity = None;
}
}
}
}

crossbeam_utils::thread::scope(|s| {
let consumers: Vec<_> = (0..self.project.args.workers)
.map(|idx| (receiver.clone(), &self, idx))
Expand All @@ -136,13 +116,26 @@ impl<'a> Broker<'a> {
cfg_if! {
if #[cfg(any(target_os = "linux", target_os = "windows"))] {
if let Some(threads) = set_thread_affinity {
let mut cpu_set = SmallVec::<[usize; 16]>::new();
cpu_set.extend((threads * worker_id..).take(threads));
if let Err(e) = affinity::set_thread_affinity(&cpu_set) {
warn!(
"failed to set thread affinity for worker {}: {}",
worker_id, e
);
if threads == 0 {
warn!("Ignoring set_thread_affinity: Requested 0 threads");
} else {
match available_parallelism() {
Ok(parallelism) => {
let available_threads = parallelism.get();
let mut cpu_set = SmallVec::<[usize; 16]>::new();
let start_thread = (threads * worker_id) % available_threads;
cpu_set.extend((start_thread..start_thread + threads).map(|t| t % available_threads));
if let Err(e) = affinity::set_thread_affinity(&cpu_set) {
warn!(
"Failed to set thread affinity for worker {}: {}",
worker_id, e
);
}
},
Err(e) => {
warn!("Failed to get thread count: {}. Thread affinity will not be set", e);
}
}
}
}
}
Expand Down

0 comments on commit 46cd62c

Please sign in to comment.