Skip to content

Commit

Permalink
[DF] Distributed RunGraphs always submits concurrently
Browse files Browse the repository at this point in the history
Making the decision on the scheduling mode based on `IsImpliciMTEnabled` would
mean forcing users to run `EnableImplicitMT` every time they want to submit
distributed graphs concurrently.

If the user has already chosen to run a distributed RDataFrame application,
RunGraphs should just submit all the computations concurrently to the
distributed scheduler. The "sequential" mode of operation can still be achieved
by e.g. calling `GetValue` on distributed proxies in a for loop.
  • Loading branch information
vepadulano committed Jul 29, 2023
1 parent 2923145 commit 8d03a46
Showing 1 changed file with 5 additions and 12 deletions.
17 changes: 5 additions & 12 deletions bindings/experimental/distrdf/python/DistRDF/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,25 +89,18 @@ def RunGraphs(proxies: Iterable) -> int:
"""
# Import here to avoid circular dependencies in main module
from DistRDF.Proxy import execute_graph
import ROOT

if not proxies:
logger.warning("RunGraphs: Got an empty list of handles, now quitting.")
return 0

# Get proxies belonging to distinct computation graphs
uniqueproxies = list({proxy.proxied_node.get_head(): proxy for proxy in proxies}.values())

if ROOT.IsImplicitMTEnabled():
# Submit all computation graphs concurrently from multiple Python threads.
# The submission is not computationally intensive
with concurrent.futures.ThreadPoolExecutor(max_workers=len(uniqueproxies)) as executor:
futures = [executor.submit(execute_graph, proxy.proxied_node) for proxy in uniqueproxies]
concurrent.futures.wait(futures)
else:
# Run the graphs sequentially
for p in uniqueproxies:
execute_graph(p.proxied_node)
# Submit all computation graphs concurrently from multiple Python threads.
# The submission is not computationally intensive
with concurrent.futures.ThreadPoolExecutor(max_workers=len(uniqueproxies)) as executor:
futures = [executor.submit(execute_graph, proxy.proxied_node) for proxy in uniqueproxies]
concurrent.futures.wait(futures)

return len(uniqueproxies)

Expand Down

0 comments on commit 8d03a46

Please sign in to comment.