From 6ac167b287bc7e2eb0885063753a9826153cf541 Mon Sep 17 00:00:00 2001 From: Chase Wilson Date: Tue, 8 Jun 2021 10:55:20 -0500 Subject: [PATCH 1/2] Optional regions --- timely/src/dataflow/operators/enterleave.rs | 3 +- timely/src/dataflow/scopes/child.rs | 50 ++++ timely/src/dataflow/scopes/mod.rs | 72 +++++ timely/src/dataflow/scopes/region.rs | 311 ++++++++++++++++++++ timely/src/progress/subgraph.rs | 5 + 5 files changed, 440 insertions(+), 1 deletion(-) create mode 100644 timely/src/dataflow/scopes/region.rs diff --git a/timely/src/dataflow/operators/enterleave.rs b/timely/src/dataflow/operators/enterleave.rs index 6153d7f505..28f3474dca 100644 --- a/timely/src/dataflow/operators/enterleave.rs +++ b/timely/src/dataflow/operators/enterleave.rs @@ -21,6 +21,7 @@ use std::marker::PhantomData; +use crate::dataflow::scopes::Region; use crate::progress::Timestamp; use crate::progress::timestamp::Refines; use crate::progress::{Source, Target}; @@ -51,7 +52,7 @@ pub trait Enter, D: Data> { /// }); /// }); /// ``` - fn enter<'a>(&self, _: &Child<'a, G, T>) -> Stream, D>; + fn enter<'a>(&self, child: &Child<'a, G, T>) -> Stream, D>; } use crate::dataflow::scopes::child::Iterative; diff --git a/timely/src/dataflow/scopes/child.rs b/timely/src/dataflow/scopes/child.rs index 07789304c9..5dabdeaa31 100644 --- a/timely/src/dataflow/scopes/child.rs +++ b/timely/src/dataflow/scopes/child.rs @@ -5,6 +5,7 @@ use std::cell::RefCell; use crate::communication::{Data, Push, Pull}; use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; +use crate::dataflow::scopes::region::RegionSubgraph; use crate::scheduling::Scheduler; use crate::scheduling::activate::Activations; use crate::progress::{Timestamp, Operate, SubgraphBuilder}; @@ -15,6 +16,7 @@ use crate::logging::TimelyLogger as Logger; use crate::logging::TimelyProgressLogger as ProgressLogger; use crate::worker::{AsWorker, Config}; +use super::Region; use super::{ScopeParent, Scope}; /// Type alias for iterative child scope. @@ -134,6 +136,54 @@ where result } + + fn optional_region_named(&mut self, name: &str, enabled: bool, func: F) -> R + where + F: FnOnce(&mut Region) -> R, + { + // If the region is enabled then build the child dataflow graph, otherwise + // create a passthrough region + let region = if enabled { + let index = self.subgraph.borrow_mut().allocate_child_id(); + let path = self.subgraph.borrow().path.clone(); + + let subscope = RefCell::new(SubgraphBuilder::::new_from( + index, + path, + self.logging(), + self.progress_logging.clone(), + name, + )); + + Some((subscope, index)) + } else { + None + }; + + let result = { + let region = region + .as_ref() + .map_or(RegionSubgraph::Passthrough, |(scope, idx)| { + RegionSubgraph::subgraph(scope, *idx) + }); + + let mut builder = Region { + subgraph: region, + parent: self.clone(), + logging: self.logging.clone(), + progress_logging: self.progress_logging.clone(), + }; + func(&mut builder) + }; + + // If the region is enabled then build and add the sub-scope to the dataflow graph + if let Some((subscope, index)) = region { + let subscope = subscope.into_inner().build(self); + self.add_operator_with_index(Box::new(subscope), index); + } + + result + } } use crate::communication::Message; diff --git a/timely/src/dataflow/scopes/mod.rs b/timely/src/dataflow/scopes/mod.rs index 50ee7b8fa0..3e9cbc10ea 100644 --- a/timely/src/dataflow/scopes/mod.rs +++ b/timely/src/dataflow/scopes/mod.rs @@ -1,5 +1,6 @@ //! Hierarchical organization of timely dataflow graphs. +use std::panic::Location; use crate::progress::{Timestamp, Operate, Source, Target}; use crate::order::Product; use crate::progress::timestamp::Refines; @@ -7,8 +8,10 @@ use crate::communication::Allocate; use crate::worker::AsWorker; pub mod child; +pub mod region; pub use self::child::Child; +pub use region::Region; /// The information a child scope needs from its parent. pub trait ScopeParent: AsWorker+Clone { @@ -187,4 +190,73 @@ pub trait Scope: ScopeParent { self.scoped::<::Timestamp,R,F>(name, func) } + /// Creates an optional dataflow region with the same timestamp as the outer scope. + /// + /// If `enabled` is true then this acts like [`Scope::region()`] and if `enabled` + /// is false it's a no-op and produces no change in the dataflow graph. + /// + /// # Examples + /// + /// ``` + /// use timely::dataflow::Scope; + /// use timely::dataflow::operators::{Input, Enter, Leave}; + /// + /// timely::execute_from_args(std::env::args(), |worker| { + /// // must specify types as nothing else drives inference. + /// let input = worker.dataflow::(|child1| { + /// let (input, stream) = child1.new_input::(); + /// let output = child1.optional_region(true, |child2| { + /// stream.enter(child2).leave() + /// }); + /// + /// input + /// }); + /// }); + /// ``` + /// + #[track_caller] + fn optional_region(&mut self, enabled: bool, func: F) -> R + where + F: FnOnce(&mut Region) -> R, + { + let caller = Location::caller(); + let name = format!( + "OptionalRegion @ {}:{}:{}", + caller.file(), + caller.line(), + caller.column(), + ); + + self.optional_region_named(&name, enabled, func) + } + + /// Creates an optional dataflow region with the same timestamp as the outer scope. + /// + /// If `enabled` is true then this acts like [`Scope::region()`] and if `enabled` + /// is false it's a no-op and produces no change in the dataflow graph. + /// + /// # Examples + /// + /// ``` + /// use timely::dataflow::Scope; + /// use timely::dataflow::operators::{Input, Enter, Leave}; + /// + /// timely::execute_from_args(std::env::args(), |worker| { + /// // must specify types as nothing else drives inference. + /// let input = worker.dataflow::(|child1| { + /// let (input, stream) = child1.new_input::(); + /// let output = child1.optional_region_named( + /// "This is my region", + /// false, + /// |child2| stream.enter(child2).leave(), + /// ); + /// + /// input + /// }); + /// }); + /// ``` + /// + fn optional_region_named(&mut self, name: &str, enabled: bool, func: F) -> R + where + F: FnOnce(&mut Region) -> R; } diff --git a/timely/src/dataflow/scopes/region.rs b/timely/src/dataflow/scopes/region.rs new file mode 100644 index 0000000000..54a62bc358 --- /dev/null +++ b/timely/src/dataflow/scopes/region.rs @@ -0,0 +1,311 @@ +//! A child dataflow scope used to build optional dataflow regions. + +use crate::{ + communication::{ + allocator::thread::{ThreadPuller, ThreadPusher}, + Data, Pull, Push, + }, + dataflow::{scopes::Child, Scope, ScopeParent}, + logging::{TimelyLogger as Logger, TimelyProgressLogger as ProgressLogger, WorkerIdentifier}, + logging_core::Registry, + progress::{timestamp::Refines, Operate, Source, SubgraphBuilder, Target, Timestamp}, + scheduling::{Activations, Scheduler}, + worker::{AsWorker, Config}, +}; +use std::{ + cell::{RefCell, RefMut}, + rc::Rc, +}; +use timely_communication::Message; + +type AllocatedChannels = (Vec>>>, Box>>); + +/// A nested dataflow region, can either be a subgraph equivalent to [`Scope::region()`] +/// or a no-op that has no affect on the dataflow graph +pub struct Region<'a, G, T> +where + G: Scope, + T: Timestamp, +{ + /// The subgraph under construction + pub subgraph: RegionSubgraph<'a, G, T>, + /// A copy of the child's parent scope. + pub parent: G, + /// The log writer for this scope. + pub logging: Option, + /// The progress log writer for this scope. + pub progress_logging: Option, +} + +impl<'a, G, T> Region<'a, G, T> +where + G: Scope, + T: Timestamp, +{ + fn allocate_child_id(&mut self) -> usize { + match self.subgraph { + RegionSubgraph::Subgraph { subgraph, .. } => subgraph.borrow_mut().allocate_child_id(), + RegionSubgraph::Passthrough => self.parent.allocate_operator_index(), + } + } + + fn path(&self) -> Vec { + match self.subgraph { + RegionSubgraph::Subgraph { subgraph, .. } => subgraph.borrow().path.clone(), + RegionSubgraph::Passthrough => self.parent.addr(), + } + } + + fn create_child_subscope( + &mut self, + name: &str, + ) -> RefCell> + where + TOuter: Timestamp, + TInner: Timestamp + Refines, + { + let index = self.allocate_child_id(); + let path = self.path(); + + RefCell::new(SubgraphBuilder::new_from( + index, + path, + self.logging(), + self.progress_logging.clone(), + name, + )) + } +} + +impl<'a, G, T> AsWorker for Region<'a, G, T> +where + G: Scope, + T: Timestamp, +{ + fn config(&self) -> &Config { + self.parent.config() + } + + fn index(&self) -> usize { + self.parent.index() + } + + fn peers(&self) -> usize { + self.parent.peers() + } + + fn allocate(&mut self, identifier: usize, address: &[usize]) -> AllocatedChannels { + self.parent.allocate(identifier, address) + } + + fn pipeline( + &mut self, + identifier: usize, + address: &[usize], + ) -> (ThreadPusher>, ThreadPuller>) { + self.parent.pipeline(identifier, address) + } + + fn new_identifier(&mut self) -> usize { + self.parent.new_identifier() + } + + fn log_register(&self) -> RefMut> { + self.parent.log_register() + } +} + +impl<'a, G, T> Scheduler for Region<'a, G, T> +where + G: Scope, + T: Timestamp, +{ + fn activations(&self) -> Rc> { + self.parent.activations() + } +} + +impl<'a, G, T> ScopeParent for Region<'a, G, T> +where + G: Scope, + T: Timestamp, +{ + type Timestamp = T; +} + +impl<'a, G, T> Scope for Region<'a, G, T> +where + G: Scope, + T: Timestamp + Refines, +{ + fn name(&self) -> String { + todo!() + } + + fn addr(&self) -> Vec { + todo!() + } + + fn add_edge(&self, source: Source, target: Target) { + match self.subgraph { + RegionSubgraph::Subgraph { subgraph, .. } => { + subgraph.borrow_mut().connect(source, target); + } + RegionSubgraph::Passthrough => self.parent.add_edge(source, target), + } + } + + fn allocate_operator_index(&mut self) -> usize { + match self.subgraph { + RegionSubgraph::Subgraph { subgraph, .. } => subgraph.borrow_mut().allocate_child_id(), + RegionSubgraph::Passthrough => self.parent.allocate_operator_index(), + } + } + + fn add_operator_with_indices( + &mut self, + operator: Box>, + local: usize, + global: usize, + ) { + match self.subgraph { + RegionSubgraph::Subgraph { subgraph, .. } => { + subgraph.borrow_mut().add_child(operator, local, global); + } + RegionSubgraph::Passthrough => self + .parent + .add_operator_with_indices(operator, local, global), + } + } + + fn scoped(&mut self, name: &str, func: F) -> R + where + T2: Timestamp + Refines, + F: FnOnce(&mut Child) -> R, + { + let subscope = self.create_child_subscope(name); + let index = subscope.borrow().index(); + + let result = { + let mut builder = Child { + subgraph: &subscope, + parent: self.clone(), + logging: self.logging.clone(), + progress_logging: self.progress_logging.clone(), + }; + func(&mut builder) + }; + + let subscope = subscope.into_inner().build(self); + self.add_operator_with_index(Box::new(subscope), index); + + result + } + + fn optional_region_named(&mut self, name: &str, enabled: bool, func: F) -> R + where + F: FnOnce(&mut Region) -> R, + { + // If the region is enabled then build the child dataflow graph, otherwise + // create a passthrough region + let region = if enabled { + Some(self.create_child_subscope(name)) + } else { + None + }; + + let result = { + let region = region + .as_ref() + .map_or(RegionSubgraph::Passthrough, |subscope| { + let index = subscope.borrow().index(); + RegionSubgraph::subgraph(subscope, index) + }); + + let mut builder = Region { + subgraph: region, + parent: self.clone(), + logging: self.logging.clone(), + progress_logging: self.progress_logging.clone(), + }; + func(&mut builder) + }; + + // If the region is enabled then build and add the sub-scope to the dataflow graph + if let Some(subscope) = region { + let index = subscope.borrow().index(); + let subscope = subscope.into_inner().build(self); + + self.add_operator_with_index(Box::new(subscope), index); + } + + result + } +} + +impl<'a, G, T> Clone for Region<'a, G, T> +where + G: Scope, + T: Timestamp, +{ + fn clone(&self) -> Self { + Self { + subgraph: self.subgraph, + parent: self.parent.clone(), + logging: self.logging.clone(), + progress_logging: self.progress_logging.clone(), + } + } +} + +/// The kind of region to build, can either be a subgraph equivalent to [`Scope::region()`] +/// or a no-op that has no affect on the dataflow graph +pub enum RegionSubgraph<'a, G, T> +where + G: ScopeParent, + T: Timestamp, +{ + /// A region that will be rendered as a nested dataflow scope + Subgraph { + /// The inner dataflow scope + subgraph: &'a RefCell>, + /// The subgraph's operator index + index: usize, + }, + /// A region that will be a no-op in the dataflow graph + Passthrough, +} + +impl<'a, G, T> RegionSubgraph<'a, G, T> +where + G: ScopeParent, + T: Timestamp, +{ + /// Create a new region subgraph + pub(crate) fn subgraph( + subgraph: &'a RefCell>, + index: usize, + ) -> Self { + Self::Subgraph { subgraph, index } + } +} + +impl<'a, G, T> Clone for RegionSubgraph<'a, G, T> +where + G: ScopeParent, + T: Timestamp, +{ + fn clone(&self) -> Self { + match *self { + Self::Subgraph { subgraph, index } => Self::Subgraph { subgraph, index }, + Self::Passthrough => Self::Passthrough, + } + } +} + +impl<'a, G, T> Copy for RegionSubgraph<'a, G, T> +where + G: ScopeParent, + T: Timestamp, +{ +} diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 309d9077cf..2c45797dc2 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -215,6 +215,11 @@ where progress_mode: worker.config().progress_mode, } } + + /// Get the subgraph builder's index. + pub fn index(&self) -> usize { + self.index + } } From a2faa7f4bfd98ba42d67b4c05f0cb528d6802f22 Mon Sep 17 00:00:00 2001 From: Chase Wilson Date: Wed, 9 Jun 2021 09:47:44 -0500 Subject: [PATCH 2/2] Added ingress/egress traits for Regions --- timely/src/dataflow/operators/enterleave.rs | 107 ++++++++++++++++++- timely/src/dataflow/scopes/child.rs | 2 +- timely/src/dataflow/scopes/mod.rs | 24 ++--- timely/src/dataflow/scopes/region.rs | 111 ++++++++++---------- timely/src/dataflow/stream.rs | 2 +- 5 files changed, 170 insertions(+), 76 deletions(-) diff --git a/timely/src/dataflow/operators/enterleave.rs b/timely/src/dataflow/operators/enterleave.rs index 28f3474dca..2e920ab0f1 100644 --- a/timely/src/dataflow/operators/enterleave.rs +++ b/timely/src/dataflow/operators/enterleave.rs @@ -21,7 +21,7 @@ use std::marker::PhantomData; -use crate::dataflow::scopes::Region; +use crate::dataflow::scopes::region::{Region, RegionSubgraph}; use crate::progress::Timestamp; use crate::progress::timestamp::Refines; use crate::progress::{Source, Target}; @@ -31,6 +31,7 @@ use crate::communication::Push; use crate::dataflow::channels::pushers::{Counter, Tee}; use crate::dataflow::channels::{Bundle, Message}; +use crate::scheduling::Scheduler; use crate::worker::AsWorker; use crate::dataflow::{Stream, Scope}; use crate::dataflow::scopes::{Child, ScopeParent}; @@ -85,9 +86,6 @@ impl::Ti impl, D: Data> Enter for Stream { fn enter<'a>(&self, scope: &Child<'a, G, T>) -> Stream, D> { - - use crate::scheduling::Scheduler; - let (targets, registrar) = Tee::::new(); let ingress = IngressNub { targets: Counter::new(targets), @@ -198,6 +196,107 @@ where TOuter: Timestamp, TInner: Timestamp+Refines, TData: Data { } } +/// Extension trait to move a [`Stream`] into a child [`Region`] +pub trait EnterRegion +where + G: Scope, + D: Data, +{ + /// Moves the [`Stream`] argument into a child [`Region`] + /// + /// # Examples + /// ``` + /// use timely::dataflow::scopes::Scope; + /// use timely::dataflow::operators::{enterleave::{EnterRegion, LeaveRegion}, ToStream}; + /// + /// timely::example(|outer| { + /// let stream = (0..9).to_stream(outer); + /// let output = outer.optional_region(true, |inner| { + /// stream.enter_region(inner).leave_region() + /// }); + /// }); + /// ``` + fn enter_region<'a>(&self, region: &Region<'a, G>) -> Stream, D>; +} + +impl EnterRegion for Stream +where + G: Scope, + D: Data, +{ + fn enter_region<'a>(&self, region: &Region<'a, G>) -> Stream, D> { + match region.subgraph { + RegionSubgraph::Subgraph { subgraph, .. } => { + let (targets, registrar) = Tee::::new(); + let ingress = IngressNub { + targets: Counter::new(targets), + phantom: PhantomData, + activator: region.activator_for(®ion.addr()), + active: false, + }; + let produced = ingress.targets.produced().clone(); + + let input = subgraph.borrow_mut().new_input(produced); + + let channel_id = region.clone().new_identifier(); + self.connect_to(input, ingress, channel_id); + + Stream::new(Source::new(0, input.port), registrar, region.clone()) + } + + RegionSubgraph::Passthrough => Stream::new(*self.name(), self.ports.clone(), region.clone()), + } + } +} + +/// Extension trait to move a [`Stream`] to the parent of its current [`Region`] +pub trait LeaveRegion { + /// Moves a [`Stream`] to the parent of its current [`Region`] + /// + /// # Examples + /// ``` + /// use timely::dataflow::scopes::Scope; + /// use timely::dataflow::operators::{enterleave::{EnterRegion, LeaveRegion}, ToStream}; + /// + /// timely::example(|outer| { + /// let stream = (0..9).to_stream(outer); + /// let output = outer.optional_region(false, |inner| { + /// stream.enter_region(inner).leave_region() + /// }); + /// }); + /// ``` + fn leave_region(&self) -> Stream; +} + +impl<'a, G: Scope, D: Data> LeaveRegion for Stream, D> { + fn leave_region(&self) -> Stream { + let scope = self.scope(); + + match scope.subgraph { + RegionSubgraph::Subgraph { subgraph, .. } => { + let output = subgraph.borrow_mut().new_output(); + let (targets, registrar) = Tee::::new(); + let channel_id = scope.clone().new_identifier(); + + self.connect_to( + Target::new(0, output.port), + EgressNub { + targets, + phantom: PhantomData, + }, + channel_id, + ); + + Stream::new(output, registrar, scope.parent) + } + + RegionSubgraph::Passthrough => { + Stream::new(*self.name(), self.ports.clone(), scope.parent) + } + } + } +} + #[cfg(test)] mod test { /// Test that nested scopes with pass-through edges (no operators) correctly communicate progress. diff --git a/timely/src/dataflow/scopes/child.rs b/timely/src/dataflow/scopes/child.rs index 5dabdeaa31..23447bf14b 100644 --- a/timely/src/dataflow/scopes/child.rs +++ b/timely/src/dataflow/scopes/child.rs @@ -139,7 +139,7 @@ where fn optional_region_named(&mut self, name: &str, enabled: bool, func: F) -> R where - F: FnOnce(&mut Region) -> R, + F: FnOnce(&mut Region) -> R, { // If the region is enabled then build the child dataflow graph, otherwise // create a passthrough region diff --git a/timely/src/dataflow/scopes/mod.rs b/timely/src/dataflow/scopes/mod.rs index 3e9cbc10ea..d9dcbd0fbb 100644 --- a/timely/src/dataflow/scopes/mod.rs +++ b/timely/src/dataflow/scopes/mod.rs @@ -199,14 +199,14 @@ pub trait Scope: ScopeParent { /// /// ``` /// use timely::dataflow::Scope; - /// use timely::dataflow::operators::{Input, Enter, Leave}; + /// use timely::dataflow::operators::{Input, enterleave::{EnterRegion, LeaveRegion}}; /// /// timely::execute_from_args(std::env::args(), |worker| { /// // must specify types as nothing else drives inference. - /// let input = worker.dataflow::(|child1| { - /// let (input, stream) = child1.new_input::(); - /// let output = child1.optional_region(true, |child2| { - /// stream.enter(child2).leave() + /// let input = worker.dataflow::(|scope| { + /// let (input, stream) = scope.new_input::(); + /// let output = scope.optional_region(true, |region| { + /// stream.enter_region(region).leave_region() /// }); /// /// input @@ -217,7 +217,7 @@ pub trait Scope: ScopeParent { #[track_caller] fn optional_region(&mut self, enabled: bool, func: F) -> R where - F: FnOnce(&mut Region) -> R, + F: FnOnce(&mut Region) -> R, { let caller = Location::caller(); let name = format!( @@ -239,16 +239,16 @@ pub trait Scope: ScopeParent { /// /// ``` /// use timely::dataflow::Scope; - /// use timely::dataflow::operators::{Input, Enter, Leave}; + /// use timely::dataflow::operators::{Input, enterleave::{EnterRegion, LeaveRegion}}; /// /// timely::execute_from_args(std::env::args(), |worker| { /// // must specify types as nothing else drives inference. - /// let input = worker.dataflow::(|child1| { - /// let (input, stream) = child1.new_input::(); - /// let output = child1.optional_region_named( + /// let input = worker.dataflow::(|scope| { + /// let (input, stream) = scope.new_input::(); + /// let output = scope.optional_region_named( /// "This is my region", /// false, - /// |child2| stream.enter(child2).leave(), + /// |region| stream.enter_region(region).leave_region(), /// ); /// /// input @@ -258,5 +258,5 @@ pub trait Scope: ScopeParent { /// fn optional_region_named(&mut self, name: &str, enabled: bool, func: F) -> R where - F: FnOnce(&mut Region) -> R; + F: FnOnce(&mut Region) -> R; } diff --git a/timely/src/dataflow/scopes/region.rs b/timely/src/dataflow/scopes/region.rs index 54a62bc358..547a7be705 100644 --- a/timely/src/dataflow/scopes/region.rs +++ b/timely/src/dataflow/scopes/region.rs @@ -22,13 +22,12 @@ type AllocatedChannels = (Vec>>>, Box +pub struct Region<'a, G> where - G: Scope, - T: Timestamp, + G: Scope, { /// The subgraph under construction - pub subgraph: RegionSubgraph<'a, G, T>, + pub subgraph: RegionSubgraph<'a, G>, /// A copy of the child's parent scope. pub parent: G, /// The log writer for this scope. @@ -37,22 +36,28 @@ where pub progress_logging: Option, } -impl<'a, G, T> Region<'a, G, T> +impl<'a, G> Region<'a, G> where - G: Scope, - T: Timestamp, + G: Scope, { - fn allocate_child_id(&mut self) -> usize { - match self.subgraph { - RegionSubgraph::Subgraph { subgraph, .. } => subgraph.borrow_mut().allocate_child_id(), - RegionSubgraph::Passthrough => self.parent.allocate_operator_index(), + pub(crate) fn new( + subgraph: RegionSubgraph<'a, G>, + parent: G, + logging: Option, + progress_logging: Option, + ) -> Self { + Self { + subgraph, + parent, + logging, + progress_logging, } } - fn path(&self) -> Vec { + fn allocate_child_id(&mut self) -> usize { match self.subgraph { - RegionSubgraph::Subgraph { subgraph, .. } => subgraph.borrow().path.clone(), - RegionSubgraph::Passthrough => self.parent.addr(), + RegionSubgraph::Subgraph { subgraph, .. } => subgraph.borrow_mut().allocate_child_id(), + RegionSubgraph::Passthrough => self.parent.allocate_operator_index(), } } @@ -65,7 +70,7 @@ where TInner: Timestamp + Refines, { let index = self.allocate_child_id(); - let path = self.path(); + let path = self.addr(); RefCell::new(SubgraphBuilder::new_from( index, @@ -77,10 +82,9 @@ where } } -impl<'a, G, T> AsWorker for Region<'a, G, T> +impl<'a, G> AsWorker for Region<'a, G> where - G: Scope, - T: Timestamp, + G: Scope, { fn config(&self) -> &Config { self.parent.config() @@ -115,35 +119,38 @@ where } } -impl<'a, G, T> Scheduler for Region<'a, G, T> +impl<'a, G> Scheduler for Region<'a, G> where - G: Scope, - T: Timestamp, + G: Scope, { fn activations(&self) -> Rc> { self.parent.activations() } } -impl<'a, G, T> ScopeParent for Region<'a, G, T> +impl<'a, G> ScopeParent for Region<'a, G> where - G: Scope, - T: Timestamp, + G: Scope, { - type Timestamp = T; + type Timestamp = G::Timestamp; } -impl<'a, G, T> Scope for Region<'a, G, T> +impl<'a, G> Scope for Region<'a, G> where - G: Scope, - T: Timestamp + Refines, + G: Scope, { fn name(&self) -> String { - todo!() + match self.subgraph { + RegionSubgraph::Subgraph { subgraph, .. } => subgraph.borrow_mut().name.clone(), + RegionSubgraph::Passthrough => self.parent.name(), + } } fn addr(&self) -> Vec { - todo!() + match self.subgraph { + RegionSubgraph::Subgraph { subgraph, .. } => subgraph.borrow_mut().path.clone(), + RegionSubgraph::Passthrough => self.parent.addr(), + } } fn add_edge(&self, source: Source, target: Target) { @@ -156,10 +163,7 @@ where } fn allocate_operator_index(&mut self) -> usize { - match self.subgraph { - RegionSubgraph::Subgraph { subgraph, .. } => subgraph.borrow_mut().allocate_child_id(), - RegionSubgraph::Passthrough => self.parent.allocate_operator_index(), - } + self.allocate_child_id() } fn add_operator_with_indices( @@ -204,7 +208,7 @@ where fn optional_region_named(&mut self, name: &str, enabled: bool, func: F) -> R where - F: FnOnce(&mut Region) -> R, + F: FnOnce(&mut Region) -> R, { // If the region is enabled then build the child dataflow graph, otherwise // create a passthrough region @@ -222,12 +226,12 @@ where RegionSubgraph::subgraph(subscope, index) }); - let mut builder = Region { - subgraph: region, - parent: self.clone(), - logging: self.logging.clone(), - progress_logging: self.progress_logging.clone(), - }; + let mut builder = Region::new( + region, + self.clone(), + self.logging.clone(), + self.progress_logging.clone(), + ); func(&mut builder) }; @@ -243,10 +247,9 @@ where } } -impl<'a, G, T> Clone for Region<'a, G, T> +impl<'a, G> Clone for Region<'a, G> where - G: Scope, - T: Timestamp, + G: Scope, { fn clone(&self) -> Self { Self { @@ -260,15 +263,14 @@ where /// The kind of region to build, can either be a subgraph equivalent to [`Scope::region()`] /// or a no-op that has no affect on the dataflow graph -pub enum RegionSubgraph<'a, G, T> +pub enum RegionSubgraph<'a, G> where G: ScopeParent, - T: Timestamp, { /// A region that will be rendered as a nested dataflow scope Subgraph { /// The inner dataflow scope - subgraph: &'a RefCell>, + subgraph: &'a RefCell>, /// The subgraph's operator index index: usize, }, @@ -276,24 +278,22 @@ where Passthrough, } -impl<'a, G, T> RegionSubgraph<'a, G, T> +impl<'a, G> RegionSubgraph<'a, G> where G: ScopeParent, - T: Timestamp, { /// Create a new region subgraph pub(crate) fn subgraph( - subgraph: &'a RefCell>, + subgraph: &'a RefCell>, index: usize, ) -> Self { Self::Subgraph { subgraph, index } } } -impl<'a, G, T> Clone for RegionSubgraph<'a, G, T> +impl<'a, G> Clone for RegionSubgraph<'a, G> where G: ScopeParent, - T: Timestamp, { fn clone(&self) -> Self { match *self { @@ -303,9 +303,4 @@ where } } -impl<'a, G, T> Copy for RegionSubgraph<'a, G, T> -where - G: ScopeParent, - T: Timestamp, -{ -} +impl<'a, G> Copy for RegionSubgraph<'a, G> where G: ScopeParent {} diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index 8a3bccfbd1..d9d2d3f745 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -25,7 +25,7 @@ pub struct Stream { /// The `Scope` containing the stream. scope: S, /// Maintains a list of Push> interested in the stream's output. - ports: TeeHelper, + pub(crate) ports: TeeHelper, } impl Stream {