Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add conditional regions #393

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 104 additions & 4 deletions timely/src/dataflow/operators/enterleave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

use std::marker::PhantomData;

use crate::dataflow::scopes::region::{Region, RegionSubgraph};
use crate::progress::Timestamp;
use crate::progress::timestamp::Refines;
use crate::progress::{Source, Target};
Expand All @@ -30,6 +31,7 @@ use crate::communication::Push;
use crate::dataflow::channels::pushers::{Counter, Tee};
use crate::dataflow::channels::{Bundle, Message};

use crate::scheduling::Scheduler;
use crate::worker::AsWorker;
use crate::dataflow::{Stream, Scope};
use crate::dataflow::scopes::{Child, ScopeParent};
Expand All @@ -51,7 +53,7 @@ pub trait Enter<G: Scope, T: Timestamp+Refines<G::Timestamp>, D: Data> {
/// });
/// });
/// ```
fn enter<'a>(&self, _: &Child<'a, G, T>) -> Stream<Child<'a, G, T>, D>;
fn enter<'a>(&self, child: &Child<'a, G, T>) -> Stream<Child<'a, G, T>, D>;
}

use crate::dataflow::scopes::child::Iterative;
Expand Down Expand Up @@ -84,9 +86,6 @@ impl<G: Scope, T: Timestamp, D: Data, E: Enter<G, Product<<G as ScopeParent>::Ti

impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, D: Data> Enter<G, T, D> for Stream<G, D> {
fn enter<'a>(&self, scope: &Child<'a, G, T>) -> Stream<Child<'a, G, T>, D> {

use crate::scheduling::Scheduler;

let (targets, registrar) = Tee::<T, D>::new();
let ingress = IngressNub {
targets: Counter::new(targets),
Expand Down Expand Up @@ -197,6 +196,107 @@ where TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TData: Data {
}
}

/// Extension trait to move a [`Stream`] into a child [`Region`]
pub trait EnterRegion<G, D>
where
G: Scope,
D: Data,
{
/// Moves the [`Stream`] argument into a child [`Region`]
///
/// # Examples
/// ```
/// use timely::dataflow::scopes::Scope;
/// use timely::dataflow::operators::{enterleave::{EnterRegion, LeaveRegion}, ToStream};
///
/// timely::example(|outer| {
/// let stream = (0..9).to_stream(outer);
/// let output = outer.optional_region(true, |inner| {
/// stream.enter_region(inner).leave_region()
/// });
/// });
/// ```
fn enter_region<'a>(&self, region: &Region<'a, G>) -> Stream<Region<'a, G>, D>;
}

impl<G, D> EnterRegion<G, D> for Stream<G, D>
where
G: Scope,
D: Data,
{
fn enter_region<'a>(&self, region: &Region<'a, G>) -> Stream<Region<'a, G>, D> {
match region.subgraph {
RegionSubgraph::Subgraph { subgraph, .. } => {
let (targets, registrar) = Tee::<G::Timestamp, D>::new();
let ingress = IngressNub {
targets: Counter::new(targets),
phantom: PhantomData,
activator: region.activator_for(&region.addr()),
active: false,
};
let produced = ingress.targets.produced().clone();

let input = subgraph.borrow_mut().new_input(produced);

let channel_id = region.clone().new_identifier();
self.connect_to(input, ingress, channel_id);

Stream::new(Source::new(0, input.port), registrar, region.clone())
}

RegionSubgraph::Passthrough => Stream::new(*self.name(), self.ports.clone(), region.clone()),
}
}
}

/// Extension trait to move a [`Stream`] to the parent of its current [`Region`]
pub trait LeaveRegion<G: Scope, D: Data> {
/// Moves a [`Stream`] to the parent of its current [`Region`]
///
/// # Examples
/// ```
/// use timely::dataflow::scopes::Scope;
/// use timely::dataflow::operators::{enterleave::{EnterRegion, LeaveRegion}, ToStream};
///
/// timely::example(|outer| {
/// let stream = (0..9).to_stream(outer);
/// let output = outer.optional_region(false, |inner| {
/// stream.enter_region(inner).leave_region()
/// });
/// });
/// ```
fn leave_region(&self) -> Stream<G, D>;
}

impl<'a, G: Scope, D: Data> LeaveRegion<G, D> for Stream<Region<'a, G>, D> {
fn leave_region(&self) -> Stream<G, D> {
let scope = self.scope();

match scope.subgraph {
RegionSubgraph::Subgraph { subgraph, .. } => {
let output = subgraph.borrow_mut().new_output();
let (targets, registrar) = Tee::<G::Timestamp, D>::new();
let channel_id = scope.clone().new_identifier();

self.connect_to(
Target::new(0, output.port),
EgressNub {
targets,
phantom: PhantomData,
},
channel_id,
);

Stream::new(output, registrar, scope.parent)
}

RegionSubgraph::Passthrough => {
Stream::new(*self.name(), self.ports.clone(), scope.parent)
}
}
}
}

