From d6ae7bdc602d5240c59dcf940426579d5b8c579a Mon Sep 17 00:00:00 2001 From: aclum Date: Thu, 18 Jul 2024 16:33:51 -0700 Subject: [PATCH 01/11] update to MAG workflow config --- configs/workflows.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/configs/workflows.yaml b/configs/workflows.yaml index 616b306..1c6ae91 100644 --- a/configs/workflows.yaml +++ b/configs/workflows.yaml @@ -286,7 +286,7 @@ Workflows: Enabled: True Git_repo: https://github.com/microbiomedata/mg_annotation Git_repo: https://github.com/microbiomedata/metaMAGs - Version: v1.0.6 + Version: v1.3.3 WDL: mbin_nmdc.wdl Collection: mags_activity_set Predecessors: @@ -308,6 +308,7 @@ Workflows: sam_file: do:Assembly Coverage BAM smart_file: do:SMART Annotation GFF proteins_file: do:Annotation Amino Acid FASTA + map_file: do:Contig Mapping File gene_phylogeny_file: do:Gene Phylogeny tsv proj: "{activity_id}" Activity: From aa7f5fc6d6faafbce684bca02ec26318b1b3db25 Mon Sep 17 00:00:00 2001 From: aclum Date: Mon, 22 Jul 2024 11:06:16 -0700 Subject: [PATCH 02/11] Update workflows.yaml Bump to patch version for binning. --- configs/workflows.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/configs/workflows.yaml b/configs/workflows.yaml index 1c6ae91..8cb3f4d 100644 --- a/configs/workflows.yaml +++ b/configs/workflows.yaml @@ -286,7 +286,7 @@ Workflows: Enabled: True Git_repo: https://github.com/microbiomedata/mg_annotation Git_repo: https://github.com/microbiomedata/metaMAGs - Version: v1.3.3 + Version: v1.3.4 WDL: mbin_nmdc.wdl Collection: mags_activity_set Predecessors: From a2fa8370e63bbd966bfa62289d1a026c7c08a848 Mon Sep 17 00:00:00 2001 From: aclum Date: Mon, 22 Jul 2024 16:23:19 -0700 Subject: [PATCH 03/11] Update workflows.yaml removing duplicate git url repo for mags. --- configs/workflows.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/configs/workflows.yaml b/configs/workflows.yaml index 8cb3f4d..dc54996 100644 --- a/configs/workflows.yaml +++ b/configs/workflows.yaml @@ -284,7 +284,6 @@ Workflows: - Name: MAGs Type: nmdc:MagsAnalysisActivity Enabled: True - Git_repo: https://github.com/microbiomedata/mg_annotation Git_repo: https://github.com/microbiomedata/metaMAGs Version: v1.3.4 WDL: mbin_nmdc.wdl From d9bf74ef6f17bcbe75999169f55408a5c7c9537f Mon Sep 17 00:00:00 2001 From: Shane Canon Date: Tue, 2 Jul 2024 09:48:18 -0700 Subject: [PATCH 04/11] Skip downstream jobs for disabled workflows If an activity has a workflow that is disabled (e.g. old version), then we shouldn't trigger any downstream activities from that. --- nmdc_automation/workflow_automation/sched.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/nmdc_automation/workflow_automation/sched.py b/nmdc_automation/workflow_automation/sched.py index 4d4a293..f257c11 100644 --- a/nmdc_automation/workflow_automation/sched.py +++ b/nmdc_automation/workflow_automation/sched.py @@ -278,7 +278,11 @@ def cycle(self, dryrun: bool = False, skiplist: set = set(), allowlist = None) - if act.was_informed_by in skiplist: logging.debug(f"Skipping: {act.was_informed_by}") continue + if not act.workflow.enabled: + logging.debug(f"Skipping: {act.id}, workflow disabled.") + continue if allowlist and act.was_informed_by not in allowlist: + logging.debug(f"Skipping: {act.was_informed_by}, not in allow list.") continue jobs = self.find_new_jobs(act) for job in jobs: From 7bab653d2317a57e9f268451d1a192eeb374ec97 Mon Sep 17 00:00:00 2001 From: Michael Thornton Date: Tue, 2 Jul 2024 16:21:35 -0700 Subject: [PATCH 05/11] Set Sequencing workflows Enabled: True Unit tests were failing after introducing a check in the scheduler to skip over workflows where Enabled was not True. The fixtures for the unit tests and the workflow files that the used did not have Enabled set at all for Sequencing Noninterleaved and Sequencing Interleaved --- configs/workflows.yaml | 2 ++ tests/test_sched.py | 1 - tests/workflows_test.yaml | 2 ++ tests/workflows_test2.yaml | 2 ++ 4 files changed, 6 insertions(+), 1 deletion(-) diff --git a/configs/workflows.yaml b/configs/workflows.yaml index 05a1e04..616b306 100644 --- a/configs/workflows.yaml +++ b/configs/workflows.yaml @@ -1,12 +1,14 @@ Workflows: - Name: Sequencing Noninterleaved Collection: omics_processing_set + Enabled: True Filter Output Objects: - Metagenome Raw Read 1 - Metagenome Raw Read 2 - Name: Sequencing Interleaved Collection: omics_processing_set + Enabled: True Filter Output Objects: - Metagenome Raw Reads diff --git a/tests/test_sched.py b/tests/test_sched.py index 931fb1c..4bf9a29 100644 --- a/tests/test_sched.py +++ b/tests/test_sched.py @@ -100,7 +100,6 @@ def test_submit(db, mock_api): assert len(resp) == 1 # The job should now be in a submitted state - # make this pass resp = jm.cycle() assert len(resp) == 0 diff --git a/tests/workflows_test.yaml b/tests/workflows_test.yaml index 4a379ce..715474e 100644 --- a/tests/workflows_test.yaml +++ b/tests/workflows_test.yaml @@ -1,12 +1,14 @@ Workflows: - Name: Sequencing Noninterleaved Collection: omics_processing_set + Enabled: True Filter Output Objects: - Metagenome Raw Read 1 - Metagenome Raw Read 2 - Name: Sequencing Interleaved Collection: omics_processing_set + Enabled: True Filter Output Objects: - Metagenome Raw Reads diff --git a/tests/workflows_test2.yaml b/tests/workflows_test2.yaml index 5dbf542..f159203 100644 --- a/tests/workflows_test2.yaml +++ b/tests/workflows_test2.yaml @@ -1,12 +1,14 @@ Workflows: - Name: Sequencing Noninterleaved Collection: omics_processing_set + Enabled: True Filter Output Objects: - Metagenome Raw Read 1 - Metagenome Raw Read 2 - Name: Sequencing Interleaved Collection: omics_processing_set + Enabled: True Filter Output Objects: - Metagenome Raw Reads From ffd604648eb11d00d13eb02d2acb6158b46144d6 Mon Sep 17 00:00:00 2001 From: Shane Canon Date: Sat, 13 Jul 2024 10:54:17 -0700 Subject: [PATCH 06/11] Fix logic for in range activities This addresses issue #211. The fix is to load all activities with the same git_url and then check if the activity version is within range of version of the workflow. Tests were added to check the behavior. Some other minor cleanup is included such as suppressing warnings for the same thing repeatedly. --- .../workflow_automation/activities.py | 43 +++++++++++++++++-- nmdc_automation/workflow_automation/sched.py | 5 ++- tests/test_sched.py | 37 ++++++++++++++-- 3 files changed, 77 insertions(+), 8 deletions(-) diff --git a/nmdc_automation/workflow_automation/activities.py b/nmdc_automation/workflow_automation/activities.py index 788cb46..fb87364 100644 --- a/nmdc_automation/workflow_automation/activities.py +++ b/nmdc_automation/workflow_automation/activities.py @@ -1,12 +1,19 @@ import logging from typing import List from .workflows import Workflow +from semver.version import Version + + +warned_objects = set() def _load_data_objects(db, workflows: List[Workflow]): """ Read all of the data objects and generate a map by ID + + TODO: In the future this will probably need to be redone + since the number of data objects could get very large. """ # Build up a filter of what types are used @@ -23,7 +30,28 @@ def _load_data_objects(db, workflows: List[Workflow]): return data_objs_by_id +def _within_range(ver1: str, ver2: str) -> bool: + """ + Determine if two workflows are within a major and minor + version of each other. + """ + + def get_version(version): + v_string = version.lstrip("b").lstrip("v") + return Version.parse(v_string) + + v1 = get_version(ver1) + v2 = get_version(ver2) + if v1.major == v2.major and v1.minor == v2.minor: + return True + return False + + def _check(match_types, data_object_ids, data_objs): + """ + This iterates through a list of data objects and + checks the type against the match types. + """ if not data_object_ids: return False if not match_types or len(match_types) == 0: @@ -37,6 +65,10 @@ def _check(match_types, data_object_ids, data_objs): def _filter_skip(wf, rec, data_objs): + """ + Some workflows require specific inputs or outputs. This + implements the filtering for those. + """ match_in = _check(wf.filter_input_objects, rec.get("has_input"), data_objs) @@ -56,8 +88,9 @@ def _read_acitivites(db, workflows: List[Workflow], logging.debug(f"Checking {wf.name}:{wf.version}") q = filter q["git_url"] = wf.git_repo - q["version"] = wf.version for rec in db[wf.collection].find(q): + if wf.version and not _within_range(rec["version"], wf.version): + continue if wf.collection == "omics_processing_set" and \ rec["id"].startswith("gold"): continue @@ -87,7 +120,9 @@ def _resolve_relationships(activities, data_obj_act): for do_id in act.has_input: if do_id not in data_obj_act: # This really shouldn't happen - logging.warning(f"Missing data object {do_id}") + if do_id not in warned_objects: + logging.warning(f"Missing data object {do_id}") + warned_objects.add(do_id) continue parent_act = data_obj_act[do_id] # This is to cover the case where it was a duplicate. @@ -133,7 +168,9 @@ def _find_data_object_activities(activities, data_objs_by_id): # Once we re-id the data objects this # shouldn't happen if do_id in data_obj_act: - logging.warning(f"Duplicate output object {do_id}") + if do_id not in warned_objects: + logging.warning(f"Duplicate output object {do_id}") + warned_objects.add(do_id) data_obj_act[do_id] = None else: data_obj_act[do_id] = act diff --git a/nmdc_automation/workflow_automation/sched.py b/nmdc_automation/workflow_automation/sched.py index f257c11..af2d478 100644 --- a/nmdc_automation/workflow_automation/sched.py +++ b/nmdc_automation/workflow_automation/sched.py @@ -48,7 +48,7 @@ def get_version(wf): v1 = get_version(wf1) v2 = get_version(wf2) if force: - return v1==v2 + return v1 == v2 if v1.major == v2.major and v1.minor == v2.minor: return True return False @@ -267,7 +267,8 @@ def find_new_jobs(self, act: Activity) -> list[Job]: return new_jobs - def cycle(self, dryrun: bool = False, skiplist: set = set(), allowlist = None) -> list: + def cycle(self, dryrun: bool = False, skiplist: set = set(), + allowlist=None) -> list: """ This function does a single cycle of looking for new jobs """ diff --git a/tests/test_sched.py b/tests/test_sched.py index 4bf9a29..b09b233 100644 --- a/tests/test_sched.py +++ b/tests/test_sched.py @@ -73,13 +73,16 @@ def init_test(db): load(db, fn, reset=True) -def mock_progress(db, wf): +def mock_progress(db, wf, version=None, flush=True): s = wf.collection data = read_json("%s.json" % (s))[0] if 'version' not in data: data['git_url'] = wf.git_repo data['version'] = wf.version - db[s].delete_many({}) + if version: + data['version'] = version + if flush: + db[s].delete_many({}) db[s].insert_one(data) @@ -128,7 +131,10 @@ def test_progress(db, mock_api): assert len(resp) == 0 wf = workflow_by_name['Metagenome Assembly'] - mock_progress(db, wf) + # Lets override the version to simulate an older run + # for this workflow that is stil within range of the + # current workflow + mock_progress(db, wf, version="v1.0.2") resp = jm.cycle() assert "assembly_id" in resp[0]["config"]["inputs"] assert len(resp) == 1 @@ -191,3 +197,28 @@ def test_multiple_versions(db, mock_api): db.jobs.delete_many({}) resp = jm.cycle() assert len(resp) == 4 + + +def test_out_of_range(db, mock_api): + init_test(db) + reset_db(db) + db.jobs.delete_many({}) + load(db, "data_object_set.json") + load(db, "omics_processing_set.json") + jm = Scheduler(db, wfn="./tests/workflows_test.yaml", + site_conf="./tests/site_configuration_test.toml") + workflow_by_name = dict() + for wf in jm.workflows: + workflow_by_name[wf.name] = wf + + # Let's create two RQC records. One will be in range + # and the other will not. We should only get new jobs + # for the one in range. + wf = workflow_by_name['Reads QC'] + mock_progress(db, wf) + mock_progress(db, wf, version="v0.0.1", flush=False) + + resp = jm.cycle() + assert len(resp) == 2 + resp = jm.cycle() + assert len(resp) == 0 From b1a2c776ac718a396fc61ab9f38ec00b93a0e75d Mon Sep 17 00:00:00 2001 From: Shane Canon Date: Sat, 13 Jul 2024 11:55:44 -0700 Subject: [PATCH 07/11] Minor fixes to scheduler and activities Some minor enhancements to the scheduler: * Do only one cycle in dryrun mode * Some improved logging * Filter the allow list in the mongo query instead of afterwards * Strip -beta off version before comparing (this is for some legacy records) --- nmdc_automation/workflow_automation/activities.py | 7 +++++-- nmdc_automation/workflow_automation/sched.py | 12 +++++++----- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/nmdc_automation/workflow_automation/activities.py b/nmdc_automation/workflow_automation/activities.py index fb87364..0e517af 100644 --- a/nmdc_automation/workflow_automation/activities.py +++ b/nmdc_automation/workflow_automation/activities.py @@ -37,7 +37,7 @@ def _within_range(ver1: str, ver2: str) -> bool: """ def get_version(version): - v_string = version.lstrip("b").lstrip("v") + v_string = version.lstrip("b").lstrip("v").rstrip("-beta") return Version.parse(v_string) v1 = get_version(ver1) @@ -90,6 +90,7 @@ def _read_acitivites(db, workflows: List[Workflow], q["git_url"] = wf.git_repo for rec in db[wf.collection].find(q): if wf.version and not _within_range(rec["version"], wf.version): + logging.debug(f"Skipping {wf.name} {wf.version} {rec['version']}") continue if wf.collection == "omics_processing_set" and \ rec["id"].startswith("gold"): @@ -134,7 +135,9 @@ def _resolve_relationships(activities, data_obj_act): # This is just a safeguard if act.was_informed_by != parent_act.was_informed_by: logging.warning("Mismatched informed by for " - f"{do_id} in {act.id}") + f"{do_id} in {act.id} " + f"{act.was_informed_by} != " + f"{parent_act.was_informed_by}") continue # We only want to use it as a parent if it is the right # parent workflow. Some inputs may come from ancestors diff --git a/nmdc_automation/workflow_automation/sched.py b/nmdc_automation/workflow_automation/sched.py index af2d478..48705e6 100644 --- a/nmdc_automation/workflow_automation/sched.py +++ b/nmdc_automation/workflow_automation/sched.py @@ -272,7 +272,10 @@ def cycle(self, dryrun: bool = False, skiplist: set = set(), """ This function does a single cycle of looking for new jobs """ - acts = load_activities(self.db, self.workflows) + filt = None + if allowlist: + filt = {"was_informed_by": {"$in": list(allowlist)}} + acts = load_activities(self.db, self.workflows, filter=filt) self.get_existing_jobs.cache_clear() job_recs = [] for act in acts: @@ -282,14 +285,11 @@ def cycle(self, dryrun: bool = False, skiplist: set = set(), if not act.workflow.enabled: logging.debug(f"Skipping: {act.id}, workflow disabled.") continue - if allowlist and act.was_informed_by not in allowlist: - logging.debug(f"Skipping: {act.was_informed_by}, not in allow list.") - continue jobs = self.find_new_jobs(act) for job in jobs: if dryrun: msg = f"new job: informed_by: {job.informed_by} trigger: {job.trigger_id} " - msg += f"wf: {job.workflow.name}" + msg += f"wf: {job.workflow.name} ver: {job.workflow.version}" logging.info(msg) continue try: @@ -324,6 +324,8 @@ def main(): # pragma: no cover allowlist.add(line.rstrip()) while True: sched.cycle(dryrun=dryrun, skiplist=skiplist, allowlist=allowlist) + if dryrun: + break _sleep(_POLL_INTERVAL) From bf89a7787380c315733512679a770d8be1076255 Mon Sep 17 00:00:00 2001 From: Shane Canon Date: Sun, 14 Jul 2024 10:01:35 -0700 Subject: [PATCH 08/11] Fix initialization --- nmdc_automation/workflow_automation/sched.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nmdc_automation/workflow_automation/sched.py b/nmdc_automation/workflow_automation/sched.py index 48705e6..01bc00f 100644 --- a/nmdc_automation/workflow_automation/sched.py +++ b/nmdc_automation/workflow_automation/sched.py @@ -272,7 +272,7 @@ def cycle(self, dryrun: bool = False, skiplist: set = set(), """ This function does a single cycle of looking for new jobs """ - filt = None + filt = {} if allowlist: filt = {"was_informed_by": {"$in": list(allowlist)}} acts = load_activities(self.db, self.workflows, filter=filt) From 85fc4794da0cfe066993ba0915f18657c9dffdff Mon Sep 17 00:00:00 2001 From: aclum Date: Thu, 18 Jul 2024 16:33:51 -0700 Subject: [PATCH 09/11] update to MAG workflow config --- configs/workflows.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/configs/workflows.yaml b/configs/workflows.yaml index 616b306..1c6ae91 100644 --- a/configs/workflows.yaml +++ b/configs/workflows.yaml @@ -286,7 +286,7 @@ Workflows: Enabled: True Git_repo: https://github.com/microbiomedata/mg_annotation Git_repo: https://github.com/microbiomedata/metaMAGs - Version: v1.0.6 + Version: v1.3.3 WDL: mbin_nmdc.wdl Collection: mags_activity_set Predecessors: @@ -308,6 +308,7 @@ Workflows: sam_file: do:Assembly Coverage BAM smart_file: do:SMART Annotation GFF proteins_file: do:Annotation Amino Acid FASTA + map_file: do:Contig Mapping File gene_phylogeny_file: do:Gene Phylogeny tsv proj: "{activity_id}" Activity: From e1b908d5415cb37d3fe74037e132492b943f7b50 Mon Sep 17 00:00:00 2001 From: aclum Date: Mon, 22 Jul 2024 11:06:16 -0700 Subject: [PATCH 10/11] Update workflows.yaml Bump to patch version for binning. --- configs/workflows.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/configs/workflows.yaml b/configs/workflows.yaml index 1c6ae91..8cb3f4d 100644 --- a/configs/workflows.yaml +++ b/configs/workflows.yaml @@ -286,7 +286,7 @@ Workflows: Enabled: True Git_repo: https://github.com/microbiomedata/mg_annotation Git_repo: https://github.com/microbiomedata/metaMAGs - Version: v1.3.3 + Version: v1.3.4 WDL: mbin_nmdc.wdl Collection: mags_activity_set Predecessors: From e1c4c89923ee9574cdb629d9f1d9e8bcfdea9bff Mon Sep 17 00:00:00 2001 From: aclum Date: Mon, 22 Jul 2024 16:23:19 -0700 Subject: [PATCH 11/11] Update workflows.yaml removing duplicate git url repo for mags. --- configs/workflows.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/configs/workflows.yaml b/configs/workflows.yaml index 8cb3f4d..dc54996 100644 --- a/configs/workflows.yaml +++ b/configs/workflows.yaml @@ -284,7 +284,6 @@ Workflows: - Name: MAGs Type: nmdc:MagsAnalysisActivity Enabled: True - Git_repo: https://github.com/microbiomedata/mg_annotation Git_repo: https://github.com/microbiomedata/metaMAGs Version: v1.3.4 WDL: mbin_nmdc.wdl