Merge pull request #59 from microbiomedata/58-refactor-re_id_tools-ingest-records-to-not-use-deprecated-v1workflowsactivities-endpoint

58: Refactor re_id_tools ingest_records to not use the deprecated /v1/workflows/activities endpoint
mbthornton-lbl authored Feb 15, 2024
2 parents 52cf2e9 + cc371aa commit 5758c6c
Showing 2 changed files with 97 additions and 9 deletions.
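
At a high level, ingest_records now replaces its single POST to the deprecated /v1/workflows/activities endpoint with a two-step flow: each re-ID'd record is validated against the Runtime metadata/json:validate endpoint, and its collections are then inserted directly into MongoDB. A minimal sketch of that flow, assuming a site config, a reachable local MongoDB instance, and a record dict keyed by collection name (names follow the diff below):

import pymongo
from nmdc_automation.api import NmdcRuntimeUserApi

# Assumed inputs: "config" is the site configuration object used elsewhere in
# nmdc_automation, and "record" maps collection names to lists of documents.
api_user_client = NmdcRuntimeUserApi(config)
db_client = pymongo.MongoClient("mongodb://localhost:27017", directConnection=True)["nmdc"]

if api_user_client.validate_record(record):  # POST metadata/json:validate
    for collection_name, documents in record.items():
        if documents:  # skip empty collections
            db_client[collection_name].insert_many(documents)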
44 changes: 44 additions & 0 deletions nmdc_automation/api/nmdcapi.py
@@ -391,6 +391,50 @@ def run_query(self, query: dict):
response = requests.post(url, json=query, headers=self.headers)
return response.json()

def validate_record(self, record: dict):
"""
Validate a record via the NMDC Runtime metadata/json:validate endpoint.
Parameters:
record (dict): The record to be validated.
Returns:
dict: The response from the API.
"""

self.ensure_token()
url = f"{self.base_url}metadata/json:validate"

response = requests.post(url, json=record, headers=self.headers)
# check for validation failure
if response.status_code == 422:
logging.error(f"Validation failed for record: {record}")
logging.error(response.json())
response.raise_for_status()
return response.json()

def submit_record(self, record: dict):
"""
Submit a record via the NMDC Runtime metadata/json:submit endpoint.
Parameters:
record (dict): The record to be submitted.
Returns:
dict: The response from the API.
"""

self.ensure_token()
url = f"{self.base_url}metadata/json:submit"

response = requests.post(url, json=record, headers=self.headers)
# check if we already have it:
if response.status_code == 409:
logging.debug(f"Record already exists: {response.json()}")
return
response.raise_for_status()
return response.json()

def jprint(obj):
print(json.dumps(obj, indent=2))

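The two helpers above slot in next to run_query on the Runtime API client and can be exercised on their own. A hedged usage sketch follows; the config object and the payload are placeholders, and only the endpoint behavior (a 422 is logged and re-raised, a 409 returns None) comes from the code above:

from nmdc_automation.api import NmdcRuntimeUserApi

client = NmdcRuntimeUserApi(config)  # "config" stands for the usual site configuration object
record = {"data_object_set": []}     # placeholder payload

client.validate_record(record)       # POST metadata/json:validate; logs the body and raises on HTTP 422
resp = client.submit_record(record)  # POST metadata/json:submit; returns None if the record already exists (HTTP 409)
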
62 changes: 53 additions & 9 deletions nmdc_automation/re_iding/scripts/re_id_tool.py
@@ -11,6 +11,7 @@
import click
import requests
from linkml_runtime.dumpers import json_dumper
import pymongo

from nmdc_automation.api import NmdcRuntimeApi, NmdcRuntimeUserApi
from nmdc_automation.nmdc_common.client import NmdcApi
@@ -380,12 +381,32 @@ def process_records(ctx, study_id, data_dir, update_links=False):
is_flag=True,
default=False,
)
@click.option("--mongo-uri",required=False, default="mongodb://localhost:27017",)
@click.option(
"--is-direct-connection",
type=bool,
required=False,
default=True,
show_default=True,
help=f"Whether you want the script to set the `directConnection` flag when connecting to the MongoDB server. "
f"That is required by some MongoDB servers that belong to a replica set. ",
)
@click.option(
"--database-name",
type=str,
required=False,
default="nmdc",
show_default=True,
help=f"MongoDB database name",
)
@click.pass_context
def ingest_records(ctx, reid_records_file, changesheet_only):
def ingest_records(ctx, reid_records_file, changesheet_only, mongo_uri=None,
is_direct_connection=True, database_name="nmdc"):
"""
Read in json dump of re_id'd records and:
submit them to the
/v1/workflows/activities endpoint
- validate the records against the /metadata/json:validate endpoint
- insert the records into the MongoDB database defined by the --mongo-uri and --database-name options
"""
start_time = time.time()
logging.info(f"Submitting re id'd records from : {reid_records_file}")
@@ -394,15 +415,26 @@ def ingest_records(ctx, reid_records_file, changesheet_only):

# Get API client(s)
config = ctx.obj["site_config"]
api_client = NmdcRuntimeApi(config)
# api_client = NmdcRuntimeApi(config)
api_user_client = NmdcRuntimeUserApi(config)

# TODO - need to get mongo_uri credentials for the Napa DB instance in the config file. Meanwhile, we can test
# with a local MongoDB instance
logging.info(f"Using MongoDB URI: {mongo_uri}")

# Connect to the MongoDB server and check the database name
client = pymongo.MongoClient(mongo_uri, directConnection=is_direct_connection)
with pymongo.timeout(5):
assert (database_name in client.list_database_names()), f"Database {database_name} not found"
logging.info(f"Connected to MongoDB server at {mongo_uri}")
db_client = client[database_name]


with open(reid_records_file, "r") as f:
db_records = json.load(f)

changesheet = Changesheet(name=f"{reid_base_name}_changesheet")
for record in db_records:
time.sleep(3)
# remove the omics_processing_set and use it to generate
# changes to omics_processing has_output
omics_processing_set = record.pop("omics_processing_set")
@@ -438,14 +470,26 @@ def ingest_records(ctx, reid_records_file, changesheet_only):

# submit the record to the workflows endpoint
if not changesheet_only:
resp = api_client.post_objects(record)
logging.info(f"{record} posted, got response: {resp}")
# validate the record
if api_user_client.validate_record(record):
logging.info("DB Record validated")
# submit the record documents directly via the MongoDB client
for collection_name, collection in record.items():
# collection shouldn't be empty but check just in case
if not collection:
logging.warning(f"Empty collection: {collection_name}")
continue
logging.info(f"Inserting {len(collection)} records into {collection_name}")
insertion_result = db_client[collection_name].insert_many(collection)
logging.info(f"Inserted {len(insertion_result.inserted_ids)} records into {collection_name}")
else:
logging.error("Workflow Record validation failed")
else:
logging.info(f"changesheet_only is True, skipping ingest")
logging.info(f"changesheet_only is True, skipping Workflow and Data Object ingest")

changesheet.write_changesheet()
logging.info(f"changesheet written to {changesheet.output_filepath}")
if changesheet.validate_changesheet(api_client.config.napa_base_url):
if changesheet.validate_changesheet(api_user_client.base_url):
logging.info(f"changesheet validated")
else:
logging.info(f"changesheet validation failed")
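A possible command-line invocation of the refactored ingest step against a local MongoDB instance is sketched below. The --mongo-uri, --is-direct-connection, and --database-name options come from this diff; the subcommand name, the positional records-file argument, and any group-level options (for example a site-config path) are assumptions about the surrounding click group:

python nmdc_automation/re_iding/scripts/re_id_tool.py ingest-records \
    re_id_records.json \
    --mongo-uri mongodb://localhost:27017 \
    --database-name nmdc \
    --is-direct-connection true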
