Skip to content
This repository has been archived by the owner on Jun 17, 2024. It is now read-only.

Commit

Permalink
Allow passing cluster pool to run tests (#90)
Browse files Browse the repository at this point in the history
Signed-off-by: Iaroslav Ciupin <[email protected]>
  • Loading branch information
iaroslav-ciupin authored Oct 25, 2023
1 parent ce5adfb commit 9e95508
Showing 1 changed file with 33 additions and 7 deletions.
40 changes: 33 additions & 7 deletions boilerplate/flyte/end2end/run-tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import sys
import time
import traceback
from typing import Dict, List, Mapping, Tuple
from typing import Dict, List, Mapping, Tuple, Optional

import click
import requests
Expand Down Expand Up @@ -89,9 +89,15 @@
}


def execute_workflow(remote, version, workflow_name, inputs):
def execute_workflow(
remote: FlyteRemote,
version,
workflow_name,
inputs,
cluster_pool_name: Optional[str] = None,
):
print(f"Fetching workflow={workflow_name} and version={version}")
wf = remote.fetch_workflow(name=workflow_name, version=version)
wf = remote.fetch_workflow(name=workflow_name, version=version, cluster_pool=cluster_pool_name)
return remote.execute(wf, inputs=inputs, wait=False)


Expand Down Expand Up @@ -125,6 +131,7 @@ def schedule_workflow_groups(
workflow_groups: List[str],
remote: FlyteRemote,
terminate_workflow_on_failure: bool,
cluster_pool_name: Optional[str] = None,
) -> Dict[str, bool]:
"""
Schedule workflows executions for all workflow groups and return True if all executions succeed, otherwise
Expand All @@ -135,7 +142,8 @@ def schedule_workflow_groups(
for wf_group in workflow_groups:
workflows = FLYTESNACKS_WORKFLOW_GROUPS.get(wf_group, [])
executions_by_wfgroup[wf_group] = [
execute_workflow(remote, tag, workflow[0], workflow[1]) for workflow in workflows
execute_workflow(remote, tag, workflow[0], workflow[1], cluster_pool_name)
for workflow in workflows
]

# Wait for all executions to finish
Expand Down Expand Up @@ -181,6 +189,7 @@ def run(
terminate_workflow_on_failure: bool,
test_project_name: str,
test_project_domain: str,
cluster_pool_name: Optional[str] = None,
) -> List[Dict[str, str]]:
remote = FlyteRemote(
Config.auto(config_file=config_file_path),
Expand Down Expand Up @@ -217,7 +226,11 @@ def run(
valid_workgroups.append(workflow_group)

results_by_wfgroup = schedule_workflow_groups(
flytesnacks_release_tag, valid_workgroups, remote, terminate_workflow_on_failure
flytesnacks_release_tag,
valid_workgroups,
remote,
terminate_workflow_on_failure,
cluster_pool_name,
)

for workflow_group, succeeded in results_by_wfgroup.items():
Expand Down Expand Up @@ -277,6 +290,12 @@ def run(
@click.argument("flytesnacks_release_tag")
@click.argument("priorities")
@click.argument("config_file")
@click.argument(
"cluster_pool_name",
required=False,
type=str,
default=None,
)
def cli(
flytesnacks_release_tag,
priorities,
Expand All @@ -285,10 +304,17 @@ def cli(
terminate_workflow_on_failure,
test_project_name,
test_project_domain,
cluster_pool_name,
):
print(f"return_non_zero_on_failure={return_non_zero_on_failure}")
results = run(flytesnacks_release_tag, priorities, config_file, terminate_workflow_on_failure, test_project_name,
test_project_domain)
results = run(
flytesnacks_release_tag,
priorities, config_file,
terminate_workflow_on_failure,
test_project_name,
test_project_domain,
cluster_pool_name,
)

# Write a json object in its own line describing the result of this run to stdout
print(f"Result of run:\n{json.dumps(results)}")
Expand Down

0 comments on commit 9e95508

Please sign in to comment.