Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compare vversion in the schduler #63

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 24 additions & 13 deletions nmdc_automation/workflow_automation/sched.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,32 @@ def get_mongo_db() -> MongoDatabase:
return _client[os.getenv("MONGO_DBNAME")]


def within_range(wf1: Workflow, wf2: Workflow, force=False) -> bool:
def within_range(wf1: Workflow, wf2: Workflow, version_range: str, force: bool = False) -> bool:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a test and raise a ValueError if version_range isn't in the allowed options.

"""
Determine if two workflows are within a major and minor
version of each other.
Determine if two workflows are within a specified version component of each other.
The version_range parameter can be 'major', 'minor', or 'patch'.
"""

def get_version(wf):
v_string = wf1.version.lstrip("b").lstrip("v")
v_string = wf.version.lstrip("b").lstrip("v")
return Version.parse(v_string)

# Apples and oranges
if wf1.name != wf2.name:
return False

v1 = get_version(wf1)
v2 = get_version(wf2)

if force:
return v1==v2
if v1.major == v2.major and v1.minor == v2.minor:
return True
return v1 == v2

if version_range == "major":
return v1.major == v2.major
elif version_range == "minor":
return v1.major == v2.major and v1.minor == v2.minor
elif version_range == "patch":
return v1 == v2

return False


Expand Down Expand Up @@ -233,7 +240,7 @@ def get_existing_jobs(self, wf: Workflow):
existing_jobs.add(act)
return existing_jobs

def find_new_jobs(self, act: Activity) -> list[Job]:
def find_new_jobs(self, act: Activity, compare_version: str) -> list[Job]:
"""
For a given activity see if there are any new jobs
that should be created.
Expand All @@ -251,7 +258,7 @@ def find_new_jobs(self, act: Activity) -> list[Job]:
# Look at previously generated derived
# activities to see if this is already done.
for child_act in act.children:
if within_range(child_act.workflow, wf, force=self.force):
if within_range(child_act.workflow, wf, compare_version, force=self.force):
break
else:
# These means no existing activities were
Expand All @@ -262,7 +269,7 @@ def find_new_jobs(self, act: Activity) -> list[Job]:

return new_jobs

def cycle(self, dryrun: bool = False, skiplist: set = set(), allowlist = None) -> list:
def cycle(self, dryrun: bool = False, skiplist: set = set(), allowlist = None, compare_version: str = "major") -> list:
"""
This function does a single cycle of looking for new jobs
"""
Expand All @@ -275,7 +282,7 @@ def cycle(self, dryrun: bool = False, skiplist: set = set(), allowlist = None) -
continue
if allowlist and act.was_informed_by not in allowlist:
continue
jobs = self.find_new_jobs(act)
jobs = self.find_new_jobs(act, compare_version)
for job in jobs:
if dryrun:
msg = f"new job: informed_by: {job.informed_by} trigger: {job.trigger_id} "
Expand Down Expand Up @@ -312,8 +319,12 @@ def main(): # pragma: no cover
with open(os.environ.get("ALLOWLISTFILE")) as f:
for line in f:
allowlist.add(line.rstrip())
if os.environ.get("COMPAREVERSION"):
compare_version = os.environ.get("COMPAREVERSION")
else:
compare_version = "major"
while True:
sched.cycle(dryrun=dryrun, skiplist=skiplist, allowlist=allowlist)
sched.cycle(dryrun=dryrun, skiplist=skiplist, allowlist=allowlist, compare_version=compare_version)
_sleep(_POLL_INTERVAL)


Expand Down
54 changes: 53 additions & 1 deletion tests/test_sched.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from pymongo import MongoClient
import json
import os
from nmdc_automation.workflow_automation.sched import Scheduler
from nmdc_automation.workflow_automation.sched import Scheduler, within_range
from nmdc_automation.workflow_automation.workflows import load_workflows, Workflow
from pytest import fixture
from pathlib import Path
from time import time
Expand Down Expand Up @@ -82,6 +83,57 @@ def mock_progress(db, wf):
db[s].delete_many({})
db[s].insert_one(data)

def test_within_range():

wf1_major_dict = {
"Name": "TestWF",
"Version": "v1.1.0",
}
wf2_major_dict = {
"Name": "TestWF",
"Version": "v2.0.0",
}

wf1_minor_dict = {
"Name": "TestWF",
"Version": "v1.1.0",
}
wf2_minor_dict = {
"Name": "TestWF",
"Version": "v1.2.5",
}

wf1_patch_dict = {
"Name": "TestWF",
"Version": "v1.1.0",
}
wf2_patch_dict = {
"Name": "TestWF",
"Version": "v1.1.5",
}

# Instantiate Workflow objects from dictionaries
wf1_major = Workflow(wf1_major_dict)
wf2_major = Workflow(wf2_major_dict)
wf1_minor = Workflow(wf1_minor_dict)
wf2_minor = Workflow(wf2_minor_dict)
wf1_patch = Workflow(wf1_patch_dict)
wf2_patch = Workflow(wf2_patch_dict)

# Test major version range
assert within_range(wf1_major, wf2_major, "major") == False
assert within_range(wf1_major, wf1_minor, "major") == True

# Test minor version range
assert within_range(wf1_minor, wf2_minor, "minor") == False
assert within_range(wf1_minor, wf1_major, "minor") == True

# Test patch version range
assert within_range(wf1_patch, wf2_patch, "patch") == False
assert within_range(wf1_patch, wf1_minor, "patch") == True

assert within_range(wf1_patch, wf1_patch, "mafor", force=True) == True


def test_submit(db, mock_api):
"""
Expand Down
Loading