From 0abe5367f15c608601e0c3c4337dadb5d6086dd6 Mon Sep 17 00:00:00 2001
From: Michael Thornton
Date: Tue, 13 Feb 2024 12:32:08 -0800
Subject: [PATCH 1/3] add validate_record and submit_record methods to api client

use the /metadata/json:validate endpoint
---
 nmdc_automation/api/nmdcapi.py | 53 ++++++++++++++++++++++++++++++++++
 1 file changed, 53 insertions(+)

diff --git a/nmdc_automation/api/nmdcapi.py b/nmdc_automation/api/nmdcapi.py
index 15948b93..ee18b4a8 100755
--- a/nmdc_automation/api/nmdcapi.py
+++ b/nmdc_automation/api/nmdcapi.py
@@ -391,6 +391,59 @@ def run_query(self, query: dict):
         response = requests.post(url, json=query, headers=self.headers)
         return response.json()
 
+    def validate_record(self, record: dict):
+        """
+        Function to validate a record using the Microbiome Data API.
+
+        Parameters:
+            record (dict): The record to be validated.
+
+        Returns:
+            dict: The response from the API.
+
+        Raises:
+            requests.HTTPError: If the API responds with an error status
+                (a validation failure, HTTP 422, is logged first).
+        """
+
+        self.ensure_token()
+        url = f"{self.base_url}metadata/json:validate"
+
+        response = requests.post(url, json=record, headers=self.headers)
+        # check for validation failure
+        if response.status_code == 422:
+            logging.error(f"Validation failed for record: {record}")
+            logging.error(response.json())
+        response.raise_for_status()
+        return response.json()
+
+    def submit_record(self, record: dict):
+        """
+        Function to submit a record using the Microbiome Data API.
+
+        Parameters:
+            record (dict): The record to be submitted.
+
+        Returns:
+            dict | None: The response from the API, or None when the API
+                answers HTTP 409 because the record already exists.
+
+        Raises:
+            requests.HTTPError: If the API responds with any other error
+                status.
+        """
+
+        self.ensure_token()
+        url = f"{self.base_url}metadata/json:submit"
+
+        response = requests.post(url, json=record, headers=self.headers)
+        # check if we already have it:
+        if response.status_code == 409:
+            logging.debug(f"Record already exists: {response.json()}")
+            return None
+        response.raise_for_status()
+        return response.json()
+
 
 def jprint(obj):
     print(json.dumps(obj, indent=2))
From 1c938e1a8be6a5ccfeb4228c2497b7108d42b914 Mon Sep 17 00:00:00 2001
From: Michael Thornton
Date: Tue, 13 Feb 2024 12:32:39 -0800
Subject: [PATCH 2/3] update ingest-records to use validate_record and submit_records endpoints

---
 nmdc_automation/re_iding/scripts/re_id_tool.py | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/nmdc_automation/re_iding/scripts/re_id_tool.py b/nmdc_automation/re_iding/scripts/re_id_tool.py
index dc3090f4..f13b5f8f 100755
--- a/nmdc_automation/re_iding/scripts/re_id_tool.py
+++ b/nmdc_automation/re_iding/scripts/re_id_tool.py
@@ -394,7 +394,7 @@ def ingest_records(ctx, reid_records_file, changesheet_only):
 
     # Get API client(s)
     config = ctx.obj["site_config"]
-    api_client = NmdcRuntimeApi(config)
+    # api_client = NmdcRuntimeApi(config)
     api_user_client = NmdcRuntimeUserApi(config)
 
     with open(reid_records_file, "r") as f:
@@ -438,14 +438,22 @@ def ingest_records(ctx, reid_records_file, changesheet_only):
 
         # submit the record to the workflows endpoint
         if not changesheet_only:
-            resp = api_client.post_objects(record)
-            logging.info(f"{record} posted, got response: {resp}")
+            # validate the record
+            if api_user_client.validate_record(record):
+                logging.info("Workflow Record validated")
+                # submit the record
+                resp = api_user_client.submit_record(record)
+                # No response is returned if the record already exists
+                if resp:
+                    logging.info(f"Record submitted with response: {resp}")
+                else:
+                    logging.info(f"Record already exists")
         else:
             logging.info(f"changesheet_only is True, skipping ingest")
 
     changesheet.write_changesheet()
     logging.info(f"changesheet written to {changesheet.output_filepath}")
-    if changesheet.validate_changesheet(api_client.config.napa_base_url):
+    if changesheet.validate_changesheet(api_user_client.base_url):
         logging.info(f"changesheet validated")
     else:
         logging.info(f"changesheet validation failed")
From cc371aac1ca8f61f7887649654cb4bee0c5b38b9 Mon Sep 17 00:00:00 2001
From: Michael Thornton
Date: Thu, 15 Feb 2024 11:43:00 -0800
Subject: [PATCH 3/3] Update ingest-records command to insert re-IDed records via pymongo.insert_many

