Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

58 refactor re id tools ingest records to not use deprecated v1workflowsactivities endpoint #59

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions nmdc_automation/api/nmdcapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,50 @@ def run_query(self, query: dict):
response = requests.post(url, json=query, headers=self.headers)
return response.json()

def validate_record(self, record: dict):
    """
    Validate a record with the Microbiome Data API.

    The record is POSTed as JSON to the ``metadata/json:validate``
    endpoint under ``self.base_url``, after ``self.ensure_token()`` has
    been called.

    Parameters:
    record (dict): The record to be validated.

    Returns:
    dict: The decoded JSON body of the API response.

    Raises:
    requests.HTTPError: Raised by ``raise_for_status()`` when the API
    answers with an error status. A 422 response is logged (record and
    response body) before the error is raised.
    """
    self.ensure_token()
    endpoint = f"{self.base_url}metadata/json:validate"
    resp = requests.post(endpoint, json=record, headers=self.headers)

    # A 422 status is how the endpoint reports a validation failure; log
    # the offending record and the API's explanation before raising.
    validation_failed = resp.status_code == 422
    if validation_failed:
        logging.error(f"Validation failed for record: {record}")
        logging.error(resp.json())

    resp.raise_for_status()
    return resp.json()

def submit_record(self, record: dict):
    """
    Submit a record with the Microbiome Data API.

    The record is POSTed as JSON to the ``metadata/json:submit``
    endpoint under ``self.base_url``, after ``self.ensure_token()`` has
    been called.

    Parameters:
    record (dict): The record to be submitted.

    Returns:
    dict: The decoded JSON body of the API response, or ``None`` when
    the API answers 409 (the record is treated as already present).

    Raises:
    requests.HTTPError: Raised by ``raise_for_status()`` for any other
    error status.
    """
    self.ensure_token()
    endpoint = f"{self.base_url}metadata/json:submit"
    resp = requests.post(endpoint, json=record, headers=self.headers)

    if resp.status_code != 409:
        resp.raise_for_status()
        return resp.json()

    # 409 means we already have this record: note it and return nothing
    # instead of raising.
    logging.debug(f"Record already exists: {resp.json()}")
    return None

def jprint(obj):
    """Print ``obj`` to stdout as JSON indented by two spaces."""
    rendered = json.dumps(obj, indent=2)
    print(rendered)

Expand Down
62 changes: 53 additions & 9 deletions nmdc_automation/re_iding/scripts/re_id_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import click
import requests
from linkml_runtime.dumpers import json_dumper
import pymongo

from nmdc_automation.api import NmdcRuntimeApi, NmdcRuntimeUserApi
from nmdc_automation.nmdc_common.client import NmdcApi
Expand Down Expand Up @@ -380,12 +381,32 @@ def process_records(ctx, study_id, data_dir, update_links=False):
is_flag=True,
default=False,
)
@click.option("--mongo-uri",required=False, default="mongodb://localhost:27017",)
@click.option(
"--is-direct-connection",
type=bool,
required=False,
default=True,
show_default=True,
help=f"Whether you want the script to set the `directConnection` flag when connecting to the MongoDB server. "
f"That is required by some MongoDB servers that belong to a replica set. ",
)
@click.option(
"--database-name",
type=str,
required=False,
default="nmdc",
show_default=True,
help=f"MongoDB database name",
)
@click.pass_context
def ingest_records(ctx, reid_records_file, changesheet_only):
def ingest_records(ctx, reid_records_file, changesheet_only, mongo_uri=None,
is_direct_connection=True, database_name="nmdc"):
"""
Read in json dump of re_id'd records and:
submit them to the
/v1/workflows/activities endpoint
- validate the records against the /metadata/json:validate endpoint
- insert the records into the MongoDB database defined by the --mongo-uri and --database-name options

"""
start_time = time.time()
logging.info(f"Submitting re id'd records from : {reid_records_file}")
Expand All @@ -394,15 +415,26 @@ def ingest_records(ctx, reid_records_file, changesheet_only):

# Get API client(s)
config = ctx.obj["site_config"]
api_client = NmdcRuntimeApi(config)
# api_client = NmdcRuntimeApi(config)
api_user_client = NmdcRuntimeUserApi(config)

# TODO - need to get mongo_uri credentials for the Napa DB instance in the config file. Meanwhile, we can test
# with a local MongoDB instance
logging.info(f"Using MongoDB URI: {mongo_uri}")

# Connect to the MongoDB server and check the database name
client = pymongo.MongoClient(mongo_uri, directConnection=is_direct_connection)
with pymongo.timeout(5):
assert (database_name in client.list_database_names()), f"Database {database_name} not found"
logging.info(f"Connected to MongoDB server at {mongo_uri}")
db_client = client[database_name]


with open(reid_records_file, "r") as f:
db_records = json.load(f)

changesheet = Changesheet(name=f"{reid_base_name}_changesheet")
for record in db_records:
time.sleep(3)
# remove the omics_processing_set and use it to generate
# changes to omics_processing has_output
omics_processing_set = record.pop("omics_processing_set")
Expand Down Expand Up @@ -438,14 +470,26 @@ def ingest_records(ctx, reid_records_file, changesheet_only):

# validate the record, then insert its documents directly into MongoDB
if not changesheet_only:
resp = api_client.post_objects(record)
logging.info(f"{record} posted, got response: {resp}")
# validate the record
if api_user_client.validate_record(record):
logging.info("DB Record validated")
# submit the record documents directly via the MongoDB client
for collection_name, collection in record.items():
# collection shouldn't be empty but check just in case
if not collection:
logging.warning(f"Empty collection: {collection_name}")
continue
logging.info(f"Inserting {len(collection)} records into {collection_name}")
insertion_result = db_client[collection_name].insert_many(collection)
logging.info(f"Inserted {len(insertion_result.inserted_ids)} records into {collection_name}")
else:
logging.error("Workflow Record validation failed")
else:
logging.info(f"changesheet_only is True, skipping ingest")
logging.info(f"changesheet_only is True, skipping Workflow and Data Object ingest")

changesheet.write_changesheet()
logging.info(f"changesheet written to {changesheet.output_filepath}")
if changesheet.validate_changesheet(api_client.config.napa_base_url):
if changesheet.validate_changesheet(api_user_client.base_url):
logging.info(f"changesheet validated")
else:
logging.info(f"changesheet validation failed")
Expand Down
Loading