#[cfg(test)]
mod test {
/// Test that nested scopes with pass-through edges (no operators) correctly communicate progress.
Expand Down
50 changes: 50 additions & 0 deletions timely/src/dataflow/scopes/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::cell::RefCell;

use crate::communication::{Data, Push, Pull};
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
use crate::dataflow::scopes::region::RegionSubgraph;
use crate::scheduling::Scheduler;
use crate::scheduling::activate::Activations;
use crate::progress::{Timestamp, Operate, SubgraphBuilder};
Expand All @@ -15,6 +16,7 @@ use crate::logging::TimelyLogger as Logger;
use crate::logging::TimelyProgressLogger as ProgressLogger;
use crate::worker::{AsWorker, Config};

use super::Region;
use super::{ScopeParent, Scope};

/// Type alias for iterative child scope.
Expand Down Expand Up @@ -134,6 +136,54 @@ where

result
}

fn optional_region_named<R, F>(&mut self, name: &str, enabled: bool, func: F) -> R
where
F: FnOnce(&mut Region<Self>) -> R,
{
// If the region is enabled then build the child dataflow graph, otherwise
// create a passthrough region
let region = if enabled {
let index = self.subgraph.borrow_mut().allocate_child_id();
let path = self.subgraph.borrow().path.clone();

let subscope = RefCell::new(SubgraphBuilder::<T, T>::new_from(
index,
path,
self.logging(),
self.progress_logging.clone(),
name,
));

Some((subscope, index))
} else {
None
};

let result = {
let region = region
.as_ref()
.map_or(RegionSubgraph::Passthrough, |(scope, idx)| {
RegionSubgraph::subgraph(scope, *idx)
});

let mut builder = Region {
subgraph: region,
parent: self.clone(),
logging: self.logging.clone(),
progress_logging: self.progress_logging.clone(),
};
func(&mut builder)
};

// If the region is enabled then build and add the sub-scope to the dataflow graph
if let Some((subscope, index)) = region {
let subscope = subscope.into_inner().build(self);
self.add_operator_with_index(Box::new(subscope), index);
}

result
}
}

use crate::communication::Message;
Expand Down
72 changes: 72 additions & 0 deletions timely/src/dataflow/scopes/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
//! Hierarchical organization of timely dataflow graphs.

use std::panic::Location;
use crate::progress::{Timestamp, Operate, Source, Target};
use crate::order::Product;
use crate::progress::timestamp::Refines;
use crate::communication::Allocate;
use crate::worker::AsWorker;

pub mod child;
pub mod region;

pub use self::child::Child;
pub use region::Region;

/// The information a child scope needs from its parent.
pub trait ScopeParent: AsWorker+Clone {
Expand Down Expand Up @@ -187,4 +190,73 @@ pub trait Scope: ScopeParent {
self.scoped::<<Self as ScopeParent>::Timestamp,R,F>(name, func)
}

/// Creates an optional dataflow region with the same timestamp as the outer scope.
///
/// If `enabled` is true then this acts like [`Scope::region()`] and if `enabled`
/// is false it's a no-op and produces no change in the dataflow graph.
///
/// # Examples
///
/// ```
/// use timely::dataflow::Scope;
/// use timely::dataflow::operators::{Input, enterleave::{EnterRegion, LeaveRegion}};
///
/// timely::execute_from_args(std::env::args(), |worker| {
/// // must specify types as nothing else drives inference.
/// let input = worker.dataflow::<u64,_,_>(|scope| {
/// let (input, stream) = scope.new_input::<String>();
/// let output = scope.optional_region(true, |region| {
/// stream.enter_region(region).leave_region()
/// });
///
/// input
/// });
/// });
/// ```
///
#[track_caller]
fn optional_region<R, F>(&mut self, enabled: bool, func: F) -> R
where
F: FnOnce(&mut Region<Self>) -> R,
{
let caller = Location::caller();
let name = format!(
"OptionalRegion @ {}:{}:{}",
caller.file(),
caller.line(),
caller.column(),
);

self.optional_region_named(&name, enabled, func)
}

/// Creates an optional dataflow region with the same timestamp as the outer scope.
///
/// If `enabled` is true then this acts like [`Scope::region()`] and if `enabled`
/// is false it's a no-op and produces no change in the dataflow graph.
///
/// # Examples
///
/// ```
/// use timely::dataflow::Scope;
/// use timely::dataflow::operators::{Input, enterleave::{EnterRegion, LeaveRegion}};
///
/// timely::execute_from_args(std::env::args(), |worker| {
/// // must specify types as nothing else drives inference.
/// let input = worker.dataflow::<u64,_,_>(|scope| {
/// let (input, stream) = scope.new_input::<String>();
/// let output = scope.optional_region_named(
/// "This is my region",
/// false,
/// |region| stream.enter_region(region).leave_region(),
/// );
///
/// input
/// });
/// });
/// ```
///
fn optional_region_named<R, F>(&mut self, name: &str, enabled: bool, func: F) -> R
where
F: FnOnce(&mut Region<Self>) -> R;
}
Loading