diff --git a/av1an-core/src/broker.rs b/av1an-core/src/broker.rs index 7fc2eeca..cc916f94 100644 --- a/av1an-core/src/broker.rs +++ b/av1an-core/src/broker.rs @@ -96,7 +96,7 @@ impl Display for EncoderCrash { impl<'a> Broker<'a> { /// Main encoding loop. set_thread_affinity may be ignored if the value is invalid. - pub fn encoding_loop(self, tx: Sender<()>, mut set_thread_affinity: Option) { + pub fn encoding_loop(self, tx: Sender<()>, set_thread_affinity: Option) { assert!(!self.chunk_queue.is_empty()); if !self.chunk_queue.is_empty() { @@ -107,26 +107,6 @@ impl<'a> Broker<'a> { } drop(sender); - cfg_if! { - if #[cfg(any(target_os = "linux", target_os = "windows"))] { - if let Some(threads) = set_thread_affinity { - let available_threads = available_parallelism().expect("Unrecoverable: Failed to get thread count").get(); - let requested_threads = threads.saturating_mul(self.project.args.workers); - if requested_threads > available_threads { - warn!( - "ignoring set_thread_affinity: requested more threads than available ({}/{})", - requested_threads, available_threads - ); - set_thread_affinity = None; - } else if requested_threads == 0 { - warn!("ignoring set_thread_affinity: requested 0 threads"); - - set_thread_affinity = None; - } - } - } - } - crossbeam_utils::thread::scope(|s| { let consumers: Vec<_> = (0..self.project.args.workers) .map(|idx| (receiver.clone(), &self, idx)) @@ -136,13 +116,26 @@ impl<'a> Broker<'a> { cfg_if! { if #[cfg(any(target_os = "linux", target_os = "windows"))] { if let Some(threads) = set_thread_affinity { - let mut cpu_set = SmallVec::<[usize; 16]>::new(); - cpu_set.extend((threads * worker_id..).take(threads)); - if let Err(e) = affinity::set_thread_affinity(&cpu_set) { - warn!( - "failed to set thread affinity for worker {}: {}", - worker_id, e - ); + if threads == 0 { + warn!("Ignoring set_thread_affinity: Requested 0 threads"); + } else { + match available_parallelism() { + Ok(parallelism) => { + let available_threads = parallelism.get(); + let mut cpu_set = SmallVec::<[usize; 16]>::new(); + let start_thread = (threads * worker_id) % available_threads; + cpu_set.extend((start_thread..start_thread + threads).map(|t| t % available_threads)); + if let Err(e) = affinity::set_thread_affinity(&cpu_set) { + warn!( + "Failed to set thread affinity for worker {}: {}", + worker_id, e + ); + } + }, + Err(e) => { + warn!("Failed to get thread count: {}. Thread affinity will not be set", e); + } + } } } }