feat: EXC-1735: Move scheduling into the inner round #1757

Closed
wants to merge 3 commits into from
68 changes: 29 additions & 39 deletions rs/execution_environment/src/scheduler.rs
@@ -367,16 +367,9 @@ impl SchedulerImpl {
let active_cores = scheduler_cores.min(number_of_canisters);
for (i, canister_id) in scheduling_order.take(active_cores).enumerate() {
let canister_state = canister_states.get_mut(canister_id).unwrap();
// As top `scheduler_cores` canisters are guaranteed to be scheduled
// this round, their accumulated priorities must be decreased here
// by `capacity * multiplier / scheduler_cores`. But instead this
// value is accumulated in the `priority_credit`, and applied later:
// * For short executions, the `priority_credit` is deducted from
// the `accumulated_priority` at the end of the round.
// * For long executions, the `priority_credit` is accumulated
// for a few rounds, and deducted from the `accumulated_priority`
// at the end of the long execution.
canister_state.scheduler_state.priority_credit +=
// Decrease accumulated priorities of the top `scheduler_cores` canisters.
// This is required to respect the scheduler invariant after the round is finished.
canister_state.scheduler_state.accumulated_priority -=
berestovskyy marked this conversation as resolved.
(compute_capacity_percent * multiplier / active_cores as i64).into();
if i < round_schedule.long_execution_cores {
canister_state.scheduler_state.long_execution_mode =
@@ -626,9 +619,9 @@ impl SchedulerImpl {
#[allow(clippy::too_many_arguments, clippy::type_complexity)]
fn inner_round<'a>(
&'a self,
round_log: &ReplicaLogger,
Contributor:

Personal preference: My approach to argument ordering (and something that, accidentally or not, I see reflected in a lot of places) is:

  • at a high level, start with input arguments; then output arguments; and finally incidental stuff, such as logs and metrics;
  • within each of these groups (or more likely within the first) go by importance (e.g. first the state you are modifying, then the round, etc.; in this case, beyond the state it's all a bit subjective).

In other words, I would add the log at the very end (or second to last, just before the metrics).
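The convention described above can be sketched as follows. All type names here are illustrative stand-ins for the real scheduler types, and the signature is hypothetical, not the actual `inner_round`:

```rust
// Stand-in types, not the real scheduler types.
struct Csprng;
struct RoundSchedule;
struct ReplicaLogger;
#[derive(Debug, PartialEq)]
struct ReplicatedState(u64); // carries just a round counter for illustration

// Argument order: inputs first, then the state being modified,
// then incidentals (logs, metrics) at the very end.
fn inner_round(
    _csprng: &mut Csprng,
    _round_schedule: &RoundSchedule,
    current_round: u64,
    state: ReplicatedState,
    _round_log: &ReplicaLogger,
) -> ReplicatedState {
    // Placeholder body: record the highest round executed so far.
    ReplicatedState(current_round.max(state.0))
}

fn main() {
    let state = inner_round(
        &mut Csprng,
        &RoundSchedule,
        7,
        ReplicatedState(0),
        &ReplicaLogger,
    );
    assert_eq!(state, ReplicatedState(7));
}
```

The grouping makes call sites easier to scan: the data the function consumes and produces reads first, while cross-cutting plumbing stays out of the way at the end.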

mut state: ReplicatedState,
csprng: &mut Csprng,
round_schedule: &RoundSchedule,
current_round: ExecutionRound,
root_measurement_scope: &MeasurementScope<'a>,
scheduler_round_limits: &mut SchedulerRoundLimits,
@@ -700,19 +693,32 @@ impl SchedulerImpl {

// Update subnet available memory before taking out the canisters.
round_limits.subnet_available_memory = self.exec_env.subnet_available_memory(&state);
let canisters = state.take_canister_states();
// Obtain the active canisters and update the collection of heap delta rate-limited canisters.
let (active_round_schedule, rate_limited_canister_ids) = round_schedule
.filter_canisters(
&canisters,
self.config.heap_delta_rate_limit,
self.rate_limiting_of_heap_delta,
let mut canisters = state.take_canister_states();

// Scheduling.
Contributor:

High-level comment: Does this change help move us closer to the quick fix? (I.e. does it make it easier to charge all canisters that got a chance at a full round or not?)

Because, on the other hand, we have both subnets that spend 15-20 ms scheduling and subnets that do 12 inner-loop iterations per round. Luckily, no subnet happens to be in both of those groups, but even assuming no increase in these numbers, there is nothing stopping a subnet from doing 250 ms worth of scheduling out of a 400 ms round.

Contributor (author):

This change shifts the schedule by four canisters per inner round, i.e. potentially 12 times faster. I'm aware of the potential performance impact but have prioritized other optimizations for the upcoming release...

// TODO(EXC-1617): The scheduling will be optimized in the follow up PRs.
let (mut active_canisters_partitioned_by_cores, inactive_canisters) = {
let _timer = self.metrics.round_scheduling_duration.start_timer();
berestovskyy marked this conversation as resolved.
let round_schedule = self.apply_scheduling_strategy(
round_log,
self.config.scheduler_cores,
current_round,
self.config.accumulated_priority_reset_interval,
&mut canisters,
);
round_filtered_canisters
.add_canisters(&active_round_schedule, &rate_limited_canister_ids);
// Obtain the active canisters and update the collection
// of heap delta rate-limited canisters.
let (active_round_schedule, rate_limited_canister_ids) = round_schedule
.filter_canisters(
&canisters,
self.config.heap_delta_rate_limit,
self.rate_limiting_of_heap_delta,
);
round_filtered_canisters
.add_canisters(&active_round_schedule, &rate_limited_canister_ids);

let (mut active_canisters_partitioned_by_cores, inactive_canisters) =
active_round_schedule.partition_canisters_to_cores(canisters);
active_round_schedule.partition_canisters_to_cores(canisters)
};

if is_first_iteration {
for partition in active_canisters_partitioned_by_cores.iter_mut() {
@@ -1690,27 +1696,11 @@ impl Scheduler for SchedulerImpl {
scheduler_round_limits.update_subnet_round_limits(&subnet_round_limits);
};

// Scheduling.
let round_schedule = {
let _timer = self.metrics.round_scheduling_duration.start_timer();

let mut canisters = state.take_canister_states();
let round_schedule_candidate = self.apply_scheduling_strategy(
&round_log,
self.config.scheduler_cores,
current_round,
self.config.accumulated_priority_reset_interval,
&mut canisters,
);
state.put_canister_states(canisters);
round_schedule_candidate
};

// Inner round.
let (mut state, active_canister_ids) = self.inner_round(
&round_log,
state,
&mut csprng,
&round_schedule,
current_round,
&root_measurement_scope,
&mut scheduler_round_limits,
149 changes: 120 additions & 29 deletions rs/execution_environment/src/scheduler/tests.rs
@@ -65,7 +65,7 @@ fn can_fully_execute_canisters_with_one_input_message_each() {
let mut test = SchedulerTestBuilder::new()
.with_scheduler_config(SchedulerConfig {
scheduler_cores: 2,
max_instructions_per_round: NumInstructions::from(1 << 30),
max_instructions_per_round: NumInstructions::from(10),
max_instructions_per_message: NumInstructions::from(5),
max_instructions_per_message_without_dts: NumInstructions::from(5),
max_instructions_per_slice: NumInstructions::from(5),
@@ -1699,21 +1699,15 @@ fn can_fully_execute_multiple_canisters_with_multiple_messages_each() {
for canister_state in test.state().canisters_iter() {
let system_state = &canister_state.system_state;
assert_eq!(system_state.queues().ingress_queue_size(), 0);
assert_eq!(
canister_state.scheduler_state.last_full_execution_round,
ExecutionRound::new(1)
);
assert_eq!(
system_state
.canister_metrics
.skipped_round_due_to_no_messages,
0
);
assert_eq!(system_state.canister_metrics.executed, 1);
assert_eq!(
system_state.canister_metrics.interrupted_during_execution,
0
);

let scheduler_state = &canister_state.scheduler_state;
assert_eq!(scheduler_state.last_full_execution_round, test.last_round());

let metrics = &system_state.canister_metrics;
// The inner round was skipped once before breaking the round.
assert_eq!(metrics.skipped_round_due_to_no_messages, 1);
Contributor:

Low priority: Could we avoid unnecessarily bumping this counter on every round when we execute all messages?

Or rename it to something like "rounds when we executed all messages"? (Although this can probably be inferred from the number of instructions executed in that round.)

Contributor (author):

Sure, we'll have performance optimizations in https://dfinity.atlassian.net/browse/EXC-1617. It's an orthogonal change to this MR.

assert_eq!(metrics.executed, 1);
assert_eq!(metrics.interrupted_during_execution, 0);
}

assert_eq!(
@@ -4781,50 +4775,64 @@ fn break_after_long_executions(#[strategy(2..10_usize)] scheduler_cores: usize)
}

/// Scenario:
/// 1. One canister with two long messages `slice + 1` instructions each.
/// 1. Scheduler cores + 1 canisters with two long executions each.
///
/// Expectations:
/// 1. After the first round the canister should have a paused long execution.
/// 2. After the second round the canister should have no executions, i.e. the
/// 2. After the second round the canister should have no executions, i.e.
/// finish the paused execution and should not start any new executions.
#[test]
fn filter_after_long_executions() {
let scheduler_cores = 2;
let max_instructions_per_slice = SchedulerConfig::application_subnet()
.max_instructions_per_slice
.get();

let mut test = SchedulerTestBuilder::new()
.with_scheduler_config(SchedulerConfig {
scheduler_cores,
max_instructions_per_round: (max_instructions_per_slice * 2).into(),
max_instructions_per_message: (max_instructions_per_slice * 2).into(),
max_instructions_per_message_without_dts: max_instructions_per_slice.into(),
max_instructions_per_slice: max_instructions_per_slice.into(),
..SchedulerConfig::application_subnet()
})
.build();

// Create a canister with long messages
// Create scheduler cores + 1 canisters with long executions.
let mut long_message_ids = vec![];
let long_canister_id = test.create_canister();
for _ in 0..2 {
let long_message_id =
test.send_ingress(long_canister_id, ingress(max_instructions_per_slice + 1));
long_message_ids.push(long_message_id);
}
for _ in 0..scheduler_cores {
let another_canister_id = test.create_canister();
for _ in 0..2 {
test.send_ingress(another_canister_id, ingress(max_instructions_per_slice + 1));
}
}

// Remember the initial accumulated priority.
let canister = test.state().canister_state(&long_canister_id).unwrap();
let ap_before_execution = canister.scheduler_state.accumulated_priority;

// After the first round the canister should have a paused long execution.
test.execute_round(ExecutionRoundType::OrdinaryRound);
for canister in test.state().canisters_iter() {
assert_eq!(canister.system_state.canister_metrics.executed, 1);
assert!(canister.has_paused_execution());
}
let canister = test.state().canister_state(&long_canister_id).unwrap();
assert_eq!(canister.system_state.canister_metrics.executed, 1);
assert!(canister.has_paused_execution());

// After the second round the canister should have no executions, i.e. the
// finish the paused execution and should not start any new executions.
// After the second round the canister should finish the paused execution.
test.execute_round(ExecutionRoundType::OrdinaryRound);
for canister in test.state().canisters_iter() {
assert_eq!(canister.system_state.canister_metrics.executed, 2);
assert!(!canister.has_paused_execution());
}
let canister = test.state().canister_state(&long_canister_id).unwrap();
assert_eq!(canister.system_state.canister_metrics.executed, 2);
assert!(!canister.has_paused_execution());

// The accumulated priority should not increase after the long execution is finished.
dsarlis marked this conversation as resolved.
let ap_after_execution = canister.scheduler_state.accumulated_priority;
assert!(ap_after_execution <= ap_before_execution);
}

#[test]
@@ -5815,3 +5823,86 @@ fn scheduled_heap_delta_limit_scaling() {
assert_eq!(10, scheduled_limit(50, 50, 9, 10, 5));
assert_eq!(10, scheduled_limit(55, 50, 9, 10, 5));
}

#[test_strategy::proptest(ProptestConfig { cases: 8, ..ProptestConfig::default() })]
fn apply_scheduling_strategy_in_inner_round(#[strategy(2..10_usize)] scheduler_cores: usize) {
berestovskyy marked this conversation as resolved.
fn run_inner_rounds_on_cores(inner_rounds: u64, scheduler_cores: usize) -> SchedulerTest {
let instructions = 20;
let canisters_per_core = 2;
let max_messages_per_round = canisters_per_core * inner_rounds;
let mut test = SchedulerTestBuilder::new()
.with_scheduler_config(SchedulerConfig {
scheduler_cores,
max_instructions_per_round: (instructions * max_messages_per_round).into(),
max_instructions_per_message: instructions.into(),
max_instructions_per_message_without_dts: instructions.into(),
max_instructions_per_slice: instructions.into(),
instruction_overhead_per_execution: NumInstructions::from(0),
instruction_overhead_per_canister: NumInstructions::from(0),
instruction_overhead_per_canister_for_finalization: NumInstructions::from(0),
..SchedulerConfig::application_subnet()
})
.build();

// Bump up the round number.
test.execute_round(ExecutionRoundType::OrdinaryRound);

// Create 2 canisters for each scheduler core.
let num_canisters = scheduler_cores as u64 * canisters_per_core;
let mut canister_ids = vec![];
for _ in 0..num_canisters {
canister_ids.push(test.create_canister());
}

// Send a self call message to each canister. Having 2 canisters
// on each scheduler core and `2 * inner_rounds` messages per round,
// all canisters should be executed in each inner round.
for canister_id in canister_ids.iter() {
let message = ingress(instructions).call(
other_side(*canister_id, instructions - 1),
on_response(instructions - 2),
);
test.send_ingress(*canister_id, message);
}

test.execute_round(ExecutionRoundType::OrdinaryRound);
test
}

let test = run_inner_rounds_on_cores(1, scheduler_cores);
let mut total_accumulated_priority = 0;
for (i, canister) in test.state().canisters_iter().enumerate() {
let system_state = &canister.system_state;
let scheduler_state = &canister.scheduler_state;
// After just one inner round, the first `scheduler_cores` canisters
// should be charged for execution.
if i < scheduler_cores {
prop_assert!(scheduler_state.accumulated_priority < 0.into());
} else {
prop_assert!(scheduler_state.accumulated_priority > 0.into());
}
// All ingresses should be executed in the previous round.
prop_assert_eq!(system_state.queues().ingress_queue_size(), 0);
prop_assert_eq!(system_state.canister_metrics.executed, 1);
prop_assert_eq!(scheduler_state.last_full_execution_round, test.last_round());
total_accumulated_priority += scheduler_state.accumulated_priority.get();
}
// The accumulated priority invariant should be respected.
prop_assert_eq!(total_accumulated_priority, 0);

let test = run_inner_rounds_on_cores(2, scheduler_cores);
let mut total_accumulated_priority = 0;
for canister in test.state().canisters_iter() {
let system_state = &canister.system_state;
let scheduler_state = &canister.scheduler_state;
// After two inner rounds, all canisters should be charged for execution.
prop_assert!(scheduler_state.accumulated_priority == 0.into());
// All ingresses should be executed twice in the previous round.
prop_assert_eq!(system_state.queues().ingress_queue_size(), 0);
prop_assert_eq!(system_state.canister_metrics.executed, 2);
prop_assert_eq!(scheduler_state.last_full_execution_round, test.last_round());
total_accumulated_priority += scheduler_state.accumulated_priority.get();
}
// The accumulated priority invariant should be respected.
prop_assert_eq!(total_accumulated_priority, 0);
}
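The zero-sum check at the end of this proptest can be illustrated with a toy model. The names, numbers, and charging formula below are illustrative, not the real scheduler constants; the sketch assumes the capacity divides evenly by both the number of canisters and the number of cores (in the real scheduler, the multiplier serves to make such divisions exact):

```rust
// Toy model of the accumulated-priority invariant: every round, each
// canister is credited its fair share of the compute capacity, and the
// `scheduler_cores` canisters that actually run are charged the full
// per-core share, so the total accumulated priority stays at zero.
fn charge_round(accumulated: &mut [i64], scheduler_cores: usize, multiplier: i64) {
    let n = accumulated.len() as i64;
    let capacity = 100 * multiplier; // total capacity, in percent * multiplier

    // Credit every canister its share of the capacity.
    for ap in accumulated.iter_mut() {
        *ap += capacity / n;
    }
    // Charge the canisters that were scheduled on a core this round.
    for ap in accumulated.iter_mut().take(scheduler_cores) {
        *ap -= capacity / scheduler_cores as i64;
    }
}

fn main() {
    // 2 cores, 2 canisters per core, as in the proptest setup.
    let mut accumulated = vec![0i64; 4];
    charge_round(&mut accumulated, 2, 1);
    // Scheduled canisters go negative, the rest positive; the sum is zero.
    assert_eq!(accumulated, vec![-25, -25, 25, 25]);
    assert_eq!(accumulated.iter().sum::<i64>(), 0);
}
```

With more inner rounds the schedule rotates, so over enough rounds every canister ends up both credited and charged, which is why the second half of the proptest expects each canister's accumulated priority to return to zero.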