Skip to content

Commit

Permalink
feat: EXC-1735: Move scheduling strategy into the inner round
Browse files Browse the repository at this point in the history
By moving the scheduling strategy into the inner round, we can adjust canister priorities within each round. This allows for greater flexibility and responsiveness when changing canister priorities.
  • Loading branch information
berestovskyy committed Sep 30, 2024
1 parent c39a8b3 commit f0ea8a7
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 68 deletions.
68 changes: 29 additions & 39 deletions rs/execution_environment/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,16 +367,9 @@ impl SchedulerImpl {
let active_cores = scheduler_cores.min(number_of_canisters);
for (i, canister_id) in scheduling_order.take(active_cores).enumerate() {
let canister_state = canister_states.get_mut(canister_id).unwrap();
// As top `scheduler_cores` canisters are guaranteed to be scheduled
// this round, their accumulated priorities must be decreased here
// by `capacity * multiplier / scheduler_cores`. But instead this
// value is accumulated in the `priority_credit`, and applied later:
// * For short executions, the `priority_credit` is deducted from
// the `accumulated_priority` at the end of the round.
// * For long executions, the `priority_credit` is accumulated
// for a few rounds, and deducted from the `accumulated_priority`
// at the end of the long execution.
canister_state.scheduler_state.priority_credit +=
// Decrease accumulated priorities of the top `scheduler_cores` canisters.
// This is required to respect scheduler invariant after the round is finished.
canister_state.scheduler_state.accumulated_priority -=
(compute_capacity_percent * multiplier / active_cores as i64).into();
if i < round_schedule.long_execution_cores {
canister_state.scheduler_state.long_execution_mode =
Expand Down Expand Up @@ -626,9 +619,9 @@ impl SchedulerImpl {
#[allow(clippy::too_many_arguments, clippy::type_complexity)]
fn inner_round<'a>(
&'a self,
round_log: &ReplicaLogger,
mut state: ReplicatedState,
csprng: &mut Csprng,
round_schedule: &RoundSchedule,
current_round: ExecutionRound,
root_measurement_scope: &MeasurementScope<'a>,
scheduler_round_limits: &mut SchedulerRoundLimits,
Expand Down Expand Up @@ -700,19 +693,32 @@ impl SchedulerImpl {

// Update subnet available memory before taking out the canisters.
round_limits.subnet_available_memory = self.exec_env.subnet_available_memory(&state);
let canisters = state.take_canister_states();
// Obtain the active canisters and update the collection of heap delta rate-limited canisters.
let (active_round_schedule, rate_limited_canister_ids) = round_schedule
.filter_canisters(
&canisters,
self.config.heap_delta_rate_limit,
self.rate_limiting_of_heap_delta,
let mut canisters = state.take_canister_states();

// Scheduling.
// TODO(EXC-1617): The scheduling will be optimized in the follow up PRs.
let (mut active_canisters_partitioned_by_cores, inactive_canisters) = {
let _timer = self.metrics.round_scheduling_duration.start_timer();
let round_schedule = self.apply_scheduling_strategy(
round_log,
self.config.scheduler_cores,
current_round,
self.config.accumulated_priority_reset_interval,
&mut canisters,
);
round_filtered_canisters
.add_canisters(&active_round_schedule, &rate_limited_canister_ids);
// Obtain the active canisters and update the collection
// of heap delta rate-limited canisters.
let (active_round_schedule, rate_limited_canister_ids) = round_schedule
.filter_canisters(
&canisters,
self.config.heap_delta_rate_limit,
self.rate_limiting_of_heap_delta,
);
round_filtered_canisters
.add_canisters(&active_round_schedule, &rate_limited_canister_ids);

let (mut active_canisters_partitioned_by_cores, inactive_canisters) =
active_round_schedule.partition_canisters_to_cores(canisters);
active_round_schedule.partition_canisters_to_cores(canisters)
};

if is_first_iteration {
for partition in active_canisters_partitioned_by_cores.iter_mut() {
Expand Down Expand Up @@ -1690,27 +1696,11 @@ impl Scheduler for SchedulerImpl {
scheduler_round_limits.update_subnet_round_limits(&subnet_round_limits);
};

// Scheduling.
let round_schedule = {
let _timer = self.metrics.round_scheduling_duration.start_timer();

let mut canisters = state.take_canister_states();
let round_schedule_candidate = self.apply_scheduling_strategy(
&round_log,
self.config.scheduler_cores,
current_round,
self.config.accumulated_priority_reset_interval,
&mut canisters,
);
state.put_canister_states(canisters);
round_schedule_candidate
};

// Inner round.
let (mut state, active_canister_ids) = self.inner_round(
&round_log,
state,
&mut csprng,
&round_schedule,
current_round,
&root_measurement_scope,
&mut scheduler_round_limits,
Expand Down
149 changes: 120 additions & 29 deletions rs/execution_environment/src/scheduler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ fn can_fully_execute_canisters_with_one_input_message_each() {
let mut test = SchedulerTestBuilder::new()
.with_scheduler_config(SchedulerConfig {
scheduler_cores: 2,
max_instructions_per_round: NumInstructions::from(1 << 30),
max_instructions_per_round: NumInstructions::from(10),
max_instructions_per_message: NumInstructions::from(5),
max_instructions_per_message_without_dts: NumInstructions::from(5),
max_instructions_per_slice: NumInstructions::from(5),
Expand Down Expand Up @@ -1699,21 +1699,15 @@ fn can_fully_execute_multiple_canisters_with_multiple_messages_each() {
for canister_state in test.state().canisters_iter() {
let system_state = &canister_state.system_state;
assert_eq!(system_state.queues().ingress_queue_size(), 0);
assert_eq!(
canister_state.scheduler_state.last_full_execution_round,
ExecutionRound::new(1)
);
assert_eq!(
system_state
.canister_metrics
.skipped_round_due_to_no_messages,
0
);
assert_eq!(system_state.canister_metrics.executed, 1);
assert_eq!(
system_state.canister_metrics.interrupted_during_execution,
0
);

let scheduler_state = &canister_state.scheduler_state;
assert_eq!(scheduler_state.last_full_execution_round, test.last_round());

let metrics = &system_state.canister_metrics;
// The inner round was skipped once before breaking the round.
assert_eq!(metrics.skipped_round_due_to_no_messages, 1);
assert_eq!(metrics.executed, 1);
assert_eq!(metrics.interrupted_during_execution, 0);
}

assert_eq!(
Expand Down Expand Up @@ -4781,50 +4775,64 @@ fn break_after_long_executions(#[strategy(2..10_usize)] scheduler_cores: usize)
}

/// Scenario:
/// 1. One canister with two long messages `slice + 1` instructions each.
/// 1. Scheduler cores + 1 canisters with two long executions each.
///
/// Expectations:
/// 1. After the first round the canister should have a paused long execution.
/// 2. After the second round the canister should have no executions, i.e. the
/// 2. After the second round the canister should have no executions, i.e.
/// finish the paused execution and should not start any new executions.
#[test]
fn filter_after_long_executions() {
let scheduler_cores = 2;
let max_instructions_per_slice = SchedulerConfig::application_subnet()
.max_instructions_per_slice
.get();

let mut test = SchedulerTestBuilder::new()
.with_scheduler_config(SchedulerConfig {
scheduler_cores,
max_instructions_per_round: (max_instructions_per_slice * 2).into(),
max_instructions_per_message: (max_instructions_per_slice * 2).into(),
max_instructions_per_message_without_dts: max_instructions_per_slice.into(),
max_instructions_per_slice: max_instructions_per_slice.into(),
..SchedulerConfig::application_subnet()
})
.build();

// Create a canister with long messages
// Create scheduler cores + 1 canisters with long executions.
let mut long_message_ids = vec![];
let long_canister_id = test.create_canister();
for _ in 0..2 {
let long_message_id =
test.send_ingress(long_canister_id, ingress(max_instructions_per_slice + 1));
long_message_ids.push(long_message_id);
}
for _ in 0..scheduler_cores {
let another_canister_id = test.create_canister();
for _ in 0..2 {
test.send_ingress(another_canister_id, ingress(max_instructions_per_slice + 1));
}
}

// Remember the initial accumulated priority.
let canister = test.state().canister_state(&long_canister_id).unwrap();
let ap_before_execution = canister.scheduler_state.accumulated_priority;

// After the first round the canister should have a paused long execution.
test.execute_round(ExecutionRoundType::OrdinaryRound);
for canister in test.state().canisters_iter() {
assert_eq!(canister.system_state.canister_metrics.executed, 1);
assert!(canister.has_paused_execution());
}
let canister = test.state().canister_state(&long_canister_id).unwrap();
assert_eq!(canister.system_state.canister_metrics.executed, 1);
assert!(canister.has_paused_execution());

// After the second round the canister should have no executions, i.e. the
// finish the paused execution and should not start any new executions.
// After the second round the canister should finish the paused execution.
test.execute_round(ExecutionRoundType::OrdinaryRound);
for canister in test.state().canisters_iter() {
assert_eq!(canister.system_state.canister_metrics.executed, 2);
assert!(!canister.has_paused_execution());
}
let canister = test.state().canister_state(&long_canister_id).unwrap();
assert_eq!(canister.system_state.canister_metrics.executed, 2);
assert!(!canister.has_paused_execution());

// The accumulated priority should not increase after the long execution is finished.
let ap_after_execution = canister.scheduler_state.accumulated_priority;
assert!(ap_after_execution <= ap_before_execution);
}

#[test]
Expand Down Expand Up @@ -5815,3 +5823,86 @@ fn scheduled_heap_delta_limit_scaling() {
assert_eq!(10, scheduled_limit(50, 50, 9, 10, 5));
assert_eq!(10, scheduled_limit(55, 50, 9, 10, 5));
}

// Property test for moving the scheduling strategy into the inner round:
// after a full round, the per-canister accumulated priorities must balance
// out to zero (the scheduler invariant), regardless of the number of cores.
#[test_strategy::proptest(ProptestConfig { cases: 8, ..ProptestConfig::default() })]
fn apply_scheduling_strategy_in_inner_round(#[strategy(2..10_usize)] scheduler_cores: usize) {
    // Builds a scheduler test sized so that one ordinary round contains
    // exactly `inner_rounds` inner rounds, creates 2 canisters per core,
    // sends each one self-call message, and executes one ordinary round.
    fn run_inner_rounds_on_cores(inner_rounds: u64, scheduler_cores: usize) -> SchedulerTest {
        let instructions = 20;
        let canisters_per_core = 2;
        // Per core: 2 messages per inner round, times `inner_rounds`.
        let max_messages_per_round = canisters_per_core * inner_rounds;
        let mut test = SchedulerTestBuilder::new()
            .with_scheduler_config(SchedulerConfig {
                scheduler_cores,
                // The round budget fits exactly `max_messages_per_round`
                // messages of `instructions` instructions each.
                max_instructions_per_round: (instructions * max_messages_per_round).into(),
                max_instructions_per_message: instructions.into(),
                max_instructions_per_message_without_dts: instructions.into(),
                max_instructions_per_slice: instructions.into(),
                // Zero out all overheads so the instruction accounting above
                // is exact and the round splits into whole inner rounds.
                instruction_overhead_per_execution: NumInstructions::from(0),
                instruction_overhead_per_canister: NumInstructions::from(0),
                instruction_overhead_per_canister_for_finalization: NumInstructions::from(0),
                ..SchedulerConfig::application_subnet()
            })
            .build();

        // Bump up the round number.
        test.execute_round(ExecutionRoundType::OrdinaryRound);

        // Create 2 canisters for each scheduler core.
        let num_canisters = scheduler_cores as u64 * canisters_per_core;
        let mut canister_ids = vec![];
        for _ in 0..num_canisters {
            canister_ids.push(test.create_canister());
        }

        // Send a self call message to each canister. Having 2 canisters
        // on each scheduler core and `2 * inner_rounds` messages per round,
        // all canisters should be executed in each inner round.
        for canister_id in canister_ids.iter() {
            let message = ingress(instructions).call(
                other_side(*canister_id, instructions - 1),
                on_response(instructions - 2),
            );
            test.send_ingress(*canister_id, message);
        }

        test.execute_round(ExecutionRoundType::OrdinaryRound);
        test
    }

    // Case 1: a single inner round — only the top `scheduler_cores`
    // canisters get charged; the rest gain priority.
    let test = run_inner_rounds_on_cores(1, scheduler_cores);
    let mut total_accumulated_priority = 0;
    for (i, canister) in test.state().canisters_iter().enumerate() {
        let system_state = &canister.system_state;
        let scheduler_state = &canister.scheduler_state;
        // After just one inner round, the first `scheduler_cores` canisters
        // should be charged for execution (negative accumulated priority).
        if i < scheduler_cores {
            prop_assert!(scheduler_state.accumulated_priority < 0.into());
        } else {
            prop_assert!(scheduler_state.accumulated_priority > 0.into());
        }
        // All ingresses should be executed in the previous round.
        prop_assert_eq!(system_state.queues().ingress_queue_size(), 0);
        prop_assert_eq!(system_state.canister_metrics.executed, 1);
        prop_assert_eq!(scheduler_state.last_full_execution_round, test.last_round());
        total_accumulated_priority += scheduler_state.accumulated_priority.get();
    }
    // The accumulated priority invariant should be respected:
    // charges and credits across all canisters must sum to zero.
    prop_assert_eq!(total_accumulated_priority, 0);

    // Case 2: two inner rounds — every canister is executed (and charged)
    // once per inner round, so all priorities return to zero.
    let test = run_inner_rounds_on_cores(2, scheduler_cores);
    let mut total_accumulated_priority = 0;
    for canister in test.state().canisters_iter() {
        let system_state = &canister.system_state;
        let scheduler_state = &canister.scheduler_state;
        // After two inner rounds, all canisters should be charged for execution.
        prop_assert!(scheduler_state.accumulated_priority == 0.into());
        // All ingresses should be executed twice in the previous round.
        prop_assert_eq!(system_state.queues().ingress_queue_size(), 0);
        prop_assert_eq!(system_state.canister_metrics.executed, 2);
        prop_assert_eq!(scheduler_state.last_full_execution_round, test.last_round());
        total_accumulated_priority += scheduler_state.accumulated_priority.get();
    }
    // The accumulated priority invariant should be respected.
    prop_assert_eq!(total_accumulated_priority, 0);
}

0 comments on commit f0ea8a7

Please sign in to comment.