diff --git a/nmdc_automation/workflow_automation/sched.py b/nmdc_automation/workflow_automation/sched.py index b96f98bf..3acbd406 100644 --- a/nmdc_automation/workflow_automation/sched.py +++ b/nmdc_automation/workflow_automation/sched.py @@ -32,25 +32,32 @@ def get_mongo_db() -> MongoDatabase: return _client[os.getenv("MONGO_DBNAME")] -def within_range(wf1: Workflow, wf2: Workflow, force=False) -> bool: +def within_range(wf1: Workflow, wf2: Workflow, version_range: str, force: bool = False) -> bool: """ - Determine if two workflows are within a major and minor - version of each other. + Determine if two workflows are within a specified version component of each other. + The version_range parameter can be 'major', 'minor', or 'patch'. """ def get_version(wf): - v_string = wf1.version.lstrip("b").lstrip("v") + v_string = wf.version.lstrip("b").lstrip("v") return Version.parse(v_string) - # Apples and oranges if wf1.name != wf2.name: return False + v1 = get_version(wf1) v2 = get_version(wf2) + if force: - return v1==v2 - if v1.major == v2.major and v1.minor == v2.minor: - return True + return v1 == v2 + + if version_range == "major": + return v1.major == v2.major + elif version_range == "minor": + return v1.major == v2.major and v1.minor == v2.minor + elif version_range == "patch": + return v1 == v2 + return False @@ -233,7 +240,7 @@ def get_existing_jobs(self, wf: Workflow): existing_jobs.add(act) return existing_jobs - def find_new_jobs(self, act: Activity) -> list[Job]: + def find_new_jobs(self, act: Activity, compare_version: str) -> list[Job]: """ For a given activity see if there are any new jobs that should be created. @@ -251,7 +258,7 @@ def find_new_jobs(self, act: Activity) -> list[Job]: # Look at previously generated derived # activities to see if this is already done. for child_act in act.children: - if within_range(child_act.workflow, wf, force=self.force): + if within_range(child_act.workflow, wf, compare_version, force=self.force): break else: # These means no existing activities were @@ -262,7 +269,7 @@ def find_new_jobs(self, act: Activity) -> list[Job]: return new_jobs - def cycle(self, dryrun: bool = False, skiplist: set = set(), allowlist = None) -> list: + def cycle(self, dryrun: bool = False, skiplist: set = set(), allowlist = None, compare_version: str = "major") -> list: """ This function does a single cycle of looking for new jobs """ @@ -275,7 +282,7 @@ def cycle(self, dryrun: bool = False, skiplist: set = set(), allowlist = None) - continue if allowlist and act.was_informed_by not in allowlist: continue - jobs = self.find_new_jobs(act) + jobs = self.find_new_jobs(act, compare_version) for job in jobs: if dryrun: msg = f"new job: informed_by: {job.informed_by} trigger: {job.trigger_id} " @@ -312,8 +319,12 @@ def main(): # pragma: no cover with open(os.environ.get("ALLOWLISTFILE")) as f: for line in f: allowlist.add(line.rstrip()) + if os.environ.get("COMPAREVERSION"): + compare_version = os.environ.get("COMPAREVERSION") + else: + compare_version = "major" while True: - sched.cycle(dryrun=dryrun, skiplist=skiplist, allowlist=allowlist) + sched.cycle(dryrun=dryrun, skiplist=skiplist, allowlist=allowlist, compare_version=compare_version) _sleep(_POLL_INTERVAL) diff --git a/tests/test_sched.py b/tests/test_sched.py index 45f4a803..a74f5c98 100644 --- a/tests/test_sched.py +++ b/tests/test_sched.py @@ -1,7 +1,8 @@ from pymongo import MongoClient import json import os -from nmdc_automation.workflow_automation.sched import Scheduler +from nmdc_automation.workflow_automation.sched import Scheduler, within_range +from nmdc_automation.workflow_automation.workflows import load_workflows, Workflow from pytest import fixture from pathlib import Path from time import time @@ -82,6 +83,57 @@ def mock_progress(db, wf): db[s].delete_many({}) db[s].insert_one(data) +def test_within_range(): + + wf1_major_dict = { + "Name": "TestWF", + "Version": "v1.1.0", + } + wf2_major_dict = { + "Name": "TestWF", + "Version": "v2.0.0", + } + + wf1_minor_dict = { + "Name": "TestWF", + "Version": "v1.1.0", + } + wf2_minor_dict = { + "Name": "TestWF", + "Version": "v1.2.5", + } + + wf1_patch_dict = { + "Name": "TestWF", + "Version": "v1.1.0", + } + wf2_patch_dict = { + "Name": "TestWF", + "Version": "v1.1.5", + } + + # Instantiate Workflow objects from dictionaries + wf1_major = Workflow(wf1_major_dict) + wf2_major = Workflow(wf2_major_dict) + wf1_minor = Workflow(wf1_minor_dict) + wf2_minor = Workflow(wf2_minor_dict) + wf1_patch = Workflow(wf1_patch_dict) + wf2_patch = Workflow(wf2_patch_dict) + + # Test major version range + assert within_range(wf1_major, wf2_major, "major") == False + assert within_range(wf1_major, wf1_minor, "major") == True + + # Test minor version range + assert within_range(wf1_minor, wf2_minor, "minor") == False + assert within_range(wf1_minor, wf1_major, "minor") == True + + # Test patch version range + assert within_range(wf1_patch, wf2_patch, "patch") == False + assert within_range(wf1_patch, wf1_minor, "patch") == True + + assert within_range(wf1_patch, wf1_patch, "mafor", force=True) == True + def test_submit(db, mock_api): """