From 9e95508de5c475a63620a6dfbf836d12221b0d83 Mon Sep 17 00:00:00 2001 From: Iaroslav Ciupin Date: Wed, 25 Oct 2023 23:26:19 +0300 Subject: [PATCH] Allow passing cluster pool to run tests (#90) Signed-off-by: Iaroslav Ciupin --- boilerplate/flyte/end2end/run-tests.py | 40 +++++++++++++++++++++----- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/boilerplate/flyte/end2end/run-tests.py b/boilerplate/flyte/end2end/run-tests.py index 31f66d9..4b845b0 100644 --- a/boilerplate/flyte/end2end/run-tests.py +++ b/boilerplate/flyte/end2end/run-tests.py @@ -5,7 +5,7 @@ import sys import time import traceback -from typing import Dict, List, Mapping, Tuple +from typing import Dict, List, Mapping, Tuple, Optional import click import requests @@ -89,9 +89,15 @@ } -def execute_workflow(remote, version, workflow_name, inputs): +def execute_workflow( + remote: FlyteRemote, + version, + workflow_name, + inputs, + cluster_pool_name: Optional[str] = None, +): print(f"Fetching workflow={workflow_name} and version={version}") - wf = remote.fetch_workflow(name=workflow_name, version=version) + wf = remote.fetch_workflow(name=workflow_name, version=version, cluster_pool=cluster_pool_name) return remote.execute(wf, inputs=inputs, wait=False) @@ -125,6 +131,7 @@ def schedule_workflow_groups( workflow_groups: List[str], remote: FlyteRemote, terminate_workflow_on_failure: bool, + cluster_pool_name: Optional[str] = None, ) -> Dict[str, bool]: """ Schedule workflows executions for all workflow groups and return True if all executions succeed, otherwise @@ -135,7 +142,8 @@ def schedule_workflow_groups( for wf_group in workflow_groups: workflows = FLYTESNACKS_WORKFLOW_GROUPS.get(wf_group, []) executions_by_wfgroup[wf_group] = [ - execute_workflow(remote, tag, workflow[0], workflow[1]) for workflow in workflows + execute_workflow(remote, tag, workflow[0], workflow[1], cluster_pool_name) + for workflow in workflows ] # Wait for all executions to finish @@ -181,6 +189,7 @@ def run( terminate_workflow_on_failure: bool, test_project_name: str, test_project_domain: str, + cluster_pool_name: Optional[str] = None, ) -> List[Dict[str, str]]: remote = FlyteRemote( Config.auto(config_file=config_file_path), @@ -217,7 +226,11 @@ def run( valid_workgroups.append(workflow_group) results_by_wfgroup = schedule_workflow_groups( - flytesnacks_release_tag, valid_workgroups, remote, terminate_workflow_on_failure + flytesnacks_release_tag, + valid_workgroups, + remote, + terminate_workflow_on_failure, + cluster_pool_name, ) for workflow_group, succeeded in results_by_wfgroup.items(): @@ -277,6 +290,12 @@ def run( @click.argument("flytesnacks_release_tag") @click.argument("priorities") @click.argument("config_file") +@click.argument( + "cluster_pool_name", + required=False, + type=str, + default=None, +) def cli( flytesnacks_release_tag, priorities, @@ -285,10 +304,17 @@ def cli( terminate_workflow_on_failure, test_project_name, test_project_domain, + cluster_pool_name, ): print(f"return_non_zero_on_failure={return_non_zero_on_failure}") - results = run(flytesnacks_release_tag, priorities, config_file, terminate_workflow_on_failure, test_project_name, - test_project_domain) + results = run( + flytesnacks_release_tag, + priorities, config_file, + terminate_workflow_on_failure, + test_project_name, + test_project_domain, + cluster_pool_name, + ) # Write a json object in its own line describing the result of this run to stdout print(f"Result of run:\n{json.dumps(results)}")