Successfully tested on my local system - still need MongoDB uri/credentials for Napa DB instance
---
 .../re_iding/scripts/re_id_tool.py            | 70 +++++++++++++++----
 1 file changed, 57 insertions(+), 13 deletions(-)

diff --git a/nmdc_automation/re_iding/scripts/re_id_tool.py b/nmdc_automation/re_iding/scripts/re_id_tool.py
index f13b5f8f..fc753dcb 100755
--- a/nmdc_automation/re_iding/scripts/re_id_tool.py
+++ b/nmdc_automation/re_iding/scripts/re_id_tool.py
@@ -11,6 +11,7 @@
 import click
 import requests
 from linkml_runtime.dumpers import json_dumper
+import pymongo
 
 from nmdc_automation.api import NmdcRuntimeApi, NmdcRuntimeUserApi
 from nmdc_automation.nmdc_common.client import NmdcApi
@@ -380,12 +381,32 @@ def process_records(ctx, study_id, data_dir, update_links=False):
     is_flag=True,
     default=False,
 )
+@click.option("--mongo-uri", required=False, default="mongodb://localhost:27017")
+@click.option(
+    "--is-direct-connection",
+    type=bool,
+    required=False,
+    default=True,
+    show_default=True,
+    help="Whether you want the script to set the `directConnection` flag when connecting to the MongoDB server. "
+         "That is required by some MongoDB servers that belong to a replica set. ",
+)
+@click.option(
+    "--database-name",
+    type=str,
+    required=False,
+    default="nmdc",
+    show_default=True,
+    help="MongoDB database name",
+)
 @click.pass_context
-def ingest_records(ctx, reid_records_file, changesheet_only):
+def ingest_records(ctx, reid_records_file, changesheet_only, mongo_uri=None,
+                   is_direct_connection=True, database_name="nmdc"):
     """
     Read in json dump of re_id'd records and:
-    submit them to the
-    /v1/workflows/activities endpoint
+    - validate the records against the /metadata/json:validate endpoint
+    - insert the records into the MongoDB database defined by the --mongo-uri and --database-name options
+
     """
     start_time = time.time()
     logging.info(f"Submitting re id'd records from : {reid_records_file}")
@@ -397,12 +418,29 @@ def ingest_records(ctx, reid_records_file, changesheet_only):
     # api_client = NmdcRuntimeApi(config)
     api_user_client = NmdcRuntimeUserApi(config)
 
+    # TODO - need to get mongo_uri credentials for the Napa DB instance in the config file. Meanwhile, we can test
+    # with a local MongoDB instance
+    # Log only the part of the URI after any "user:password@" userinfo so
+    # that credentials never end up in the log output.
+    mongo_host = str(mongo_uri).rpartition("@")[2]
+    logging.info(f"Using MongoDB URI: {mongo_host}")
+
+    # Connect to the MongoDB server and check the database name
+    client = pymongo.MongoClient(mongo_uri, directConnection=is_direct_connection)
+    with pymongo.timeout(5):
+        # Raise instead of assert: assert statements are removed under
+        # "python -O", which would silently skip this check.
+        if database_name not in client.list_database_names():
+            raise click.ClickException(f"Database {database_name} not found")
+    logging.info(f"Connected to MongoDB server at {mongo_host}")
+    db_client = client[database_name]
+
+
     with open(reid_records_file, "r") as f:
         db_records = json.load(f)
 
     changesheet = Changesheet(name=f"{reid_base_name}_changesheet")
     for record in db_records:
-        time.sleep(3)
         # remove the omics_processing_set and use it to generate
         # changes to omics_processing has_output
         omics_processing_set = record.pop("omics_processing_set")
@@ -440,16 +478,22 @@ def ingest_records(ctx, reid_records_file, changesheet_only):
         if not changesheet_only:
             # validate the record
+            # NOTE(review): any non-empty response body passes this check - confirm the
+            # endpoint never answers HTTP 200 with validation errors in the payload.
             if api_user_client.validate_record(record):
-                logging.info("Workflow Record validated")
-                # submit the record
-                resp = api_user_client.submit_record(record)
-                # No response is returned if the record already exists
-                if resp:
-                    logging.info(f"Record submitted with response: {resp}")
-                else:
-                    logging.info(f"Record already exists")
+                logging.info("DB Record validated")
+                # submit the record documents directly via the MongoDB client
+                for collection_name, collection in record.items():
+                    # collection shouldn't be empty but check just in case
+                    if not collection:
+                        logging.warning(f"Empty collection: {collection_name}")
+                        continue
+                    logging.info(f"Inserting {len(collection)} records into {collection_name}")
+                    insertion_result = db_client[collection_name].insert_many(collection)
+                    logging.info(f"Inserted {len(insertion_result.inserted_ids)} records into {collection_name}")
+            else:
+                logging.error("Workflow Record validation failed")
         else:
-            logging.info(f"changesheet_only is True, skipping ingest")
+            logging.info("changesheet_only is True, skipping Workflow and Data Object ingest")
 
     changesheet.write_changesheet()
     logging.info(f"changesheet written to {changesheet.output_filepath}")