Skip to content

Commit

Permalink
WIP data service
Browse files Browse the repository at this point in the history
  • Loading branch information
eboileau committed Feb 15, 2024
1 parent f349dd2 commit 0ef7a7e
Show file tree
Hide file tree
Showing 4 changed files with 426 additions and 280 deletions.
219 changes: 117 additions & 102 deletions server/src/scimodom/services/assembly.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
from pathlib import Path
from posixpath import join as urljoin
from typing import ClassVar
from typing import ClassVar, Callable

import requests # type: ignore
from sqlalchemy.orm import Session
Expand Down Expand Up @@ -42,11 +42,18 @@ class AssemblyService:
:type DATA_PATH: str | Path | None
:param DATA_SUB_PATH: Subpath to assembly
:type DATA_SUB_PATH: str
:param CHROM_fILE: File name with a list of
allowed chromosomes
:type CHROM_FILE: str
:param CHAIN_FILE: Chain file template name
:type CHAIN_FILE: str
"""

ASSEMBLY_NUM_LENGTH: ClassVar[int] = specs.ASSEMBLY_NUM_LENGTH
DATA_PATH: ClassVar[str | Path] = Config.DATA_PATH
DATA_SUB_PATH: ClassVar[str] = "assembly"
CHROM_FILE: ClassVar[str] = "chrom.sizes"
CHAIN_FILE: ClassVar[Callable] = "{source}_to_{target}.chain.gz".format

def __init__(self, session: Session, **kwargs) -> None:
"""Initializer method."""
Expand All @@ -59,6 +66,7 @@ def __init__(self, session: Session, **kwargs) -> None:
self._taxid: int
self._chrom_file: Path

# current DB assembly version
query = queries.get_assembly_version()
self._db_version = self._session.execute(query).scalar()

Expand All @@ -81,14 +89,11 @@ def __init__(self, session: Session, **kwargs) -> None:
raise AssemblyVersionError(msg)
self._name = records["name"]
self._taxid = records["taxa_id"]
self._set_chrom_path()
else:
self._name = kwargs.get("name", None)
self._taxid = kwargs.get("taxa_id", None)
if not isinstance(self._name, str):
raise TypeError(f"Expected str; got {type(self._name).__name__}")
if not isinstance(self._taxid, int):
raise TypeError(f"Expected int; got {type(self._taxid).__name__}")
# difficult to validate name, or combination of (name, taxa_id)...
# use select(func.dictinct(Organism.taxa_id)) for available IDs
query = select(Taxa.id)
Expand All @@ -101,7 +106,6 @@ def __init__(self, session: Session, **kwargs) -> None:
assembly_id = self._session.execute(query).scalar_one_or_none()
if assembly_id:
self._assembly_id = assembly_id
self._set_chrom_path()
else:
query = select(func.distinct(Assembly.version))
version_nums = self._session.execute(query).scalars().all()
Expand All @@ -116,6 +120,9 @@ def __init__(self, session: Session, **kwargs) -> None:
self._assembly_id = assembly.id
self._is_new = True

# chrom file for the current DB assembly version
self._set_chrom_path()

def __new__(cls, session: Session, **kwargs):
"""Constructor method."""
if cls.DATA_PATH is None:
Expand All @@ -133,8 +140,8 @@ def from_id(cls, session: Session, assembly_id: int):
:param session: SQLAlchemy ORM session
:type session: Session
:param id: Assembly ID
:type id: int
:param assembly_id: Assembly ID
:type assembly_id: int
:returns: AssemblyService class instance
:rtype: AssemblyService
"""
Expand All @@ -152,7 +159,7 @@ def from_new(cls, session: Session, name: str, taxa_id: int):
:param name: Assembly name
:type name: str
:param taxa_id: Assembly taxa_id
:type name: int
:type taxa_id: int
:returns: AssemblyService class instance
:rtype: AssemblyService
"""
Expand All @@ -163,113 +170,49 @@ def from_new(cls, session: Session, name: str, taxa_id: int):
service._new()
return service

def _get_organism(self):
"""Query organism from taxonomy ID"""
query = queries.query_column_where(
"Taxa",
"name",
filters={"id": self._taxid},
)
organism = self._session.execute(query).scalar_one()
organism = "_".join(organism.split())
return organism

def get_chain_path(self) -> tuple[Path, str]:
"""Construct file path (chain file) for organism.
Only to (not from) current version.
@staticmethod
def get_assembly_path() -> Path:
"""Construct parent path to assembly files.
:returns: Parent and file name
:rtype: tuple[str | Path, str]
:returns: Path to assembly
:rtype: Path
"""
organism = self._get_organism()
parent = Path(self.DATA_PATH, self.DATA_SUB_PATH, organism, self._name)
# assembly_id may not be the current assembly version
query = queries.query_column_where(
"Assembly",
"name",
filters={"taxa_id": self._taxid, "version": self._db_version},
)
name = self._session.execute(query).scalar_one()
filen = f"{self._name}_to_{name}.chain.gz"
return parent, filen
return Path(AssemblyService.DATA_PATH, AssemblyService.DATA_SUB_PATH)

