diff --git a/rs/execution_environment/src/scheduler.rs b/rs/execution_environment/src/scheduler.rs
index ace96c69a1d1..094c787fcf0e 100644
--- a/rs/execution_environment/src/scheduler.rs
+++ b/rs/execution_environment/src/scheduler.rs
@@ -367,16 +367,9 @@ impl SchedulerImpl {
         let active_cores = scheduler_cores.min(number_of_canisters);
         for (i, canister_id) in scheduling_order.take(active_cores).enumerate() {
             let canister_state = canister_states.get_mut(canister_id).unwrap();
-            // As top `scheduler_cores` canisters are guaranteed to be scheduled
-            // this round, their accumulated priorities must be decreased here
-            // by `capacity * multiplier / scheduler_cores`. But instead this
-            // value is accumulated in the `priority_credit`, and applied later:
-            // * For short executions, the `priority_credit` is deducted from
-            //   the `accumulated_priority` at the end of the round.
-            // * For long executions, the `priority_credit` is accumulated
-            //   for a few rounds, and deducted from the `accumulated_priority`
-            //   at the end of the long execution.
-            canister_state.scheduler_state.priority_credit +=
+            // Decrease the accumulated priorities of the top `scheduler_cores` canisters.
+            // This is required to respect the scheduler invariant after the round is finished.
+            canister_state.scheduler_state.accumulated_priority -=
                 (compute_capacity_percent * multiplier / active_cores as i64).into();
             if i < round_schedule.long_execution_cores {
                 canister_state.scheduler_state.long_execution_mode =
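For intuition about the invariant the new comment refers to: every percent of compute capacity credited to canisters during a round is charged back to the canisters that actually got a core, so accumulated priorities keep summing to zero. Below is a minimal sketch of that bookkeeping, assuming an evenly split credit (the real scheduler weights credits by compute allocation) and illustrative names; it is not the scheduler's code.

fn main() {
    let scheduler_cores: i64 = 2;
    let multiplier: i64 = 1;
    // As in the scheduler: 100% of compute capacity per core.
    let compute_capacity_percent: i64 = 100 * scheduler_cores;

    // Five canisters, all starting with zero accumulated priority.
    let mut accumulated_priority = vec![0_i64; 5];
    let n = accumulated_priority.len() as i64;

    // Credit phase: the round capacity is distributed across all canisters
    // (evenly here; by compute allocation in the real scheduler).
    for ap in accumulated_priority.iter_mut() {
        *ap += compute_capacity_percent * multiplier / n;
    }

    // Charge phase (this hunk): the top `scheduler_cores` canisters, which
    // are guaranteed a full round, immediately pay their share of capacity.
    let active_cores = (scheduler_cores as usize).min(accumulated_priority.len());
    for ap in accumulated_priority.iter_mut().take(active_cores) {
        *ap -= compute_capacity_percent * multiplier / active_cores as i64;
    }

    // Invariant: the priorities still sum to zero (exactly here; modulo
    // integer rounding in general).
    assert_eq!(accumulated_priority.iter().sum::<i64>(), 0);
}

The change replaces the deferred `priority_credit` path with a direct charge; the credit and charge totals balance either way, which is what the new proptest at the bottom of this diff checks.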
@@ -626,9 +619,9 @@ impl SchedulerImpl {
     #[allow(clippy::too_many_arguments, clippy::type_complexity)]
     fn inner_round<'a>(
         &'a self,
+        round_log: &ReplicaLogger,
         mut state: ReplicatedState,
         csprng: &mut Csprng,
-        round_schedule: &RoundSchedule,
         current_round: ExecutionRound,
         root_measurement_scope: &MeasurementScope<'a>,
         scheduler_round_limits: &mut SchedulerRoundLimits,
@@ -700,19 +693,32 @@
         // Update subnet available memory before taking out the canisters.
         round_limits.subnet_available_memory = self.exec_env.subnet_available_memory(&state);
-        let canisters = state.take_canister_states();
-        // Obtain the active canisters and update the collection of heap delta rate-limited canisters.
-        let (active_round_schedule, rate_limited_canister_ids) = round_schedule
-            .filter_canisters(
-                &canisters,
-                self.config.heap_delta_rate_limit,
-                self.rate_limiting_of_heap_delta,
+        let mut canisters = state.take_canister_states();
+
+        // Scheduling.
+        // TODO(EXC-1617): The scheduling will be optimized in follow-up PRs.
+        let (mut active_canisters_partitioned_by_cores, inactive_canisters) = {
+            let _timer = self.metrics.round_scheduling_duration.start_timer();
+            let round_schedule = self.apply_scheduling_strategy(
+                round_log,
+                self.config.scheduler_cores,
+                current_round,
+                self.config.accumulated_priority_reset_interval,
+                &mut canisters,
             );
-        round_filtered_canisters
-            .add_canisters(&active_round_schedule, &rate_limited_canister_ids);
+            // Obtain the active canisters and update the collection
+            // of heap delta rate-limited canisters.
+            let (active_round_schedule, rate_limited_canister_ids) = round_schedule
+                .filter_canisters(
+                    &canisters,
+                    self.config.heap_delta_rate_limit,
+                    self.rate_limiting_of_heap_delta,
+                );
+            round_filtered_canisters
+                .add_canisters(&active_round_schedule, &rate_limited_canister_ids);
 
-        let (mut active_canisters_partitioned_by_cores, inactive_canisters) =
-            active_round_schedule.partition_canisters_to_cores(canisters);
+            active_round_schedule.partition_canisters_to_cores(canisters)
+        };
 
         if is_first_iteration {
            for partition in active_canisters_partitioned_by_cores.iter_mut() {
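Structurally, scheduling moves from `execute_round` (see the block removed in the following hunk) into each `inner_round` iteration, so the schedule is recomputed after every inner round's charges land. A rough sketch of the resulting control flow, with hypothetical types and names standing in for the real ones:

struct State {
    messages_left: u32,
}

// Stand-in for `apply_scheduling_strategy`: orders canisters by accumulated
// priority and applies the credit/charge bookkeeping sketched above.
fn apply_scheduling_strategy(state: &State) -> Vec<usize> {
    let _ = state;
    Vec::new()
}

fn inner_round(state: &mut State) {
    // Previously computed once per round; now recomputed per inner round.
    let _schedule = apply_scheduling_strategy(state);
    // ... filter heap-delta rate-limited canisters, partition to cores,
    // execute one slice of messages ...
    state.messages_left = state.messages_left.saturating_sub(1);
}

fn execute_round(state: &mut State) {
    while state.messages_left > 0 {
        inner_round(state);
    }
}

fn main() {
    execute_round(&mut State { messages_left: 3 });
}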
@@ -1690,27 +1696,11 @@ impl Scheduler for SchedulerImpl {
             scheduler_round_limits.update_subnet_round_limits(&subnet_round_limits);
         };
 
-        // Scheduling.
-        let round_schedule = {
-            let _timer = self.metrics.round_scheduling_duration.start_timer();
-
-            let mut canisters = state.take_canister_states();
-            let round_schedule_candidate = self.apply_scheduling_strategy(
-                &round_log,
-                self.config.scheduler_cores,
-                current_round,
-                self.config.accumulated_priority_reset_interval,
-                &mut canisters,
-            );
-            state.put_canister_states(canisters);
-            round_schedule_candidate
-        };
-
         // Inner round.
         let (mut state, active_canister_ids) = self.inner_round(
+            &round_log,
             state,
             &mut csprng,
-            &round_schedule,
             current_round,
             &root_measurement_scope,
             &mut scheduler_round_limits,
diff --git a/rs/execution_environment/src/scheduler/tests.rs b/rs/execution_environment/src/scheduler/tests.rs
index eb9ddf721da7..6d217033a3c5 100644
--- a/rs/execution_environment/src/scheduler/tests.rs
+++ b/rs/execution_environment/src/scheduler/tests.rs
@@ -65,7 +65,7 @@ fn can_fully_execute_canisters_with_one_input_message_each() {
     let mut test = SchedulerTestBuilder::new()
         .with_scheduler_config(SchedulerConfig {
             scheduler_cores: 2,
-            max_instructions_per_round: NumInstructions::from(1 << 30),
+            max_instructions_per_round: NumInstructions::from(10),
             max_instructions_per_message: NumInstructions::from(5),
             max_instructions_per_message_without_dts: NumInstructions::from(5),
             max_instructions_per_slice: NumInstructions::from(5),
@@ -1699,21 +1699,15 @@ fn can_fully_execute_multiple_canisters_with_multiple_messages_each() {
     for canister_state in test.state().canisters_iter() {
         let system_state = &canister_state.system_state;
         assert_eq!(system_state.queues().ingress_queue_size(), 0);
-        assert_eq!(
-            canister_state.scheduler_state.last_full_execution_round,
-            ExecutionRound::new(1)
-        );
-        assert_eq!(
-            system_state
-                .canister_metrics
-                .skipped_round_due_to_no_messages,
-            0
-        );
-        assert_eq!(system_state.canister_metrics.executed, 1);
-        assert_eq!(
-            system_state.canister_metrics.interrupted_during_execution,
-            0
-        );
+
+        let scheduler_state = &canister_state.scheduler_state;
+        assert_eq!(scheduler_state.last_full_execution_round, test.last_round());
+
+        let metrics = &system_state.canister_metrics;
+        // The inner round was skipped once before breaking the round.
+        assert_eq!(metrics.skipped_round_due_to_no_messages, 1);
+        assert_eq!(metrics.executed, 1);
+        assert_eq!(metrics.interrupted_during_execution, 0);
     }
 
     assert_eq!(
@@ -4781,28 +4775,31 @@ fn break_after_long_executions(#[strategy(2..10_usize)] scheduler_cores: usize)
 }
 
 /// Scenario:
-/// 1. One canister with two long messages `slice + 1` instructions each.
+/// 1. `scheduler_cores + 1` canisters with two long executions each.
 ///
 /// Expectations:
 /// 1. After the first round the canister should have a paused long execution.
-/// 2. After the second round the canister should have no executions, i.e. the
+/// 2. After the second round the canister should have no executions, i.e.
 ///    finish the paused execution and should not start any new executions.
 #[test]
 fn filter_after_long_executions() {
+    let scheduler_cores = 2;
     let max_instructions_per_slice = SchedulerConfig::application_subnet()
         .max_instructions_per_slice
         .get();
     let mut test = SchedulerTestBuilder::new()
         .with_scheduler_config(SchedulerConfig {
+            scheduler_cores,
             max_instructions_per_round: (max_instructions_per_slice * 2).into(),
             max_instructions_per_message: (max_instructions_per_slice * 2).into(),
             max_instructions_per_message_without_dts: max_instructions_per_slice.into(),
+            max_instructions_per_slice: max_instructions_per_slice.into(),
             ..SchedulerConfig::application_subnet()
         })
         .build();
 
-    // Create a canister with long messages
+    // Create `scheduler_cores + 1` canisters with long executions.
     let mut long_message_ids = vec![];
     let long_canister_id = test.create_canister();
     for _ in 0..2 {
@@ -4810,21 +4807,32 @@ fn filter_after_long_executions() {
         test.send_ingress(long_canister_id, ingress(max_instructions_per_slice + 1));
         long_message_ids.push(long_message_id);
     }
+    for _ in 0..scheduler_cores {
+        let another_canister_id = test.create_canister();
+        for _ in 0..2 {
+            test.send_ingress(another_canister_id, ingress(max_instructions_per_slice + 1));
+        }
+    }
+
+    // Remember the initial accumulated priority.
+    let canister = test.state().canister_state(&long_canister_id).unwrap();
+    let ap_before_execution = canister.scheduler_state.accumulated_priority;
 
     // After the first round the canister should have a paused long execution.
     test.execute_round(ExecutionRoundType::OrdinaryRound);
-    for canister in test.state().canisters_iter() {
-        assert_eq!(canister.system_state.canister_metrics.executed, 1);
-        assert!(canister.has_paused_execution());
-    }
+    let canister = test.state().canister_state(&long_canister_id).unwrap();
+    assert_eq!(canister.system_state.canister_metrics.executed, 1);
+    assert!(canister.has_paused_execution());
 
-    // After the second round the canister should have no executions, i.e. the
-    // finish the paused execution and should not start any new executions.
+    // After the second round the canister should finish the paused execution.
     test.execute_round(ExecutionRoundType::OrdinaryRound);
-    for canister in test.state().canisters_iter() {
-        assert_eq!(canister.system_state.canister_metrics.executed, 2);
-        assert!(!canister.has_paused_execution());
-    }
+    let canister = test.state().canister_state(&long_canister_id).unwrap();
+    assert_eq!(canister.system_state.canister_metrics.executed, 2);
+    assert!(!canister.has_paused_execution());
+
+    // The accumulated priority should not increase after the long execution is finished.
+    let ap_after_execution = canister.scheduler_state.accumulated_priority;
+    assert!(ap_after_execution <= ap_before_execution);
 }
 
 #[test]
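The final assertion holds because, with the direct charging above, a canister that is scheduled every round pays more than it earns. A back-of-the-envelope check using this test's parameters and the simplified even-credit model from the earlier sketch (real values differ slightly due to rounding and compute allocations):

fn main() {
    let scheduler_cores: i64 = 2;
    let capacity = 100 * scheduler_cores; // compute capacity percent = 200
    let canisters: i64 = 3; // the long canister plus `scheduler_cores` others

    // Every canister earns an (approximately) equal credit per round, while a
    // canister that occupies a core pays the full per-core charge.
    let credit_per_round = capacity / canisters; // ~66
    let charge_when_scheduled = capacity / scheduler_cores; // 100

    // The long canister is scheduled in both rounds (its paused execution is
    // resumed), so its accumulated priority cannot grow.
    assert!(credit_per_round - charge_when_scheduled < 0);
}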
@@ -5815,3 +5823,86 @@ fn scheduled_heap_delta_limit_scaling() {
     assert_eq!(10, scheduled_limit(50, 50, 9, 10, 5));
     assert_eq!(10, scheduled_limit(55, 50, 9, 10, 5));
 }
+
+#[test_strategy::proptest(ProptestConfig { cases: 8, ..ProptestConfig::default() })]
+fn apply_scheduling_strategy_in_inner_round(#[strategy(2..10_usize)] scheduler_cores: usize) {
+    fn run_inner_rounds_on_cores(inner_rounds: u64, scheduler_cores: usize) -> SchedulerTest {
+        let instructions = 20;
+        let canisters_per_core = 2;
+        let max_messages_per_round = canisters_per_core * inner_rounds;
+        let mut test = SchedulerTestBuilder::new()
+            .with_scheduler_config(SchedulerConfig {
+                scheduler_cores,
+                max_instructions_per_round: (instructions * max_messages_per_round).into(),
+                max_instructions_per_message: instructions.into(),
+                max_instructions_per_message_without_dts: instructions.into(),
+                max_instructions_per_slice: instructions.into(),
+                instruction_overhead_per_execution: NumInstructions::from(0),
+                instruction_overhead_per_canister: NumInstructions::from(0),
+                instruction_overhead_per_canister_for_finalization: NumInstructions::from(0),
+                ..SchedulerConfig::application_subnet()
+            })
+            .build();
+
+        // Bump up the round number.
+        test.execute_round(ExecutionRoundType::OrdinaryRound);
+
+        // Create 2 canisters for each scheduler core.
+        let num_canisters = scheduler_cores as u64 * canisters_per_core;
+        let mut canister_ids = vec![];
+        for _ in 0..num_canisters {
+            canister_ids.push(test.create_canister());
+        }
+
+        // Send a self-call message to each canister. With 2 canisters
+        // on each scheduler core and `2 * inner_rounds` messages per round,
+        // all canisters should be executed in each inner round.
+        for canister_id in canister_ids.iter() {
+            let message = ingress(instructions).call(
+                other_side(*canister_id, instructions - 1),
+                on_response(instructions - 2),
+            );
+            test.send_ingress(*canister_id, message);
+        }
+
+        test.execute_round(ExecutionRoundType::OrdinaryRound);
+        test
+    }
+
+    let test = run_inner_rounds_on_cores(1, scheduler_cores);
+    let mut total_accumulated_priority = 0;
+    for (i, canister) in test.state().canisters_iter().enumerate() {
+        let system_state = &canister.system_state;
+        let scheduler_state = &canister.scheduler_state;
+        // After just one inner round, the first `scheduler_cores` canisters
+        // should be charged for execution.
+        if i < scheduler_cores {
+            prop_assert!(scheduler_state.accumulated_priority < 0.into());
+        } else {
+            prop_assert!(scheduler_state.accumulated_priority > 0.into());
+        }
+        // All ingresses should be executed in the previous round.
+        prop_assert_eq!(system_state.queues().ingress_queue_size(), 0);
+        prop_assert_eq!(system_state.canister_metrics.executed, 1);
+        prop_assert_eq!(scheduler_state.last_full_execution_round, test.last_round());
+        total_accumulated_priority += scheduler_state.accumulated_priority.get();
+    }
+    // The accumulated priority invariant should be respected.
+    prop_assert_eq!(total_accumulated_priority, 0);
+
+    let test = run_inner_rounds_on_cores(2, scheduler_cores);
+    let mut total_accumulated_priority = 0;
+    for canister in test.state().canisters_iter() {
+        let system_state = &canister.system_state;
+        let scheduler_state = &canister.scheduler_state;
+        // After two inner rounds, all canisters should be charged for execution.
+        prop_assert!(scheduler_state.accumulated_priority == 0.into());
+        // Each canister should be executed twice in the previous round.
+        prop_assert_eq!(system_state.queues().ingress_queue_size(), 0);
+        prop_assert_eq!(system_state.canister_metrics.executed, 2);
+        prop_assert_eq!(scheduler_state.last_full_execution_round, test.last_round());
+        total_accumulated_priority += scheduler_state.accumulated_priority.get();
+    }
+    // The accumulated priority invariant should be respected.
+    prop_assert_eq!(total_accumulated_priority, 0);
+}
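As a sanity check on the two-inner-rounds case asserted above: with two canisters per core, each canister lands in the top `scheduler_cores` exactly once over the two inner rounds, so its total credit and its one-time charge cancel. In the simplified even-credit model (illustrative arithmetic, not test code):

fn main() {
    for cores in 2..10_i64 {
        let capacity = 100 * cores; // compute capacity percent
        let canisters = 2 * cores; // two canisters per scheduler core
        let inner_rounds = 2;

        // Each inner round credits every canister an equal share of capacity.
        let total_credit = inner_rounds * capacity / canisters; // = 100
        // Each canister is scheduled exactly once across the two inner rounds,
        // paying the per-core charge that one time.
        let total_charge = capacity / cores; // = 100

        assert_eq!(total_credit - total_charge, 0);
    }
}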