
Commit

handle missing type and make logging a bit more consistent
mbthornton-lbl committed Jan 30, 2024
1 parent c148ed2 commit f943ba6
Showing 2 changed files with 34 additions and 30 deletions.
7 changes: 6 additions & 1 deletion nmdc_automation/re_iding/base.py
@@ -225,6 +225,11 @@ def update_metagenome_assembly_set(self, db_record: Dict,
if not data_object_type:
logger.warning(f"Data object type not found for {old_do_id} - {old_do_rec['description']}")
continue
+ old_url = old_do_rec.get("url")
+ if not old_url:
+ logger.warning(f"Data object url not found for {old_do_id} - {old_do_rec['description']}")
+ old_url = f"{DATA_BASE_URL}/{omics_processing_id}/assembly/{old_do_rec['name']}"
+ logger.warning(f"Using inferred url: {old_url}")
new_file_path = compute_new_paths_and_link(old_do_rec["url"], new_assembly_base_dir, new_activity_id)
updated_md5, updated_file_size = assembly_file_operations(
old_do_rec, data_object_type, new_file_path, new_activity_id,
@@ -477,7 +482,7 @@ def make_new_data_object(self, omics_processing_id: str,
description=new_description,
type="nmdc:DataObject",
file_size_bytes=data_object_record["file_size_bytes"],
- md5_checksum=data_object_record["md5_checksum"],
+ md5_checksum=data_object_record.get("md5_checksum"),
url=new_url,
data_object_type=data_object_type,
)
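The base.py hunks above add a fallback when a data object record has no url and switch the md5_checksum lookup to .get() so a missing key no longer raises. As a standalone illustration of that fallback pattern (not code from the commit), the sketch below mirrors the inferred-URL logic; the DATA_BASE_URL value and the helper name are placeholders.

import logging

logger = logging.getLogger(__name__)

# Placeholder value; the real constant is defined elsewhere in nmdc_automation.
DATA_BASE_URL = "https://data.example.org"


def resolve_data_object_url(old_do_rec: dict, omics_processing_id: str) -> str:
    """Return the record's url, or infer an assembly url from its name when missing."""
    old_url = old_do_rec.get("url")
    if not old_url:
        logger.warning(
            f"Data object url not found for {old_do_rec.get('id')} - {old_do_rec.get('description')}"
        )
        old_url = f"{DATA_BASE_URL}/{omics_processing_id}/assembly/{old_do_rec['name']}"
        logger.warning(f"Using inferred url: {old_url}")
    return old_url


if __name__ == "__main__":
    record = {"id": "nmdc:dobj-xyz", "description": "assembly contigs", "name": "contigs.fna"}
    print(resolve_data_object_url(record, "nmdc:omprc-123"))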
57 changes: 28 additions & 29 deletions nmdc_automation/re_iding/scripts/re_id_tool.py
@@ -34,7 +34,7 @@

logging.basicConfig(
filename="re_id.log",
filemode="w",
filemode="a",
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
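The hunk above only flips filemode from "w" to "a" so repeated runs append to re_id.log instead of truncating it; the rest of this file's changes replace root-level logging.* calls with the module-level logger. A minimal, self-contained sketch of that combination (reusing the file name and format string shown above) looks like this:

import logging

logging.basicConfig(
    filename="re_id.log",
    filemode="a",  # append so reruns keep earlier entries
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Calls go through the named logger rather than the root logging module.
logger.info("Running in dryrun mode")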
@@ -221,9 +221,9 @@ def process_records(ctx, dryrun, study_id, data_dir):
Write the results to a new JSON file of nmdc Database instances.
"""
start_time = time.time()
logging.info(f"Processing workflow records for study_id: {study_id}")
logger.info(f"Processing workflow records for study_id: {study_id}")
if dryrun:
logging.info("Running in dryrun mode")
logger.info("Running in dryrun mode")

# Get API client
config = ctx.obj["site_config"]
@@ -232,21 +232,21 @@ def process_records(ctx, dryrun, study_id, data_dir):
# Get Database dump file paths and the data directory
db_infile, db_outfile = _get_database_paths(study_id, dryrun)
data_dir = _get_data_dir(data_dir, dryrun)
logging.info(f"Using data_dir: {data_dir}")
logger.info(f"Using data_dir: {data_dir}")

# Initialize re-ID tool
reid_tool = ReIdTool(api_client, data_dir)

# Read extracted DB records
logging.info(f"Using db_infile: {db_infile}")
logger.info(f"Using db_infile: {db_infile}")
with open(db_infile, "r") as f:
db_records = json.load(f)
logging.info(f"Read {len(db_records)} records from db_infile")
logger.info(f"Read {len(db_records)} records from db_infile")

re_ided_db_records = []
for db_record in db_records:
omics_processing_id = get_omics_processing_id(db_record)
logging.info(f"omics_processing_id: {omics_processing_id}")
logger.info(f"omics_processing_id: {omics_processing_id}")

new_db = nmdc.Database()
# update OmicsProcessing has_output and related DataObject records
@@ -289,7 +289,7 @@ def ingest_records(ctx, reid_records_file, changesheet_only):
/v1/workflows/activities endpoint
"""
start_time = time.time()
logging.info(f"Submitting re id'd records from : {reid_records_file}")
logger.info(f"Submitting re id'd records from : {reid_records_file}")
reid_records_filename = Path(reid_records_file).name
reid_base_name = reid_records_filename.split("_")[0]

@@ -309,7 +309,7 @@ def ingest_records(ctx, reid_records_file, changesheet_only):
omics_processing_set = record.pop("omics_processing_set")
for omics_processing_record in omics_processing_set:
omics_processing_id = omics_processing_record["id"]
logging.info(f"omics_processing_id: {omics_processing_id}")
logger.info(f"omics_processing_id: {omics_processing_id}")
# find legacy has_output and create change to remove it
# need to strip the nmdc: prefix for the objects endpoint
trimmed_omics_processing_id = omics_processing_id.split(":")[1]
@@ -325,7 +325,7 @@ def ingest_records(ctx, reid_records_file, changesheet_only):
value="|".join(legacy_omics_processing_record["has_output"]) + "|",
)
changesheet.line_items.append(change)
logging.info(f"changes: {change}")
logger.info(f"changes: {change}")

# insert new has_output
change = ChangesheetLineItem(
@@ -335,7 +335,7 @@ def ingest_records(ctx, reid_records_file, changesheet_only):
value="|".join(omics_processing_record["has_output"]) + "|",
)
changesheet.line_items.append(change)
logging.info(f"changes: {change}")
logger.info(f"changes: {change}")

# submit the record to the workflows endpoint
if not changesheet_only:
@@ -345,11 +345,11 @@ def ingest_records(ctx, reid_records_file, changesheet_only):
logger.info(f"changesheet_only is True, skipping ingest")

changesheet.write_changesheet()
logging.info(f"changesheet written to {changesheet.output_filepath}")
logger.info(f"changesheet written to {changesheet.output_filepath}")
if changesheet.validate_changesheet(api_client.config.napa_base_url):
logging.info(f"changesheet validated")
logger.info(f"changesheet validated")
else:
logging.info(f"changesheet validation failed")
logger.info(f"changesheet validation failed")


@cli.command()
@@ -361,14 +361,15 @@ def delete_old_records(ctx, old_records_file):
delete them using
/queries/run endpoint
"""

logging.info(f"Deleting old objects found in : {old_records_file}")
start_time = time.time()
logger.info(f"Deleting old objects found in : {old_records_file}")
old_records_filename = Path(old_records_file).name
old_base_name = old_records_filename.split("_")[0]

# Get API client(s)
config = ctx.obj["site_config"]
api_user_client = NmdcRuntimeUserApi(config)
logger.info(f"Using: {api_user_client.base_url}")

# get old db records
with open(old_records_file, "r") as f:
@@ -390,25 +391,22 @@
"deletes": [{"q": {"id": item["id"]}, "limit": 1}],
}
try:
- logging.info(
- f"Running query: {delete_query}, deleting {set_name} with id: {item['id']}"
- )

+ logger.info(f"Deleting {item.get('type')} record: {item['id']}")
run_query_response = api_user_client.run_query(
delete_query
)

- logging.info(
+ logger.info(
f"Deleting query posted with response: {run_query_response}"
)
except requests.exceptions.RequestException as e:
- logging.info(
+ logger.info(
f"An error occured while running: {delete_query}, response retutrned: {e}"
)

for annotation_id in gene_id_list:
try:
- logging.info(
+ logger.info(
f"Deleting functional aggregate record with id: {annotation_id}"
)
delete_query_agg = {
@@ -418,25 +416,26 @@

run_query_agg_response = api_user_client.run_query(delete_query_agg)

- logging.info(
+ logger.info(
f"Response for deleting functional annotation agg record returned: {run_query_agg_response}"
)
except requests.exceptions.RequestException as e:
- logging.error(
+ logger.error(
f"An error occurred while deleting annotation id {annotation_id}: {e}"
)
logger.info(f"Elapsed time: {time.time() - start_time}")


def _get_data_dir(data_dir, dryrun):
"""
Return the path to the data object files
"""
if dryrun:
logging.info("Running in dryrun mode")
logger.info("Running in dryrun mode")
return DRYRUN_DATAFILE_DIR
elif not data_dir:
data_dir = BASE_DATAFILE_DIR
logging.info(f"Using datafile_dir: {data_dir}")
logger.info(f"Using datafile_dir: {data_dir}")
return data_dir


@@ -466,12 +465,12 @@ def _get_legacy_id(omics_processing_record: dict) -> str:
alternative_ids = omics_processing_record.get("alternative_identifiers", [])
legacy_ids.extend(alternative_ids)
if len(legacy_ids) == 0:
- logging.warning(
+ logger.warning(
f"No legacy IDs found for: {omics_processing_record['id']} using ID instead"
)
return omics_processing_record["id"]
elif len(legacy_ids) > 1:
- logging.warning(
+ logger.warning(
f"Multiple legacy IDs found for omics_processing_record: {omics_processing_record['id']}"
)
return None
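The _get_legacy_id hunk above only touches its warning calls. For readability, here is a condensed sketch of the lookup it sits in; the real helper also gathers identifier fields that are collapsed out of this diff, and the final single-ID return is assumed.

import logging
from typing import Optional

logger = logging.getLogger(__name__)


def get_legacy_id(omics_processing_record: dict) -> Optional[str]:
    # Only the alternative_identifiers handling shown in the diff is mirrored here.
    legacy_ids = list(omics_processing_record.get("alternative_identifiers", []))
    if len(legacy_ids) == 0:
        logger.warning(
            f"No legacy IDs found for: {omics_processing_record['id']} using ID instead"
        )
        return omics_processing_record["id"]
    elif len(legacy_ids) > 1:
        logger.warning(
            f"Multiple legacy IDs found for omics_processing_record: {omics_processing_record['id']}"
        )
        return None
    return legacy_ids[0]  # assumed: a single legacy ID is returned as-is


if __name__ == "__main__":
    print(get_legacy_id({"id": "nmdc:omprc-11-abc123", "alternative_identifiers": ["gold:Gp0123456"]}))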