@staticmethod
def get_chrom_path(
path: str | Path | None, organism: str, assembly: str
) -> tuple[Path, str]:
"""Construct file path (chrom sizes) for organism
:param path: Base path
:type path: str | Path
:param organism: Organism
def get_chrom_path(organism: str, assembly: str) -> tuple[Path, str]:
"""Construct file path (chrom sizes) for a
given organism and assemly. Since there is only
one file per organism, for the current Assembly
version, assembly must also match the current
Assembly, but this method cannot check this.
This method attempst to format organism.
:param organism: Organism name
:type organism: str
:param assembly: Assembly
:param assembly: Assembly name
:type assembly: str
:returns: Parent and file name
:rtype: tuple[str | Path, str]
"""
if path is None:
raise TypeError("Missing DATA PATH! Cannot construct path to chrom.sizes!")
organism = "_".join(organism.lower().split()).capitalize()
path = AssemblyService.get_assembly_path()
parent = Path(path, organism, assembly)
return parent, "chrom.sizes"

def _set_chrom_path(self, path: bool = False) -> Path | None:
"""Assign chrom.size path
return parent, AssemblyService.CHROM_FILE

:param path: Return parent path or not
:type path: bool
:returns: parent
:rtype: Path
"""
organism = self._get_organism()
parent, filen = self.get_chrom_path(
Path(self.DATA_PATH, self.DATA_SUB_PATH), organism, self._name
)
self._chrom_file = Path(parent, filen)
if path:
return parent
else:
return None
def get_chain_path(self) -> tuple[Path, str]:
"""Construct file path (chain file) for organism.
Only to (not from) current version.
def _new(self):
"""Prepare new assembly, i.e. get chain files, but this
does not create an assembly for the current DB assembly version,
see create_new().
:returns: Parent and file name
:rtype: tuple[str | Path, str]
"""

msg = "Setting directories up for new assembly..."
logger.debug(msg)

parent = self._set_chrom_path(path=True)
try:
parent.mkdir(parents=True, exist_ok=False)
except FileExistsError as error:
msg = (
f"Assembly directory at {parent} already exists despite calling AssemblyService "
"from new... Aborting transaction!"
)
raise Exception(msg) from error

msg = "Downloading chain files..."
logger.debug(msg)

organism = self._get_organism()
parent, filen = self.get_chain_path()
chain_file = Path(parent, filen)

url = urljoin(
specs.ENSEMBL_FTP, specs.ENSEMBL_ASM_MAPPING, organism.lower(), filen
)
with requests.get(url, stream=True) as request:
if not request.ok:
request.raise_for_status()
try:
with open(chain_file, "wb") as f:
for chunk in request.iter_content(chunk_size=10 * 1024):
f.write(chunk)
except FileExistsError:
msg = f"File at {chain_file} exists. Skipping!"
logger.warning(msg)
parent = Path(self.get_assembly_path(), organism, self._name)
# assembly_id may not be the current assembly version
name = self._get_current_name()
filen = self.CHAIN_FILE(source=self._name, target=name)
return parent, filen

def create_new(self):
"""Creates a new assembly for current DB version, mostly for
Expand All @@ -289,7 +232,8 @@ def create_new(self):
)
raise AssemblyVersionError(msg)

parent = self._set_chrom_path(path=True)
self._set_chrom_path()
parent = self._chrom_file.parent
try:
parent.mkdir(parents=True, exist_ok=False)
except FileExistsError as error:
Expand Down Expand Up @@ -346,3 +290,74 @@ def create_new(self):
release = request.json()
with open(Path(parent, "release.json"), "w") as f:
json.dump(release, f, indent="\t")

def _get_current_name(self) -> str:
"""Get current assembly name. This methods
allows to get the assembly name for the current
version, e.g. if the class has been instantiated
with a different assembly name/version."""
query = queries.query_column_where(
"Assembly",
"name",
filters={"taxa_id": self._taxid, "version": self._db_version},
)
return self._session.execute(query).scalar_one()

def _get_organism(self):
"""Query organism from taxonomy ID"""
query = queries.query_column_where(
"Taxa",
"name",
filters={"id": self._taxid},
)
organism = self._session.execute(query).scalar_one()
organism = "_".join(organism.split())
return organism

def _set_chrom_path(self) -> None:
"""Assign chrom file path. This is the same
irrespective of assembly."""
organism = self._get_organism()
name = self._get_current_name()
parent, filen = self.get_chrom_path(organism, name)
self._chrom_file = Path(parent, filen)

def _new(self):
"""Prepare new assembly, i.e. get chain files, but this
does not create an assembly for the current DB assembly version,
see create_new().
"""

msg = "Setting directories up for new assembly..."
logger.debug(msg)

organism = self._get_organism()
parent, _ = self.get_chrom_path(organism, self._name)
try:
parent.mkdir(parents=True, exist_ok=False)
except FileExistsError as error:
msg = (
f"Assembly directory at {parent} already exists despite calling AssemblyService "
"from new... Aborting transaction!"
)
raise Exception(msg) from error

msg = "Downloading chain files..."
logger.debug(msg)

parent, filen = self.get_chain_path()
chain_file = Path(parent, filen)

url = urljoin(
specs.ENSEMBL_FTP, specs.ENSEMBL_ASM_MAPPING, organism.lower(), filen
)
with requests.get(url, stream=True) as request:
if not request.ok:
request.raise_for_status()
try:
with open(chain_file, "wb") as f:
for chunk in request.iter_content(chunk_size=10 * 1024):
f.write(chunk)
except FileExistsError:
msg = f"File at {chain_file} exists. Skipping!"
logger.warning(msg)
Loading

0 comments on commit 0ef7a7e

Please sign in to comment.