From 8e21c26dfa4e5289fa7093100ecc1f4097effc82 Mon Sep 17 00:00:00 2001 From: Shubham Chauhan Date: Sun, 17 Dec 2023 17:37:19 +0530 Subject: [PATCH] #000: Shubham: Implement common interface for mpi and kafka --- common/Cargo.toml | 1 + common/config/test/travel_plan.json | 163 +++++++++ .../src/config/configuration.rs | 22 +- common/src/config/mod.rs | 2 + common/src/utils/mod.rs | 17 + engine-app/Cargo.toml | 1 + engine-app/src/main.rs | 123 +++++-- engine/Cargo.toml | 9 +- engine/config/simulation.json | 99 ++++++ engine/plot/.gitignore | 3 +- engine/src/engine_app.rs | 57 ++- engine/src/epidemiology_simulation.rs | 328 ++++++++++-------- engine/src/kafka/kafka_consumer.rs | 76 +++- engine/src/kafka/kafka_producer.rs | 4 +- engine/src/lib.rs | 7 +- engine/src/models/events/tick.rs | 3 + engine/src/run_mode.rs | 12 +- engine/src/tick/tick_util.rs | 54 ++- engine/src/transport/engine_handlers.rs | 85 +++++ engine/src/transport/kafka_transport.rs | 164 +++++++++ engine/src/transport/mod.rs | 42 +++ .../mod.rs => engine/src/transport/mpi_tag.rs | 26 +- engine/src/transport/mpi_transport.rs | 217 ++++++++++++ engine/src/travel/commute/mod.rs | 2 +- .../travel/migration/migrators_by_engine.rs | 2 +- engine/src/utils/util.rs | 6 +- orchestrator/src/main.rs | 10 +- 27 files changed, 1274 insertions(+), 261 deletions(-) create mode 100644 common/config/test/travel_plan.json rename orchestrator/src/config.rs => common/src/config/configuration.rs (92%) create mode 100644 engine/config/simulation.json create mode 100644 engine/src/transport/engine_handlers.rs create mode 100644 engine/src/transport/kafka_transport.rs create mode 100644 engine/src/transport/mod.rs rename orchestrator/src/utils/mod.rs => engine/src/transport/mpi_tag.rs (51%) create mode 100644 engine/src/transport/mpi_transport.rs diff --git a/common/Cargo.toml b/common/Cargo.toml index da039b80..527d3bba 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -12,3 +12,4 @@ serde_derive = "1.0.103" serde_json = "1.0.85" serde_yaml = "0.9.13" validator = { version = "0.16.0", features = ["derive"] } +log = "0.4.20" diff --git a/common/config/test/travel_plan.json b/common/config/test/travel_plan.json new file mode 100644 index 00000000..d717ab31 --- /dev/null +++ b/common/config/test/travel_plan.json @@ -0,0 +1,163 @@ +{ + "engine_configs": [ + { + "engine_id": "engine1", + "config": { + "sim_id": "sim-timestamp", + "population": { + "Auto": { + "number_of_agents": 10000, + "public_transport_percentage": 0.2, + "working_percentage": 0.7 + } + }, + "disease": { + "regular_transmission_start_day": 5, + "high_transmission_start_day": 20, + "last_day": 40, + "asymptomatic_last_day": 9, + "mild_infected_last_day": 12, + "regular_transmission_rate": 0.025, + "high_transmission_rate": 0.25, + "death_rate": 0.035, + "percentage_asymptomatic_population": 0.3, + "percentage_severe_infected_population": 0.3, + "exposed_duration": 48, + "pre_symptomatic_duration": 48 + }, + "geography_parameters": { + "grid_size": 250, + "hospital_beds_percentage": 0.003 + }, + "hours": 10000, + "interventions": [ + { + "Vaccinate": { + "at_hour": 5000, + "percent": 0.2 + } + }, + { + "Lockdown": { + "at_number_of_infections": 100, + "essential_workers_population": 0.1 + } + } + ] + } + }, + { + "engine_id": "engine2", + "config": { + "sim_id": "sim-timestamp", + "population": { + "Auto": { + "number_of_agents": 10000, + "public_transport_percentage": 0.2, + "working_percentage": 0.7 + } + }, + "disease": { + "regular_transmission_start_day": 5, + 
"high_transmission_start_day": 20, + "last_day": 40, + "asymptomatic_last_day": 9, + "mild_infected_last_day": 12, + "regular_transmission_rate": 0.025, + "high_transmission_rate": 0.25, + "death_rate": 0.035, + "percentage_asymptomatic_population": 0.3, + "percentage_severe_infected_population": 0.3, + "exposed_duration": 48, + "pre_symptomatic_duration": 48 + }, + "geography_parameters": { + "grid_size": 250, + "hospital_beds_percentage": 0.003 + }, + "hours": 10000, + "interventions": [ + { + "Vaccinate": { + "at_hour": 5000, + "percent": 0.2 + } + }, + { + "Lockdown": { + "at_number_of_infections": 100, + "essential_workers_population": 0.1 + } + } + ] + } + }, + { + "engine_id": "engine3", + "config": { + "sim_id": "sim-timestamp", + "population": { + "Auto": { + "number_of_agents": 10000, + "public_transport_percentage": 0.2, + "working_percentage": 0.7 + } + }, + "disease": { + "regular_transmission_start_day": 5, + "high_transmission_start_day": 20, + "last_day": 40, + "asymptomatic_last_day": 9, + "mild_infected_last_day": 12, + "regular_transmission_rate": 0.025, + "high_transmission_rate": 0.25, + "death_rate": 0.035, + "percentage_asymptomatic_population": 0.3, + "percentage_severe_infected_population": 0.3, + "exposed_duration": 48, + "pre_symptomatic_duration": 48 + }, + "geography_parameters": { + "grid_size": 250, + "hospital_beds_percentage": 0.003 + }, + "hours": 10000, + "interventions": [ + { + "Vaccinate": { + "at_hour": 5000, + "percent": 0.2 + } + }, + { + "Lockdown": { + "at_number_of_infections": 100, + "essential_workers_population": 0.1 + } + } + ] + } + } + ], + "travel_plan": { + "regions": ["engine1", "engine2", "engine3"], + "migration": { + "enabled": true, + "matrix": [ + [0, 156, 10], + [108, 0, 290], + [90, 75, 0] + ], + "start_migration_hour": 48, + "end_migration_hour": 336 + }, + "commute": { + "enabled": true, + "matrix": [ + [0, 15, 2], + [10, 0, 29], + [9, 7, 0] + ] + } + } +} diff --git a/orchestrator/src/config.rs b/common/src/config/configuration.rs similarity index 92% rename from orchestrator/src/config.rs rename to common/src/config/configuration.rs index 34aea801..0214ca70 100644 --- a/orchestrator/src/config.rs +++ b/common/src/config/configuration.rs @@ -17,13 +17,14 @@ * */ +use crate::config::Population::Auto; +use log::debug; use std::error::Error; use std::fs::File; -use common::config::Population::Auto; -use common::config::{Config, TravelPlanConfig}; -use common::models::custom_types::Percentage; -use common::models::travel_plan::TravelPlan; +use crate::config::{Config, TravelPlanConfig}; +use crate::models::custom_types::Percentage; +use crate::models::travel_plan::TravelPlan; pub const TRANSPORT_AREA_RELATIVE_SIZE: Percentage = 0.2; @@ -42,6 +43,10 @@ impl Configuration { self.engine_configs.iter().map(|s| s.engine_id.clone()).collect() } + pub fn get_engine_configs(&self) -> &Vec { + &self.engine_configs + } + pub fn read(filename: &str) -> Result> { let reader = File::open(filename)?; let config: Configuration = serde_json::from_reader(reader)?; @@ -114,16 +119,15 @@ impl Configuration { // just a struct for easier parsing #[derive(Deserialize, Serialize)] -struct EngineConfig { - engine_id: String, - config: Config, +pub struct EngineConfig { + pub engine_id: String, + pub config: Config, } #[cfg(test)] mod tests { use super::*; - use crate::get_hours; - use crate::utils::read_simulation_conf; + use crate::utils::{get_hours, read_simulation_conf}; #[test] fn should_read_config() { diff --git a/common/src/config/mod.rs 
b/common/src/config/mod.rs index db8496ed..52ae3b06 100644 --- a/common/src/config/mod.rs +++ b/common/src/config/mod.rs @@ -22,6 +22,7 @@ mod population; mod starting_infections; mod travel_plan_config; +mod configuration; pub mod intervention_config; pub mod request; @@ -34,6 +35,7 @@ use validator::Validate; pub use crate::config::geography_parameters::GeographyParameters; pub use crate::config::population::*; pub use crate::config::starting_infections::StartingInfections; +pub use configuration::{Configuration, EngineConfig}; use crate::disease::{Disease, DiseaseOverride}; use crate::models::custom_types::{Hour, Size}; diff --git a/common/src/utils/mod.rs b/common/src/utils/mod.rs index 301d1aaf..47d49da9 100644 --- a/common/src/utils/mod.rs +++ b/common/src/utils/mod.rs @@ -20,3 +20,20 @@ mod random_wrapper; pub use random_wrapper::RandomWrapper; +use serde_json::Value; +use std::fs::File; + +pub fn read_simulation_conf(filename: &str) -> String { + let reader = File::open(filename).unwrap(); + let config: Value = serde_json::from_reader(reader).unwrap(); + let sim = config.as_object().unwrap(); + serde_json::to_string(sim).unwrap() +} + +pub fn get_hours(filename: &str) -> i64 { + let reader = File::open(filename).unwrap(); + let config: Value = serde_json::from_reader(reader).unwrap(); + let sim = config.get("engine_configs").unwrap().as_array().unwrap(); + let hours = sim[0].get("config").unwrap().get("hours"); + hours.unwrap().as_i64().unwrap() +} diff --git a/engine-app/Cargo.toml b/engine-app/Cargo.toml index 1a8e984f..82ef3eb7 100644 --- a/engine-app/Cargo.toml +++ b/engine-app/Cargo.toml @@ -13,3 +13,4 @@ clap = { version = "4.0.32", features = ["derive"] } env_logger = "0.10.0" opentelemetry = { version = "0.18.0", features = ["rt-tokio"] } opentelemetry-jaeger = { version = "0.17.0", features = ["rt-tokio"] } +mpi = { version = "0.7.0", features = ["user-operations", "derive"] } diff --git a/engine-app/src/main.rs b/engine-app/src/main.rs index 5c936df1..dac308e3 100644 --- a/engine-app/src/main.rs +++ b/engine-app/src/main.rs @@ -18,13 +18,36 @@ */ use clap::Parser; -use common::config::Config; -use common::disease::Disease; -use engine::{EngineApp, RunMode}; +use mpi::topology::Communicator; use opentelemetry::sdk::trace::{config, Span}; use opentelemetry::sdk::Resource; use opentelemetry::trace::{FutureExt, TraceContextExt, TraceError, Tracer}; use opentelemetry::{global, sdk, Context, KeyValue}; +use std::fmt::{Display, Formatter}; + +use common::config::{Config, Configuration, EngineConfig, TravelPlanConfig}; +use common::disease::Disease; +use engine::{EngineApp, MpiTransport, MultiEngineMode, RunMode}; + +const BUFFER_SIZE: usize = 100 * 1024 * 1024; + +#[derive(clap::ValueEnum, Clone, Debug)] +enum Mode { + Kafka, + MPI, + Standalone, +} + +impl Display for Mode { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let str = match self { + Mode::Kafka => "Kafka", + Mode::MPI => "MPI", + Mode::Standalone => "Standalone", + }; + write!(f, "{}", str) + } +} #[derive(Parser)] #[command(author, version, about)] @@ -32,10 +55,14 @@ struct Args { #[arg(short, long, value_name = "FILE", help = "Use a config file to run the simulation")] config: Option, - #[arg(short, long, default_value_t = false)] - #[arg(help = "Start the engine in daemon mode. It will wait for messages from Kafka. 
\ - Specifying this flag will cause the config argument to be ignored")] - daemon: bool, + // #[arg(short, long, default_value_t = false)] + // #[arg(help = "Start the engine in daemon mode. It will wait for messages from Kafka. \ + // Specifying this flag will cause the config argument to be ignored")] + // daemon: bool, + #[clap(value_enum)] + #[arg(short, long, default_value_t = Mode::Kafka)] + #[arg(help = "start the engine with a particular implementation- Kafka or MPI")] + mode: Mode, #[arg(short, long)] #[arg(help = "An identifier for the engine. Needed in daemon mode when running a larger simulation \ @@ -62,18 +89,24 @@ async fn main() { env_logger::init(); let args = Args::parse(); - let daemon = args.daemon; + println!("{:?}", args.mode); + let mode = args.mode; let has_named_engine = args.id.is_some(); let default_engine_id = "default_engine".to_string(); let engine_id = args.id.unwrap_or(default_engine_id); let number_of_threads = args.threads; - let run_mode = if daemon && has_named_engine { - RunMode::MultiEngine { engine_id: engine_id.to_string() } - } else if daemon { - RunMode::SingleDaemon - } else { - RunMode::Standalone + let run_mode = match mode { + Mode::Kafka => RunMode::MultiEngine { mode: MultiEngineMode::Kafka }, + Mode::MPI => RunMode::MultiEngine { mode: MultiEngineMode::MPI }, + Mode::Standalone => RunMode::Standalone, }; + // let run_mode = if daemon && has_named_engine { + // RunMode::MultiEngine { mode: MultiEngineMode::Kafka} + // } else if daemon { + // RunMode::SingleDaemon + // } else { + // RunMode::Standalone + // }; let disease_handler: Option = None; @@ -82,12 +115,60 @@ async fn main() { let span: Span = _tracer.start("root"); let cx: Context = Context::current_with_span(span); - if daemon { - EngineApp::start_in_daemon(&engine_id, &run_mode, disease_handler, number_of_threads).with_context(cx).await; - } else { - let default_config_path = "config/default.json".to_string(); - let config_file = args.config.unwrap_or(default_config_path); - let config = Config::read(&config_file).expect("Failed to read config file"); - EngineApp::start_standalone(config, &run_mode, disease_handler, number_of_threads).await; + match mode { + Mode::Kafka => { + EngineApp::start_in_daemon(&engine_id, &run_mode, disease_handler, number_of_threads).with_context(cx).await + } + Mode::MPI => { + println!("in multi-engine mode"); + let mut universe = mpi::initialize().unwrap(); + + // Try to attach a buffer. 
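+            // Assumption: the 100 MiB BUFFER_SIZE is attached to back MPI buffered sends of the serialized
+            // commuter/migrator payloads; the assert_eq below only verifies that the attach took effect.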
+ universe.set_buffer_size(BUFFER_SIZE); + assert_eq!(universe.buffer_size(), BUFFER_SIZE); + + let world = universe.world(); + let rank = world.rank(); + let default_config_path = "engine/config/simulation.json".to_string(); + let config_path = args.config.unwrap_or(default_config_path); + let config = Configuration::read(&config_path).expect("Error while reading config"); + config.validate(); + let config_per_engine = config.get_engine_configs(); + let index: usize = (rank) as usize; + let self_config: &EngineConfig = config_per_engine.get(index).unwrap(); + let travel_plan: &TravelPlanConfig = config.get_travel_plan(); + let engine_config = &self_config.config; + let engine_id = String::from(&self_config.engine_id); + // FileLogger::init(engine_id.to_string()).unwrap(); + + let mpi_transport = MpiTransport::new(engine_id.to_string(), &travel_plan.get_regions()); + EngineApp::start_with_mpi( + engine_id.clone(), + engine_config.clone(), + &run_mode, + Some(travel_plan.clone()), + disease_handler, + Some(mpi_transport), + number_of_threads, + // output_dir, + ) + .with_context(cx) + .await; + } + Mode::Standalone => { + let default_config_path = "config/default.json".to_string(); + let config_file = args.config.unwrap_or(default_config_path); + let config = Config::read(&config_file).expect("Failed to read config file"); + EngineApp::start_standalone(config, &run_mode, disease_handler, number_of_threads).await; + } } + + // if daemon { + // EngineApp::start_in_daemon(&engine_id, &run_mode, disease_handler, number_of_threads).with_context(cx).await; + // } else { + // let default_config_path = "config/default.json".to_string(); + // let config_file = args.config.unwrap_or(default_config_path); + // let config = Config::read(&config_file).expect("Failed to read config file"); + // EngineApp::start_standalone(config, &run_mode, disease_handler, number_of_threads).await; + // } } diff --git a/engine/Cargo.toml b/engine/Cargo.toml index efb23a13..f2d3d7f4 100644 --- a/engine/Cargo.toml +++ b/engine/Cargo.toml @@ -32,8 +32,13 @@ validator = { version = "0.16.0", features = ["derive"] } rayon = "1.5" copystr = { version = "0.0.5", features = [ "serde" ] } fxhash = "0.2.1" -opentelemetry = { version = "0.18.0", features = ["rt-tokio"] } -opentelemetry-jaeger = { version = "0.17.0", features = ["rt-tokio"] } +opentelemetry = { version = "0.20.0", features = ["rt-tokio"] } +opentelemetry-jaeger = { version = "0.19.0", features = ["rt-tokio"] } +mpi = { version = "0.7.0", features = ["user-operations", "derive"] } +#mpi = { git = "https://github.com/rsmpi/rsmpi.git",branch="main", features = ["user-operations", "derive"] } +bincode = "1.3.3" +async-trait = "0.1.68" +snap = "1.1.0" [profile.release] opt-level = 3 diff --git a/engine/config/simulation.json b/engine/config/simulation.json new file mode 100644 index 00000000..656bf3ad --- /dev/null +++ b/engine/config/simulation.json @@ -0,0 +1,99 @@ +{ + "engine_configs": [ + { + "engine_id": "engine1", + "config": { + "sim_id": "sim-timestamp", + "population": { + "Auto": { + "number_of_agents": 1000, + "public_transport_percentage": 0.2, + "working_percentage": 0.7 + } + }, + "disease": { + "death_rate": 0.035, + "percentage_asymptomatic_population": 0.3, + "exposed_duration": 48, + "last_day": 26, + "asymptomatic_last_day": 9, + "mild_infected_last_day": 12, + "regular_transmission_rate": 0.25, + "pre_symptomatic_duration": 48, + "percentage_severe_infected_population": 0.3, + "high_transmission_start_day": 6, + "high_transmission_rate": 0.25, + 
"regular_transmission_start_day": 5 + }, + "geography_parameters": { + "grid_size": 250, + "hospital_beds_percentage": 0.003 + }, + "hours": 1080, + "interventions": [ + { + "Lockdown": { + "at_number_of_infections": 100, + "essential_workers_population": 0.1 + } + } + ] + } + }, + { + "engine_id": "engine2", + "config": { + "sim_id": "sim-timestamp", + "population": { + "Auto": { + "number_of_agents": 1000, + "public_transport_percentage": 0.2, + "working_percentage": 0.7 + } + }, + "disease": { + "death_rate": 0.035, + "percentage_asymptomatic_population": 0.3, + "exposed_duration": 48, + "last_day": 26, + "asymptomatic_last_day": 9, + "mild_infected_last_day": 12, + "regular_transmission_rate": 0.25, + "pre_symptomatic_duration": 48, + "percentage_severe_infected_population": 0.3, + "high_transmission_start_day": 6, + "high_transmission_rate": 0.25, + "regular_transmission_start_day": 5 + }, + "geography_parameters": { + "grid_size": 250, + "hospital_beds_percentage": 0.003 + }, + "hours": 1080, + "interventions": [] + } + } + ], + "travel_plan": { + "regions": [ + "engine1", + "engine2" + ], + "migration": { + "enabled": true, + "matrix": [ + [0, 10], + [10, 0] + ], + "start_migration_hour": 48, + "end_migration_hour": 336 + }, + "commute": { + "enabled": true, + "matrix": [ + [0, 5], + [5, 0] + ] + } + } +} diff --git a/engine/plot/.gitignore b/engine/plot/.gitignore index ed8ebf58..eab2bb3e 100644 --- a/engine/plot/.gitignore +++ b/engine/plot/.gitignore @@ -1 +1,2 @@ -__pycache__ \ No newline at end of file +__pycache__ +epirust-plot \ No newline at end of file diff --git a/engine/src/engine_app.rs b/engine/src/engine_app.rs index 12189dc3..65aee248 100644 --- a/engine/src/engine_app.rs +++ b/engine/src/engine_app.rs @@ -17,37 +17,76 @@ * */ +use common::config::{Config, TravelPlanConfig}; + use crate::epidemiology_simulation::Epidemiology; use crate::kafka::kafka_consumer::KafkaConsumer; use crate::run_mode::RunMode; use crate::state_machine::DiseaseHandler; -use common::config::Config; +use crate::transport::engine_handlers::NoOpEngineHandlers; +use crate::{KafkaTransport, MpiTransport}; pub const STANDALONE_SIM_ID: &str = "0"; pub struct EngineApp; impl EngineApp { - pub async fn start_in_daemon( - engine_id: &str, + pub async fn start_with_mpi( + engine_id: String, + config: Config, run_mode: &RunMode, + travel_plan_config: Option, dsh: Option, + transport: Option, + threads: u32, + // output_dir_path: &Path, + ) { + // let transport: Option = MpiTransport::new(engine_id.clone(), ); + let engine_handlers = NoOpEngineHandlers::default(); + if dsh.is_none() { + let disease = config.get_disease(); + let mut epidemiology = + Epidemiology::new(engine_id, config, travel_plan_config, run_mode, disease, transport, engine_handlers); + epidemiology.run(threads).await; + } else { + let mut epidemiology = + Epidemiology::new(engine_id, config, travel_plan_config, run_mode, dsh.unwrap(), transport, engine_handlers); + epidemiology.run(threads).await; + } + info!("Done"); + } + + pub async fn start_in_daemon( + engine_id: &str, + run_mode: &RunMode, + dsh: Option, threads: u32, ) { info!("Started in daemon mode"); let consumer = KafkaConsumer::new(engine_id, &["simulation_requests"]); - consumer.listen_loop(run_mode, dsh, threads).await; + consumer.listen_loop(engine_id, run_mode, dsh, threads).await; info!("Done"); } - pub async fn start_standalone(config: Config, run_mode: &RunMode, dsh: Option, threads: u32) { + pub async fn start_standalone(config: Config, run_mode: &RunMode, dsh: Option, 
threads: u32) { + let transport: Option = None; + let engine_handlers = NoOpEngineHandlers::default(); if dsh.is_none() { let disease = config.get_disease(); - let mut epidemiology = Epidemiology::new(config, None, STANDALONE_SIM_ID.to_string(), run_mode, disease); - epidemiology.run(run_mode, threads).await; + let mut epidemiology = + Epidemiology::new(STANDALONE_SIM_ID.to_string(), config, None, run_mode, disease, transport, engine_handlers); + epidemiology.run(threads).await; } else { - let mut epidemiology = Epidemiology::new(config, None, STANDALONE_SIM_ID.to_string(), run_mode, dsh.unwrap()); - epidemiology.run(run_mode, threads).await; + let mut epidemiology = Epidemiology::new( + STANDALONE_SIM_ID.to_string(), + config, + None, + run_mode, + dsh.unwrap(), + transport, + engine_handlers, + ); + epidemiology.run(threads).await; } info!("Done"); } diff --git a/engine/src/epidemiology_simulation.rs b/engine/src/epidemiology_simulation.rs index 6c4effb5..2ba414d1 100644 --- a/engine/src/epidemiology_simulation.rs +++ b/engine/src/epidemiology_simulation.rs @@ -19,15 +19,17 @@ use core::borrow::BorrowMut; use std::borrow::Borrow; +use std::sync::{Arc, Mutex}; use std::time::Instant; -use common::config::{Config, Population, TravelPlanConfig}; -use common::models::CommutePlan; -use common::utils::RandomWrapper; use futures::join; use opentelemetry::trace::{FutureExt, Span, TraceContextExt, Tracer}; use opentelemetry::{global, Context, KeyValue}; +use common::config::{Config, Population, TravelPlanConfig}; +use common::models::CommutePlan; +use common::utils::RandomWrapper; + use crate::allocation_map::CitizenLocationMap; use crate::geography; use crate::geography::Point; @@ -35,8 +37,6 @@ use crate::interventions::hospital::BuildNewHospital; use crate::interventions::lockdown::LockdownIntervention; use crate::interventions::vaccination::VaccinateIntervention; use crate::interventions::Interventions; -use crate::kafka::kafka_producer::{KafkaProducer, COMMUTE_TOPIC, MIGRATION_TOPIC}; -use crate::kafka::{ticks_consumer, travel_consumer}; use crate::listeners::csv_service::CsvListener; use crate::listeners::disease_tracker::Hotspot; use crate::listeners::events_kafka_producer::EventsKafkaProducer; @@ -45,77 +45,86 @@ use crate::listeners::listener::{Listener, Listeners}; use crate::listeners::travel_counter::TravelCounter; use crate::models::constants; use crate::models::events::Counts; -use crate::models::events::Tick; -use crate::run_mode::RunMode; +use crate::run_mode::{MultiEngineMode, RunMode}; use crate::state_machine::DiseaseHandler; -use crate::tick::{receive_tick, send_ack}; -use crate::travel::commute; +use crate::transport::engine_handlers::EngineHandlers; +use crate::transport::Transport; use crate::travel::commute::Commuter; use crate::travel::commute::CommutersByRegion; -use crate::travel::migration::{EngineMigrationPlan, Migrator, MigratorsByRegion}; +use crate::travel::migration::{EngineMigrationPlan, Migrator}; use crate::utils::util::{counts_at_start, output_file_format}; -pub struct Epidemiology { +pub struct Epidemiology { + engine_id: String, pub citizen_location_map: CitizenLocationMap, - pub sim_id: String, pub travel_plan_config: Option, pub config: Config, counts_at_hr: Counts, listeners: Listeners, interventions: Interventions, rng: RandomWrapper, - disease_handler: T, + disease_handler: D, + transport: Option>>, + engine_handlers: EH, + run_mode: RunMode, } -impl Epidemiology { +impl Epidemiology { pub fn new( + engine_id: String, config: Config, travel_plan_config: 
Option, - sim_id: String, run_mode: &RunMode, - disease_handler: T, + disease_handler: D, + transport: Option, + engine_handlers: EH, ) -> Self { let start = Instant::now(); let start_infections = config.get_starting_infections(); - let mut grid = geography::define_geography(config.get_grid_size(), sim_id.clone()); + let mut grid = geography::define_geography(config.get_grid_size(), engine_id.clone()); let mut rng = RandomWrapper::new(); let (start_locations, agent_list) = match config.get_population() { - Population::Csv(csv_pop) => grid.read_population(csv_pop, start_infections, &mut rng, &sim_id), + Population::Csv(csv_pop) => grid.read_population(csv_pop, start_infections, &mut rng, &engine_id), Population::Auto(auto_pop) => { - grid.generate_population(auto_pop, start_infections, &mut rng, &travel_plan_config, sim_id.clone()) + grid.generate_population(auto_pop, start_infections, &mut rng, &travel_plan_config, engine_id.clone()) } }; grid.resize_hospital( agent_list.len() as i32, constants::HOSPITAL_STAFF_PERCENTAGE, config.get_geography_parameters().hospital_beds_percentage, - sim_id.clone(), + engine_id.clone(), ); let mut citizen_location_map = CitizenLocationMap::new(grid, &agent_list, &start_locations); info!("Initialization completed in {} seconds", start.elapsed().as_secs_f32()); let current_population = citizen_location_map.current_population(); - let listeners = Self::create_listeners(&sim_id, current_population as usize, run_mode, &config); + let listeners = Self::create_listeners(&engine_id, current_population as usize, run_mode, &config); let counts_at_hr = counts_at_start(current_population, config.get_starting_infections()); let interventions = Self::init_interventions(&config, &mut citizen_location_map, &mut rng); + let transport = transport.map(|x| Arc::new(Mutex::new(x))); + let run_mode = run_mode.clone(); Epidemiology { + engine_id, interventions, counts_at_hr, listeners, config, travel_plan_config, citizen_location_map, - sim_id, rng, disease_handler, + transport, + engine_handlers, + run_mode, } } fn create_listeners(engine_id: &str, current_pop: usize, run_mode: &RunMode, config: &Config) -> Listeners { - let output_file_format = output_file_format(config, run_mode); + let output_file_format = output_file_format(config, engine_id); let counts_file_name = format!("{output_file_format}.csv"); let csv_listener = CsvListener::new(counts_file_name); @@ -127,20 +136,23 @@ impl Epidemiology { match run_mode { RunMode::Standalone => {} - RunMode::SingleDaemon => { - let kafka_listener = - EventsKafkaProducer::new(engine_id.to_string(), current_pop, config.enable_citizen_state_messages()); - listeners_vec.push(Box::new(kafka_listener)); - } - RunMode::MultiEngine { .. 
} => { - let travels_file_name = format!("{output_file_format}_outgoing_travels.csv"); - let travel_counter = TravelCounter::new(travels_file_name); - listeners_vec.push(Box::new(travel_counter)); - let kafka_listener = - EventsKafkaProducer::new(engine_id.to_string(), current_pop, config.enable_citizen_state_messages()); - listeners_vec.push(Box::new(kafka_listener)); - } + RunMode::MultiEngine { mode } => match mode { + MultiEngineMode::Kafka => { + let travels_file_name = format!("{output_file_format}_outgoing_travels.csv"); + let travel_counter = TravelCounter::new(travels_file_name); + listeners_vec.push(Box::new(travel_counter)); + + let kafka_listener = + EventsKafkaProducer::new(engine_id.to_string(), current_pop, config.enable_citizen_state_messages()); + listeners_vec.push(Box::new(kafka_listener)); + } + MultiEngineMode::MPI => { + let travels_file_name = format!("{output_file_format}_outgoing_travels.csv"); + let travel_counter = TravelCounter::new(travels_file_name); + listeners_vec.push(Box::new(travel_counter)); + } + }, } Listeners::from(listeners_vec) @@ -162,24 +174,24 @@ impl Epidemiology { Interventions { vaccinate: vaccinations, lockdown: lock_down_details, build_new_hospital: hospital_intervention } } - pub async fn run(&mut self, run_mode: &RunMode, threads: u32) { + pub async fn run(&mut self, threads: u32) { rayon::ThreadPoolBuilder::new().num_threads(threads as usize).build_global().unwrap(); self.listeners.grid_updated(&self.citizen_location_map.grid); - match run_mode { - RunMode::MultiEngine { engine_id } => { + match self.run_mode { + RunMode::MultiEngine { .. } => { let tracer = global::tracer("epirust-trace"); - let mut span = tracer.start(format!("multi-engine - {engine_id}")); + let mut span = tracer.start(format!("multi-engine - {}", self.engine_id)); span.set_attribute(KeyValue::new("mode", "multi-engine")); - span.set_attribute(KeyValue::new("engine_id", engine_id.to_string())); + span.set_attribute(KeyValue::new("engine_id", self.engine_id.to_string())); let cx = Context::current_with_span(span); - self.run_multi_engine(engine_id).with_context(cx).await + self.run_multi_engine().with_context(cx).await } - _ => self.run_single_engine(run_mode).await, + _ => self.run_single_engine().await, } } - pub async fn run_single_engine(&mut self, run_mode: &RunMode) { + pub async fn run_single_engine(&mut self) { let start_time = Instant::now(); let mut outgoing_migrators = Vec::new(); let mut outgoing_commuters = Vec::new(); @@ -210,7 +222,7 @@ impl Epidemiology { &mut outgoing_commuters, self.config.enable_citizen_state_messages(), None, - &self.sim_id, + &self.engine_id, &self.disease_handler, ); @@ -221,10 +233,10 @@ impl Epidemiology { listeners, rng, &self.config, - &self.sim_id, + &self.engine_id, ); - if Self::stop_simulation(&mut interventions.lockdown, run_mode, *counts_at_hr) { + if Self::stop_simulation(&mut interventions.lockdown, &self.run_mode, *counts_at_hr) { break; } @@ -244,9 +256,10 @@ impl Epidemiology { listeners.simulation_ended(); } - pub async fn run_multi_engine(&mut self, engine_id: &String) { + pub async fn run_multi_engine(&mut self) { let start_time = Instant::now(); - let mut producer = KafkaProducer::new(); + // let mut producer = KafkaProducer::new(); + let engine_id = self.engine_id.to_string(); let travel_plan_config = self.travel_plan_config.as_ref().unwrap(); @@ -260,8 +273,8 @@ impl Epidemiology { EngineMigrationPlan::new(engine_id.clone(), migration_plan, self.citizen_location_map.current_population()); debug!("{}: Start 
Migrator Consumer", engine_id); - let migrators_consumer = travel_consumer::start(engine_id, &[&*format!("{MIGRATION_TOPIC}{engine_id}")], "migrate"); - let mut migration_stream = migrators_consumer.stream(); + // let migrators_consumer = travel_consumer::start(engine_id, &[&*format!("{MIGRATION_TOPIC}{engine_id}")], "migrate"); + // let mut migration_stream = migrators_consumer.stream(); let commute_plan = if is_commute_enabled { travel_plan_config.commute_plan() @@ -270,11 +283,11 @@ impl Epidemiology { }; debug!("{}: Start Commuter Consumer", engine_id); - let commute_consumer = travel_consumer::start(engine_id, &[&*format!("{COMMUTE_TOPIC}{engine_id}")], "commute"); - let mut commute_stream = commute_consumer.stream(); + // let commute_consumer = travel_consumer::start(engine_id, &[&*format!("{COMMUTE_TOPIC}{engine_id}")], "commute"); + // let mut commute_stream = commute_consumer.stream(); - let ticks_consumer = ticks_consumer::start(engine_id); - let mut ticks_stream = ticks_consumer.stream(); + // let ticks_consumer = ticks_consumer::start(engine_id); + // let mut ticks_stream = ticks_consumer.stream(); let mut n_incoming = 0; let mut n_outgoing = 0; @@ -283,6 +296,9 @@ impl Epidemiology { let interventions = self.interventions.borrow_mut(); let rng = self.rng.borrow_mut(); let disease_handler = self.disease_handler.borrow(); + let engine_handlers = self.engine_handlers.borrow_mut(); + + let transport = self.transport.borrow(); counts_at_hr.log(); @@ -291,15 +307,22 @@ impl Epidemiology { let mut total_receive_migration_sync_time = 0; let mut total_send_commuters_time = 0; let mut total_send_migrator_time = 0; - let run_mode = RunMode::MultiEngine { engine_id: engine_id.to_string() }; + // let run_mode = RunMode::MultiEngine { mode: }; let hours = self.config.get_hours(); let config = &self.config; for simulation_hour in 1..hours { + // let transport = transport.as_ref(); let start_time = Instant::now(); let tracer = global::tracer("epirust-trace"); - let tick = - receive_tick(&run_mode, &mut ticks_stream, simulation_hour, is_commute_enabled, is_migration_enabled).await; + + let tick = transport + .clone() + .unwrap() + .try_lock() + .unwrap() + .receive_tick(simulation_hour, is_commute_enabled, is_migration_enabled) + .await; if let Some(t) = tick { total_tick_sync_time += start_time.elapsed().as_millis(); info!("total tick sync time as hour {} - is {}", simulation_hour, total_tick_sync_time); @@ -328,11 +351,19 @@ impl Epidemiology { } let mut actual_outgoing: Vec<(Point, Migrator)> = Vec::new(); - let received_migrators = if is_migration_enabled { - debug!("{}: Received Migrators | Simulation hour: {}", engine_id, simulation_hour); - Some(engine_migration_plan.receive_migrators(tick, &mut migration_stream)) - } else { - None + let received_migrators = async { + let migrators = if is_migration_enabled { + let arc = transport.clone().unwrap(); + let mut guard = arc.try_lock().unwrap(); + debug!("{}: Received Migrators | Simulation hour: {}", engine_id, simulation_hour); + // let (incoming, ) = join!(result.as_mut().unwrap().receive_migrators(tick.unwrap().hour(), &engine_migration_plan)); + let vec = guard.receive_migrators(simulation_hour, &engine_migration_plan).await; + Some(vec) + // Some(engine_migration_plan.receive_migrators(tick, &mut migration_stream)) + } else { + None + }; + migrators }; let mut outgoing_commuters: Vec<(Point, Commuter)> = Vec::new(); @@ -350,7 +381,7 @@ impl Epidemiology { &mut outgoing_commuters, config.enable_citizen_state_messages(), 
Some(travel_plan_config), - engine_id, + &engine_id, disease_handler, ); debug!("{}: Simulation finished for hour: {}", engine_id, simulation_hour); @@ -373,16 +404,32 @@ impl Epidemiology { Vec::new() }; - if is_migration_enabled { + if is_migration_enabled && tick.is_some() { debug!("{}: Send Migrators", engine_id); let send_migrator_start_time = Instant::now(); - Self::send_migrators(tick, &mut producer, outgoing_migrators_by_region); + transport + .clone() + .unwrap() + .try_lock() + .unwrap() + .send_migrators(tick.unwrap().hour(), outgoing_migrators_by_region) + .await; + // Self::send_migrators(, &mut producer, outgoing_migrators_by_region); + debug!("{}: Send Migrators Successful", engine_id); total_send_migrator_time += send_migrator_start_time.elapsed().as_millis(); } - if is_commute_enabled { + if is_commute_enabled && tick.is_some() { debug!("{}: Send Commuters", engine_id); let send_commuter_start_time = Instant::now(); - Self::send_commuters(tick, &mut producer, outgoing_commuters_by_region); + transport + .clone() + .unwrap() + .try_lock() + .unwrap() + .send_commuters(tick.unwrap().hour(), outgoing_commuters_by_region) + .await; + debug!("{}: Send Commuters Successful", engine_id); + // Self::send_commuters(tick, &mut producer, outgoing_commuters_by_region); total_send_commuters_time += send_commuter_start_time.elapsed().as_millis(); } }; @@ -392,12 +439,29 @@ impl Epidemiology { let cx1 = Context::current_with_span(span1); let _ = join!(sim).with_context(cx1); - if is_commute_enabled { + if is_migration_enabled { + let migration_start_time = Instant::now(); + let (incoming,) = join!(received_migrators); + total_receive_migration_sync_time += migration_start_time.elapsed().as_millis(); + // let mut incoming = received_migrators.unwrap(); + let mut incoming = incoming.unwrap(); + n_incoming += incoming.len(); + n_outgoing += outgoing.len(); + self.citizen_location_map.remove_migrators(&actual_outgoing, counts_at_hr); + self.citizen_location_map.assimilate_migrators(&mut incoming, counts_at_hr, rng); + debug!("{}: assimilated the migrators", engine_id); + } + + let option = transport.clone().unwrap(); + + if is_commute_enabled && tick.is_some() { let commute_start_time = Instant::now(); let mut span2 = tracer.start("receive_commuters"); span2.set_attribute(KeyValue::new("hour", simulation_hour.to_string())); let cx2 = Context::current_with_span(span2); - let received_commuters = commute::receive_commuters(&commute_plan, tick, &mut commute_stream, engine_id); + // let received_commuters = commute::receive_commuters(&commute_plan, tick, &mut commute_stream, engine_id); + let mut guard1 = option.try_lock().unwrap(); + let received_commuters = guard1.receive_commuters(tick.unwrap().hour(), &commute_plan); let mut incoming_commuters = received_commuters.with_context(cx2).await; total_receive_commute_sync_time += commute_start_time.elapsed().as_millis(); info!("total commute sync time as hour {} - is {}", simulation_hour, total_receive_commute_sync_time); @@ -408,17 +472,6 @@ impl Epidemiology { debug!("{}: assimilated the commuters", engine_id); } - if is_migration_enabled { - let migration_start_time = Instant::now(); - let (mut incoming,) = join!(received_migrators.unwrap()); - total_receive_migration_sync_time += migration_start_time.elapsed().as_millis(); - n_incoming += incoming.len(); - n_outgoing += outgoing.len(); - self.citizen_location_map.remove_migrators(&actual_outgoing, counts_at_hr); - self.citizen_location_map.assimilate_migrators(&mut incoming, counts_at_hr, rng); 
- debug!("{}: assimilated the migrators", engine_id); - } - self.listeners.counts_updated(*counts_at_hr); self.citizen_location_map.process_interventions( interventions, @@ -426,22 +479,26 @@ impl Epidemiology { &mut self.listeners, rng, &self.config, - engine_id, + &self.engine_id, ); - if Self::stop_simulation(&mut interventions.lockdown, &run_mode, *counts_at_hr) { + if Self::stop_simulation(&mut interventions.lockdown, &self.run_mode, *counts_at_hr) { break; } - send_ack( - &run_mode, - &mut producer, + engine_handlers.on_tick_end( + &self.engine_id, *counts_at_hr, simulation_hour, &interventions.lockdown, is_commute_enabled, is_migration_enabled, ); + // send_ack( + // + // &mut producer, + // ation_enabled, + // ); if simulation_hour % 100 == 0 { info!( @@ -472,20 +529,20 @@ impl Epidemiology { self.listeners.simulation_ended(); } - fn send_migrators(tick: Option, producer: &mut KafkaProducer, outgoing: Vec) { - if tick.is_some() && tick.unwrap().hour() % 24 == 0 { - producer.send_migrators(outgoing); - } - } - - fn send_commuters(tick_op: Option, producer: &mut KafkaProducer, outgoing: Vec) { - if let Some(tick) = tick_op { - let hour = tick.hour() % 24; - if hour == constants::ROUTINE_TRAVEL_START_TIME || hour == constants::ROUTINE_TRAVEL_END_TIME { - producer.send_commuters(outgoing); - } - } - } + // fn send_migrators(tick: Option, producer: &mut KafkaProducer, outgoing: Vec) { + // if tick.is_some() && tick.unwrap().hour() % 24 == 0 { + // producer.send_migrators(outgoing); + // } + // } + // + // fn send_commuters(tick_op: Option, producer: &mut KafkaProducer, outgoing: Vec) { + // if let Some(tick) = tick_op { + // let hour = tick.hour() % 24; + // if hour == constants::ROUTINE_TRAVEL_START_TIME || hour == constants::ROUTINE_TRAVEL_END_TIME { + // producer.send_commuters(outgoing); + // } + // } + // } fn stop_simulation(lock_down_details: &mut LockdownIntervention, run_mode: &RunMode, row: Counts) -> bool { let zero_active_cases = row.get_exposed() == 0 && row.get_infected() == 0 && row.get_hospitalized() == 0; @@ -503,44 +560,35 @@ impl Epidemiology { #[cfg(test)] mod tests { - use crate::engine_app::STANDALONE_SIM_ID; - use crate::geography::Area; - use crate::geography::Point; - use common::config::intervention_config::{InterventionConfig, VaccinateConfig}; - use common::config::{AutoPopulation, GeographyParameters}; - use common::disease::Disease; - - use super::*; - - #[test] - fn should_init() { - let pop = AutoPopulation { number_of_agents: 10, public_transport_percentage: 1.0, working_percentage: 1.0 }; - let disease = Disease::new(0, 0, 0, 0, 0, 0.0, 0.0, 0.0, 0.0, 0.0, 0, 0); - let vac = VaccinateConfig { at_hour: 5000, percent: 0.2 }; - let geography_parameters = GeographyParameters::new(100, 0.003); - let config = Config::new( - Population::Auto(pop), - Some(disease), - geography_parameters, - vec![], - 100, - vec![InterventionConfig::Vaccinate(vac)], - None, - ); - let epidemiology: Epidemiology<_> = - Epidemiology::new(config, None, STANDALONE_SIM_ID.to_string(), &RunMode::Standalone, disease); - let expected_housing_area = Area::new(&STANDALONE_SIM_ID.to_string(), Point::new(0, 0), Point::new(39, 100)); - assert_eq!(epidemiology.citizen_location_map.grid.housing_area, expected_housing_area); - - let expected_transport_area = Area::new(&STANDALONE_SIM_ID.to_string(), Point::new(40, 0), Point::new(59, 100)); - assert_eq!(epidemiology.citizen_location_map.grid.transport_area, expected_transport_area); - - let expected_work_area = 
Area::new(&STANDALONE_SIM_ID.to_string(), Point::new(60, 0), Point::new(79, 100)); - assert_eq!(epidemiology.citizen_location_map.grid.work_area, expected_work_area); - - let expected_hospital_area = Area::new(&STANDALONE_SIM_ID.to_string(), Point::new(80, 0), Point::new(89, 0)); - assert_eq!(epidemiology.citizen_location_map.grid.hospital_area, expected_hospital_area); - - assert_eq!(epidemiology.citizen_location_map.current_population(), 10); - } + // #[test] + // fn should_init() { + // let pop = AutoPopulation { number_of_agents: 10, public_transport_percentage: 1.0, working_percentage: 1.0 }; + // let disease = Disease::new(0, 0, 0, 0, 0, 0.0, 0.0, 0.0, 0.0, 0.0, 0, 0); + // let vac = VaccinateConfig { at_hour: 5000, percent: 0.2 }; + // let geography_parameters = GeographyParameters::new(100, 0.003); + // let config = Config::new( + // Population::Auto(pop), + // Some(disease), + // geography_parameters, + // vec![], + // 100, + // vec![InterventionConfig::Vaccinate(vac)], + // None, + // ); + // let epidemiology: Epidemiology<_> = + // Epidemiology::new(config, None, STANDALONE_SIM_ID.to_string(), &RunMode::Standalone, disease, KafkaTransport::); + // let expected_housing_area = Area::new(&STANDALONE_SIM_ID.to_string(), Point::new(0, 0), Point::new(39, 100)); + // assert_eq!(epidemiology.citizen_location_map.grid.housing_area, expected_housing_area); + // + // let expected_transport_area = Area::new(&STANDALONE_SIM_ID.to_string(), Point::new(40, 0), Point::new(59, 100)); + // assert_eq!(epidemiology.citizen_location_map.grid.transport_area, expected_transport_area); + // + // let expected_work_area = Area::new(&STANDALONE_SIM_ID.to_string(), Point::new(60, 0), Point::new(79, 100)); + // assert_eq!(epidemiology.citizen_location_map.grid.work_area, expected_work_area); + // + // let expected_hospital_area = Area::new(&STANDALONE_SIM_ID.to_string(), Point::new(80, 0), Point::new(89, 0)); + // assert_eq!(epidemiology.citizen_location_map.grid.hospital_area, expected_hospital_area); + // + // assert_eq!(epidemiology.citizen_location_map.current_population(), 10); + // } } diff --git a/engine/src/kafka/kafka_consumer.rs b/engine/src/kafka/kafka_consumer.rs index 70a972f2..574a4373 100644 --- a/engine/src/kafka/kafka_consumer.rs +++ b/engine/src/kafka/kafka_consumer.rs @@ -19,7 +19,6 @@ use std::error::Error; -use common::config::request::Request; use futures::StreamExt; use opentelemetry::trace::{FutureExt, TraceContextExt, Tracer}; use opentelemetry::{global, Context}; @@ -30,10 +29,16 @@ use rdkafka::message::BorrowedMessage; use rdkafka::message::Message; use rdkafka::ClientConfig; +use common::config::request::Request; + use crate::epidemiology_simulation::Epidemiology; +use crate::kafka::kafka_producer::{KafkaProducer, COMMUTE_TOPIC, MIGRATION_TOPIC}; +use crate::kafka::{ticks_consumer, travel_consumer}; use crate::run_mode::RunMode; use crate::state_machine::DiseaseHandler; +use crate::transport::engine_handlers::{EngineHandlers, KafkaImplEngineHandler}; use crate::utils::environment; +use crate::KafkaTransport; pub struct KafkaConsumer<'a> { engine_id: &'a str, @@ -57,10 +62,12 @@ impl KafkaConsumer<'_> { KafkaConsumer { engine_id, consumer } } - pub async fn listen_loop( + //Todo: Fix this function, it is unnecessary. 
We can directly take the config from the app rather than kafka + pub async fn listen_loop( &self, + engine_id: &str, run_mode: &RunMode, - disease_handler: Option, + disease_handler: Option, threads: u32, ) { let mut message_stream: MessageStream = self.consumer.stream(); @@ -76,8 +83,25 @@ impl KafkaConsumer<'_> { ); } Ok(request) => { - self.run_sim(request, run_mode, disease_handler.clone(), threads).await; - if let RunMode::MultiEngine { engine_id: _e } = run_mode { + let migrators_consumer = + travel_consumer::start(engine_id, &[&*format!("{MIGRATION_TOPIC}{engine_id}")], "migrate"); + let migration_stream = migrators_consumer.stream(); + + let commute_consumer = + travel_consumer::start(engine_id, &[&*format!("{COMMUTE_TOPIC}{engine_id}")], "commute"); + let commute_stream = commute_consumer.stream(); + + let ticks_consumer = ticks_consumer::start(engine_id); + let ticks_stream = ticks_consumer.stream(); + let producer = KafkaProducer::new(); + let transport = + KafkaTransport::new(engine_id.to_string(), producer, ticks_stream, commute_stream, migration_stream); + + //Todo: fix it there is already a kafkaProducer in the scope, try to use that (think of merging the engine handlers and transport) + let engine_handlers = KafkaImplEngineHandler::new(KafkaProducer::new()); + + self.run_sim(request, run_mode, disease_handler.clone(), Some(transport), engine_handlers, threads).await; + if let RunMode::MultiEngine { .. } = run_mode { return; } } @@ -85,22 +109,33 @@ impl KafkaConsumer<'_> { } } - async fn run_sim( + async fn run_sim<'a, D: DiseaseHandler + Sync, EH: EngineHandlers>( &self, request: Request, run_mode: &RunMode, - disease_handler: Option, + disease_handler: Option, + transport: Option>, + engine_handlers: EH, threads: u32, ) { match request { Request::SimulationRequest(req) => { if disease_handler.is_none() { let disease = req.config.get_disease(); - let mut epidemiology = Epidemiology::new(req.config, None, req.sim_id, run_mode, disease); - epidemiology.run(run_mode, threads).await; + let mut epidemiology = + Epidemiology::new(req.sim_id, req.config, None, run_mode, disease, transport, engine_handlers); + epidemiology.run(threads).await; } else { - let mut epidemiology = Epidemiology::new(req.config, None, req.sim_id, run_mode, disease_handler.unwrap()); - epidemiology.run(run_mode, threads).await; + let mut epidemiology = Epidemiology::new( + req.sim_id, + req.config, + None, + run_mode, + disease_handler.unwrap(), + transport, + engine_handlers, + ); + epidemiology.run(threads).await; }; } Request::MultiSimRequest(req) => { @@ -114,21 +149,30 @@ impl KafkaConsumer<'_> { let config = req.config.config.clone(); if disease_handler.is_none() { let disease = config.get_disease(); - let mut epidemiology = - Epidemiology::new(config, travel_plan_config, req.engine_id.to_string(), run_mode, disease); - epidemiology.run(run_mode, threads).await; - } else { let mut epidemiology = Epidemiology::new( + req.engine_id.to_string(), config, travel_plan_config, + run_mode, + disease, + transport, + engine_handlers, + ); + epidemiology.run(threads).await; + } else { + let mut epidemiology = Epidemiology::new( req.engine_id.to_string(), + config, + travel_plan_config, run_mode, disease_handler.unwrap(), + transport, + engine_handlers, ); let tracer = global::tracer("epirust-trace"); let span = tracer.start("run"); let cx = Context::current_with_span(span); - epidemiology.run(run_mode, threads).with_context(cx).await; + epidemiology.run(threads).with_context(cx).await; } } } diff --git 
a/engine/src/kafka/kafka_producer.rs b/engine/src/kafka/kafka_producer.rs index 7d2c49c0..e6adb6b3 100644 --- a/engine/src/kafka/kafka_producer.rs +++ b/engine/src/kafka/kafka_producer.rs @@ -51,7 +51,7 @@ impl KafkaProducer { self.producer.send(record) } - pub fn send_migrators(&mut self, outgoing: Vec) { + pub fn send_migrators(&self, outgoing: Vec) { for out_region in outgoing.iter() { let payload = serde_json::to_string(out_region).unwrap(); trace!("Sending migrators: {} to region: {}", payload, out_region.to_engine_id()); @@ -64,7 +64,7 @@ impl KafkaProducer { } } - pub fn send_commuters(&mut self, outgoing: Vec) { + pub fn send_commuters(&self, outgoing: Vec) { for out_region in outgoing.iter() { let payload = serde_json::to_string(out_region).unwrap(); trace!("Sending commuters: {} to region: {}", payload, out_region.to_engine_id()); diff --git a/engine/src/lib.rs b/engine/src/lib.rs index 1d9ee407..876f41ee 100644 --- a/engine/src/lib.rs +++ b/engine/src/lib.rs @@ -40,7 +40,12 @@ mod travel; mod utils; pub mod geography; +mod transport; pub use engine_app::EngineApp; -pub use run_mode::RunMode; +pub use run_mode::{MultiEngineMode, RunMode}; pub use state_machine::*; +pub use transport::engine_handlers::EngineHandlers; +pub use transport::kafka_transport::KafkaTransport; +pub use transport::mpi_transport::MpiTransport; +pub use transport::Transport; diff --git a/engine/src/models/events/tick.rs b/engine/src/models/events/tick.rs index f442b2be..bc39bbf6 100644 --- a/engine/src/models/events/tick.rs +++ b/engine/src/models/events/tick.rs @@ -26,6 +26,9 @@ pub struct Tick { } impl Tick { + pub fn new(hour: Hour, terminate: bool) -> Self { + Tick { hour, terminate } + } pub fn hour(&self) -> Hour { self.hour } diff --git a/engine/src/run_mode.rs b/engine/src/run_mode.rs index 6654dd31..512dc7bc 100644 --- a/engine/src/run_mode.rs +++ b/engine/src/run_mode.rs @@ -17,13 +17,17 @@ * */ +#[derive(Clone)] pub enum RunMode { //run once and exit Standalone, - //daemon mode, with only one engine - SingleDaemon, - //daemon mode, with multiple engines and an orchestrator - MultiEngine { engine_id: String }, + MultiEngine { mode: MultiEngineMode }, +} + +#[derive(Clone)] +pub enum MultiEngineMode { + Kafka, + MPI, } diff --git a/engine/src/tick/tick_util.rs b/engine/src/tick/tick_util.rs index 561884ff..4a24f328 100644 --- a/engine/src/tick/tick_util.rs +++ b/engine/src/tick/tick_util.rs @@ -17,17 +17,18 @@ * */ +use futures::StreamExt; +use opentelemetry::trace::{FutureExt, Span, TraceContextExt, Tracer}; +use opentelemetry::{global, Context, KeyValue}; +use rdkafka::consumer::MessageStream; + +use common::models::custom_types::Hour; + use crate::interventions::lockdown::LockdownIntervention; use crate::kafka::kafka_producer::KafkaProducer; use crate::kafka::ticks_consumer; use crate::models::constants; use crate::models::events::{Counts, Tick, TickAck}; -use crate::run_mode::RunMode; -use common::models::custom_types::Hour; -use futures::StreamExt; -use opentelemetry::trace::{FutureExt, Span, TraceContextExt, Tracer}; -use opentelemetry::{global, Context, KeyValue}; -use rdkafka::consumer::MessageStream; pub async fn extract_tick(message_stream: &mut MessageStream<'_>) -> Tick { debug!("Start receiving tick"); @@ -53,7 +54,6 @@ pub async fn get_tick(message_stream: &mut MessageStream<'_>, simulation_hour: H } pub async fn receive_tick( - run_mode: &RunMode, message_stream: &mut MessageStream<'_>, simulation_hour: Hour, is_commute_enabled: bool, @@ -65,23 +65,21 @@ pub async fn receive_tick( let 
receive_tick_for_commute: bool = is_commute_enabled && is_commute_hour; let receive_tick_for_migration: bool = is_migration_enabled && is_migration_hour; if receive_tick_for_commute || receive_tick_for_migration { - if let RunMode::MultiEngine { engine_id: _e } = run_mode { - let tracer = global::tracer("epirust-trace"); - let mut span = tracer.start("tick_wait_time"); - span.set_attribute(KeyValue::new("hour", simulation_hour.to_string())); - let cx = Context::current_with_span(span); - let t = get_tick(message_stream, simulation_hour).with_context(cx).await; - if t.hour() != simulation_hour { - panic!("Local hour is {}, but received tick for {}", simulation_hour, t.hour()); - } - return Some(t); + let tracer = global::tracer("epirust-trace"); + let mut span = tracer.start("tick_wait_time"); + span.set_attribute(KeyValue::new("hour", simulation_hour.to_string())); + let cx = Context::current_with_span(span); + let t = get_tick(message_stream, simulation_hour).with_context(cx).await; + if t.hour() != simulation_hour { + panic!("Local hour is {}, but received tick for {}", simulation_hour, t.hour()); } + return Some(t); } None } pub fn send_ack( - run_mode: &RunMode, + engine_id: &str, producer: &mut KafkaProducer, counts: Counts, simulation_hour: Hour, @@ -96,18 +94,14 @@ pub fn send_ack( let received_tick_for_migration: bool = is_migration_enabled && is_migration_hour; if simulation_hour == 1 || received_tick_for_commute || received_tick_for_migration { - if let RunMode::MultiEngine { engine_id } = run_mode { - let ack = TickAck { - engine_id: engine_id.to_string(), - hour: simulation_hour, - counts, - locked_down: lockdown.is_locked_down(), - }; - let tick_string = serde_json::to_string(&ack).unwrap(); - match producer.send_ack(&tick_string) { - Ok(_) => {} - Err(e) => panic!("Failed while sending acknowledgement: {:?}", e.0), - } + // if let RunMode::MultiEngine { engine_id } = run_mode { + let ack = + TickAck { engine_id: engine_id.to_string(), hour: simulation_hour, counts, locked_down: lockdown.is_locked_down() }; + let tick_string = serde_json::to_string(&ack).unwrap(); + match producer.send_ack(&tick_string) { + Ok(_) => {} + Err(e) => panic!("Failed while sending acknowledgement: {:?}", e.0), } + // } } } diff --git a/engine/src/transport/engine_handlers.rs b/engine/src/transport/engine_handlers.rs new file mode 100644 index 00000000..b3f68c57 --- /dev/null +++ b/engine/src/transport/engine_handlers.rs @@ -0,0 +1,85 @@ +/* + * EpiRust + * Copyright (c) 2023 ThoughtWorks, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + +use crate::interventions::lockdown::LockdownIntervention; +use crate::kafka::kafka_producer::KafkaProducer; +use crate::models::events::Counts; +use crate::tick::send_ack; +use common::models::custom_types::Hour; + +pub trait EngineHandlers { + fn on_tick_start(); + fn on_tick_end( + &mut self, + engine_id: &str, + counts: Counts, + simulation_hour: Hour, + lockdown: &LockdownIntervention, + is_commute_enabled: bool, + is_migration_enabled: bool, + ); +} + +pub struct NoOpEngineHandlers; + +impl Default for NoOpEngineHandlers { + fn default() -> Self { + NoOpEngineHandlers {} + } +} +impl EngineHandlers for NoOpEngineHandlers { + fn on_tick_start() {} + + fn on_tick_end( + &mut self, + _engine_id: &str, + _counts: Counts, + _simulation_hour: Hour, + _lockdown: &LockdownIntervention, + _is_commute_enabled: bool, + _is_migration_enabled: bool, + ) { + } +} + +pub struct KafkaImplEngineHandler { + producer: KafkaProducer, +} + +impl KafkaImplEngineHandler { + pub fn new(producer: KafkaProducer) -> Self { + KafkaImplEngineHandler { producer } + } +} + +impl EngineHandlers for KafkaImplEngineHandler { + fn on_tick_start() {} + + fn on_tick_end( + &mut self, + engine_id: &str, + counts: Counts, + simulation_hour: Hour, + lockdown: &LockdownIntervention, + is_commute_enabled: bool, + is_migration_enabled: bool, + ) { + send_ack(engine_id, &mut self.producer, counts, simulation_hour, lockdown, is_commute_enabled, is_migration_enabled); + } +} diff --git a/engine/src/transport/kafka_transport.rs b/engine/src/transport/kafka_transport.rs new file mode 100644 index 00000000..40013516 --- /dev/null +++ b/engine/src/transport/kafka_transport.rs @@ -0,0 +1,164 @@ +/* + * EpiRust + * Copyright (c) 2023 ThoughtWorks, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + +use async_trait::async_trait; +use futures::StreamExt; +use opentelemetry::global::ObjectSafeSpan; +use opentelemetry::trace::{FutureExt, TraceContextExt, Tracer}; +use opentelemetry::{global, Context, KeyValue}; +use rdkafka::consumer::MessageStream; + +use common::models::custom_types::Hour; +use common::models::travel_plan::TravelPlan; +use common::models::CommutePlan; + +use crate::kafka::kafka_producer::KafkaProducer; +use crate::kafka::{ticks_consumer, travel_consumer}; +use crate::models::constants; +use crate::models::events::Tick; +use crate::transport::Transport; +use crate::travel::commute::{trace_commuters, Commuter, CommutersByRegion}; +use crate::travel::migration::{EngineMigrationPlan, Migrator, MigratorsByRegion}; + +pub struct KafkaTransport<'a> { + engine_id: String, + producer: KafkaProducer, + tick_stream: MessageStream<'a>, + commuter_stream: MessageStream<'a>, + migration_stream: MessageStream<'a>, +} + +impl<'a> KafkaTransport<'a> { + pub fn new( + engine_id: String, + producer: KafkaProducer, + tick_stream: MessageStream<'a>, + commuter_stream: MessageStream<'a>, + migration_stream: MessageStream<'a>, + ) -> Self { + KafkaTransport { engine_id, producer, tick_stream, commuter_stream, migration_stream } + } +} + +impl<'a> KafkaTransport<'a> { + async fn get_tick(&mut self, simulation_hour: Hour) -> Tick { + let mut tick = self.extract_tick().await; + let mut tick_hour = tick.hour(); + while tick_hour < simulation_hour { + tick = self.extract_tick().await; + tick_hour = tick.hour(); + } + tick + } + + async fn extract_tick(&mut self) -> Tick { + debug!("Start receiving tick"); + let msg = self.tick_stream.next().await; + let mut maybe_tick = ticks_consumer::read(msg); + while maybe_tick.is_none() { + debug!("Retry for Tick"); + let next_msg = self.tick_stream.next().await; + maybe_tick = ticks_consumer::read(next_msg); + } + debug!("Received Tick Successfully"); + maybe_tick.unwrap() + } +} + +unsafe impl Sync for KafkaTransport<'_> {} +unsafe impl Send for KafkaTransport<'_> {} + +#[async_trait] +impl<'a> Transport for KafkaTransport<'a> { + async fn receive_tick( + &mut self, + simulation_hour: Hour, + is_commute_enabled: bool, + is_migration_enabled: bool, + ) -> Option { + let day_hour = simulation_hour % 24; + let is_commute_hour = day_hour == constants::ROUTINE_TRAVEL_END_TIME || day_hour == constants::ROUTINE_TRAVEL_START_TIME; + let is_migration_hour = day_hour == 0; + let receive_tick_for_commute: bool = is_commute_enabled && is_commute_hour; + let receive_tick_for_migration: bool = is_migration_enabled && is_migration_hour; + if receive_tick_for_commute || receive_tick_for_migration { + let tracer = global::tracer("epirust-trace"); + let mut span = tracer.start("tick_wait_time"); + span.set_attribute(KeyValue::new("hour", simulation_hour.to_string())); + let cx = Context::current_with_span(span); + let t = self.get_tick(simulation_hour).with_context(cx).await; + if t.hour() != simulation_hour { + panic!("Local hour is {}, but received tick for {}", simulation_hour, t.hour()); + } + return Some(t); + } + None + } + + async fn send_commuters(&self, simulation_hour: Hour, commuters: Vec) { + let hour = simulation_hour % 24; + if hour == constants::ROUTINE_TRAVEL_START_TIME || hour == constants::ROUTINE_TRAVEL_END_TIME { + self.producer.send_commuters(commuters); + } + } + + async fn send_migrators(&self, simulation_hour: Hour, outgoing: Vec) { + if simulation_hour % 24 == 0 { + self.producer.send_migrators(outgoing); + } + } + + async fn 
+    async fn receive_commuters(&mut self, simulation_hour: Hour, commute_plan: &CommutePlan) -> Vec<Commuter> {
+        let mut incoming: Vec<Commuter> = Vec::new();
+        if simulation_hour == constants::ROUTINE_TRAVEL_START_TIME || simulation_hour == constants::ROUTINE_TRAVEL_END_TIME {
+            let expected_incoming_regions = commute_plan.incoming_regions_count(&self.engine_id);
+            let mut received_incoming_regions = 0;
+            debug!("Receiving commuters from {} regions", expected_incoming_regions);
+            while expected_incoming_regions != received_incoming_regions {
+                let maybe_msg =
+                    CommutersByRegion::receive_commuters_from_region(&mut self.commuter_stream, &self.engine_id).await;
+                if let Some(region_incoming) = maybe_msg {
+                    trace_commuters(&region_incoming, simulation_hour);
+                    incoming.extend(region_incoming.get_commuters());
+                    received_incoming_regions += 1;
+                }
+            }
+        }
+        incoming
+    }
+
+    async fn receive_migrators(&mut self, simulation_hour: Hour, migration_plan: &EngineMigrationPlan) -> Vec<Migrator> {
+        if simulation_hour % 24 == 0 {
+            let expected_incoming_regions = migration_plan.incoming_regions_count();
+            let mut received_incoming_regions = 0;
+            debug!("Receiving migrators from {} regions", expected_incoming_regions);
+            let mut incoming: Vec<Migrator> = Vec::new();
+            while expected_incoming_regions != received_incoming_regions {
+                let maybe_msg = travel_consumer::read_migrators(self.migration_stream.next().await);
+                if let Some(region_incoming) = maybe_msg {
+                    incoming.extend(region_incoming.get_migrators());
+                    received_incoming_regions += 1;
+                }
+            }
+            incoming
+        } else {
+            Vec::new()
+        }
+    }
+}
diff --git a/engine/src/transport/mod.rs b/engine/src/transport/mod.rs
new file mode 100644
index 00000000..6c52c191
--- /dev/null
+++ b/engine/src/transport/mod.rs
@@ -0,0 +1,42 @@
+/*
+ * EpiRust
+ * Copyright (c) 2023 ThoughtWorks, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use async_trait::async_trait;
+
+use common::models::custom_types::Hour;
+use common::models::CommutePlan;
+
+use crate::models::events::Tick;
+use crate::travel::commute::{Commuter, CommutersByRegion};
+use crate::travel::migration::{EngineMigrationPlan, Migrator, MigratorsByRegion};
+
+pub mod engine_handlers;
+pub mod kafka_transport;
+mod mpi_tag;
+pub mod mpi_transport;
+
+#[async_trait]
+pub trait Transport {
+    async fn receive_tick(&mut self, simulation_hour: Hour, is_commute_enabled: bool, is_migration_enabled: bool)
+        -> Option<Tick>;
+    async fn send_commuters(&self, simulation_hour: Hour, commuters: Vec<CommutersByRegion>);
+    async fn send_migrators(&self, simulation_hour: Hour, outgoing: Vec<MigratorsByRegion>);
+    async fn receive_commuters(&mut self, simulation_hour: Hour, commute_plan: &CommutePlan) -> Vec<Commuter>;
+    async fn receive_migrators(&mut self, simulation_hour: Hour, migration_plan: &EngineMigrationPlan) -> Vec<Migrator>;
+}
diff --git a/orchestrator/src/utils/mod.rs b/engine/src/transport/mpi_tag.rs
similarity index 51%
rename from orchestrator/src/utils/mod.rs
rename to engine/src/transport/mpi_tag.rs
index faa3491c..a062e94e 100644
--- a/orchestrator/src/utils/mod.rs
+++ b/engine/src/transport/mpi_tag.rs
@@ -1,6 +1,6 @@
 /*
  * EpiRust
- * Copyright (c) 2020 ThoughtWorks, Inc.
+ * Copyright (c) 2023 ThoughtWorks, Inc.
  *
  * This program is free software: you can redistribute it and/or modify
  * it under the terms of the GNU Affero General Public License as published by
@@ -17,20 +17,18 @@
  *
  */
 
-use serde_json::Value;
-use std::fs::File;
+use mpi::Tag;
 
-pub fn read_simulation_conf(filename: &str) -> String {
-    let reader = File::open(filename).unwrap();
-    let config: Value = serde_json::from_reader(reader).unwrap();
-    let sim = config.as_object().unwrap();
-    serde_json::to_string(sim).unwrap()
+pub enum MpiTag {
+    CommuterTag,
+    MigratorTag,
 }
 
-pub fn get_hours(filename: &str) -> i64 {
-    let reader = File::open(filename).unwrap();
-    let config: Value = serde_json::from_reader(reader).unwrap();
-    let sim = config.get("engine_configs").unwrap().as_array().unwrap();
-    let hours = sim[0].get("config").unwrap().get("hours");
-    hours.unwrap().as_i64().unwrap()
+impl Into<Tag> for MpiTag {
+    fn into(self) -> Tag {
+        match self {
+            MpiTag::CommuterTag => 12,
+            MpiTag::MigratorTag => 15,
+        }
+    }
 }
diff --git a/engine/src/transport/mpi_transport.rs b/engine/src/transport/mpi_transport.rs
new file mode 100644
index 00000000..ef7ca20b
--- /dev/null
+++ b/engine/src/transport/mpi_transport.rs
@@ -0,0 +1,217 @@
+/*
+ * EpiRust
+ * Copyright (c) 2023 ThoughtWorks, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use std::collections::HashMap;
+
+use async_trait::async_trait;
+use mpi::point_to_point::{Destination, Source};
+use mpi::request::RequestCollection;
+use mpi::topology::{Communicator, SimpleCommunicator};
+use mpi::Rank;
+use snap::raw::{Decoder, Encoder};
+
+use common::models::custom_types::Hour;
+use common::models::CommutePlan;
+
+use crate::models::constants;
+use crate::models::events::Tick;
+use crate::transport::mpi_tag::MpiTag;
+use crate::transport::Transport;
+use crate::travel::commute::{Commuter, CommutersByRegion};
+use crate::travel::migration::{EngineMigrationPlan, Migrator, MigratorsByRegion};
+
+pub struct MpiTransport {
+    engine_id: String,
+    world: SimpleCommunicator,
+    engine_ranks: HashMap<String, Rank>,
+}
+
+impl MpiTransport {
+    pub fn new(engine_id: String, regions: &[String]) -> Self {
+        let mut engine_ranks: HashMap<String, Rank> = HashMap::new();
+        for (i, engine) in regions.iter().enumerate() {
+            engine_ranks.insert(engine.clone(), Rank::from(i as u8));
+        }
+        MpiTransport { engine_id, world: SimpleCommunicator::world(), engine_ranks }
+    }
+}
+
+unsafe impl Sync for MpiTransport {}
+
+unsafe impl Send for MpiTransport {}
+
+#[async_trait]
+impl Transport for MpiTransport {
+    async fn receive_tick(
+        &mut self,
+        simulation_hour: Hour,
+        is_commute_enabled: bool,
+        is_migration_enabled: bool,
+    ) -> Option<Tick> {
+        let day_hour = simulation_hour % 24;
+        let is_commute_hour = day_hour == constants::ROUTINE_TRAVEL_END_TIME || day_hour == constants::ROUTINE_TRAVEL_START_TIME;
+        let is_migration_hour = day_hour == 0;
+        let receive_tick_for_commute: bool = is_commute_enabled && is_commute_hour;
+        let receive_tick_for_migration: bool = is_migration_enabled && is_migration_hour;
+        if receive_tick_for_commute || receive_tick_for_migration {
+            Some(Tick::new(simulation_hour, false))
+        } else {
+            None
+        }
+    }
+
+    async fn send_commuters(&self, hour: Hour, commuters: Vec<CommutersByRegion>) {
+        let h = hour % 24;
+        if h == constants::ROUTINE_TRAVEL_START_TIME || h == constants::ROUTINE_TRAVEL_END_TIME {
+            let total_count = self.engine_ranks.iter().len();
+            let self_rank = self.world.rank();
+            assert_eq!(commuters.len(), total_count);
+
+            let serialized_commuters: Vec<(&Rank, Vec<u8>)> = commuters
+                .iter()
+                .map(|s| {
+                    let rank: &Rank = self.engine_ranks.iter().find(|(x, _)| *x == s.to_engine_id()).unwrap().1;
+                    let serialized: Vec<u8> = bincode::serialize(&s).unwrap();
+                    let compressed: Vec<u8> = Encoder::new().compress_vec(&serialized[..]).unwrap();
+                    let length_of_buffer = compressed.len();
+                    let mut compressed_data_with_length = bincode::serialize(&length_of_buffer).unwrap();
+                    compressed_data_with_length.extend(compressed);
+                    debug!("Rank {self_rank}: commute to {} this much {}", rank, length_of_buffer);
+                    (rank, compressed_data_with_length)
+                })
+                .collect();
+
+            for (&rank, data) in serialized_commuters.iter() {
+                let p = self.world.process_at_rank(rank);
+                p.buffered_send_with_tag(&data[..], MpiTag::CommuterTag.into());
+            }
+        }
+    }
+
+    async fn send_migrators(&self, hour: Hour, outgoing: Vec<MigratorsByRegion>) {
+        debug!("Hi there");
+        if hour % 24 == 0 {
+            let total_count = self.engine_ranks.iter().len() - 1;
+            let self_rank = self.world.rank();
+            assert_eq!(outgoing.len(), total_count);
+
+            let serialized_migrators: Vec<(&Rank, Vec<u8>)> = outgoing
+                .iter()
+                .map(|s| {
+                    let rank: &Rank = self.engine_ranks.iter().find(|(x, _)| *x == s.to_engine_id()).unwrap().1;
+                    let serialized: Vec<u8> = bincode::serialize(&s).unwrap();
+                    let compressed: Vec<u8> = Encoder::new().compress_vec(&serialized[..]).unwrap();
+                    let length_of_buffer = compressed.len();
+                    let mut compressed_data_with_length = bincode::serialize(&length_of_buffer).unwrap();
+                    compressed_data_with_length.extend(compressed);
+                    debug!("Rank {self_rank}: migrate to {} this much {}", rank, length_of_buffer);
+                    (rank, compressed_data_with_length)
+                })
+                .collect();
+
+            for (&rank, data) in serialized_migrators.iter() {
+                let p = self.world.process_at_rank(rank);
+                p.buffered_send_with_tag(&data[..], MpiTag::MigratorTag.into());
+            }
+        }
+    }
+
+    async fn receive_commuters(&mut self, simulation_hour: Hour, _commute_plan: &CommutePlan) -> Vec<Commuter> {
+        let h = simulation_hour % 24;
+        let mut incoming: Vec<Commuter> = Vec::new();
+
+        if h == constants::ROUTINE_TRAVEL_START_TIME || h == constants::ROUTINE_TRAVEL_END_TIME {
+            let total_count = self.engine_ranks.iter().len();
+            let self_rank = self.world.rank();
+
+            let buffer = vec![0u8; 1200000];
+            let mut result = vec![buffer; total_count];
+
+            mpi::request::multiple_scope(total_count, |scope, coll: &mut RequestCollection<[u8]>| {
+                for (index, value) in result.iter_mut().enumerate() {
+                    let rank = Rank::from(index as i32);
+                    let p = self.world.process_at_rank(rank);
+                    let rreq = p.immediate_receive_into_with_tag(scope, &mut value[..], MpiTag::CommuterTag.into());
+                    coll.add(rreq);
+                }
+                let mut recv_count = 0;
+                while coll.incomplete() > 0 {
+                    let (_u, s, r) = coll.wait_any().unwrap();
+                    let length_of_msg: usize = bincode::deserialize::<usize>(&r[0..8]).unwrap() as usize;
+                    let decompressed = Decoder::new().decompress_vec(&r[8..length_of_msg + 8]).unwrap();
+                    let received: CommutersByRegion = bincode::deserialize(&decompressed[..]).unwrap();
+                    trace!(
+                        "engine rank: {}, hour: {}, from_rank: {}, received_commuters - {:?}",
+                        self_rank,
+                        simulation_hour,
+                        s.source_rank(),
+                        received
+                    );
+                    let vec1 = received.get_commuters();
+                    info!("current rank : {}, source: {}, commuters received: {}", self_rank, s.source_rank(), vec1.len());
+                    incoming.extend(vec1);
+                    recv_count += 1;
+                }
+                assert_eq!(recv_count, total_count);
+            });
+        }
+        incoming
+    }
+
+    async fn receive_migrators(&mut self, hour: Hour, _migration_plan: &EngineMigrationPlan) -> Vec<Migrator> {
+        let mut incoming: Vec<Migrator> = Vec::new();
+
+        if hour % 24 == 0 {
+            let total_count = self.engine_ranks.iter().len();
+            let self_rank = self.world.rank();
+
+            let buffer = vec![0u8; 1200000];
+            let mut result = vec![buffer; total_count];
+
+            mpi::request::multiple_scope(total_count - 1, |scope, coll: &mut RequestCollection<[u8]>| {
+                for (index, value) in result.iter_mut().enumerate().filter(|r| r.0 != (self_rank as usize)) {
+                    let rank = Rank::from(index as i32);
+                    let p = self.world.process_at_rank(rank);
+                    let rreq = p.immediate_receive_into_with_tag(scope, &mut value[..], MpiTag::MigratorTag.into());
+                    coll.add(rreq);
+                }
+                let mut recv_count = 0;
+                while coll.incomplete() > 0 {
+                    let (_u, s, r) = coll.wait_any().unwrap();
+                    let length_of_msg: usize = bincode::deserialize::<usize>(&r[0..8]).unwrap() as usize;
+                    let decompressed = Decoder::new().decompress_vec(&r[8..length_of_msg + 8]).unwrap();
+                    let received: MigratorsByRegion = bincode::deserialize(&decompressed[..]).unwrap();
+                    trace!(
+                        "engine rank: {}, hour: {}, from_rank: {}, received_migrators - {:?}",
+                        self_rank,
+                        hour,
+                        s.source_rank(),
+                        received
+                    );
+                    let vec1 = received.get_migrators();
+                    info!("current rank : {}, source: {}, migrators received: {}", self_rank, s.source_rank(), vec1.len());
+                    incoming.extend(vec1);
+                    recv_count += 1;
+                }
+                assert_eq!(recv_count, total_count - 1);
+            });
+        }
+        incoming
+    }
+}
diff --git a/engine/src/travel/commute/mod.rs b/engine/src/travel/commute/mod.rs
index ff6a808c..836a97ec 100644
--- a/engine/src/travel/commute/mod.rs
+++ b/engine/src/travel/commute/mod.rs
@@ -58,7 +58,7 @@ pub(crate) async fn receive_commuters(
     }
 }
 
-fn trace_commuters(commuters_by_region: &CommutersByRegion, hour: Hour) {
+pub fn trace_commuters(commuters_by_region: &CommutersByRegion, hour: Hour) {
     if hour == constants::ROUTINE_TRAVEL_START_TIME {
         trace!(
             "Travel_start: Received {} commuters from {:?} region",
diff --git a/engine/src/travel/migration/migrators_by_engine.rs b/engine/src/travel/migration/migrators_by_engine.rs
index 9d0652de..b0bb2255 100644
--- a/engine/src/travel/migration/migrators_by_engine.rs
+++ b/engine/src/travel/migration/migrators_by_engine.rs
@@ -22,7 +22,7 @@ use common::models::MigrationPlan;
 
 use crate::travel::migration::Migrator;
 
-#[derive(Serialize, Deserialize)]
+#[derive(Serialize, Deserialize, Debug)]
 pub struct MigratorsByRegion {
     to_engine_id: String,
     pub migrators: Vec<Migrator>,
diff --git a/engine/src/utils/util.rs b/engine/src/utils/util.rs
index 557e4d7d..e6b3cdbe 100644
--- a/engine/src/utils/util.rs
+++ b/engine/src/utils/util.rs
@@ -23,13 +23,11 @@ use common::config::{Config, StartingInfections};
 use common::models::custom_types::Count;
 use time::OffsetDateTime;
 
-pub fn output_file_format(config: &Config, run_mode: &RunMode) -> String {
+pub fn output_file_format(config: &Config, engine_id: &str) -> String {
     let format = time::format_description::parse("[year]-[month]-[day]T[hour]:[minute]:[second]").unwrap();
     let now = OffsetDateTime::now_utc();
     let mut output_file_prefix = config.get_output_file().unwrap_or_else(|| "simulation".to_string());
-    if let RunMode::MultiEngine { engine_id } = run_mode {
-        output_file_prefix = format!("{}_{}", output_file_prefix, engine_id);
-    }
+    output_file_prefix = format!("{}_{}", output_file_prefix, engine_id);
 
     format!("{}_{}", output_file_prefix, now.format(&format).unwrap())
 }
diff --git a/orchestrator/src/main.rs b/orchestrator/src/main.rs
index 1035a95a..d452c9d8 100644
--- a/orchestrator/src/main.rs
+++ b/orchestrator/src/main.rs
@@ -27,21 +27,19 @@ use std::ops::Range;
 use std::string::String;
 
 use clap::Parser;
-use common::config::TravelPlanConfig;
+use common::config::{Configuration, TravelPlanConfig};
+use common::utils;
+use common::utils::get_hours;
 use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
 use rdkafka::client::DefaultClientContext;
 use rdkafka::ClientConfig;
 
-use crate::config::Configuration;
 use crate::kafka_producer::KafkaProducer;
-use crate::utils::get_hours;
 
-mod config;
 mod environment;
 mod kafka_consumer;
 mod kafka_producer;
 mod ticks;
-mod utils;
 
 #[derive(Parser)]
 #[command(author, version, about)]
@@ -56,7 +54,7 @@ async fn main() {
 
     let args = Args::parse();
 
-    let default_config_path = "config/simulation.json".to_string();
+    let default_config_path = "orchestrator/config/simulation.json".to_string();
     let config_path = args.config.unwrap_or(default_config_path);
     let config = Configuration::read(&config_path).expect("Error while reading config");