Skip to content

Commit

Permalink
Crated cross_map service
Browse files Browse the repository at this point in the history
  • Loading branch information
Harald Wilhelmi committed Jun 12, 2024
1 parent 8d9c24e commit e1c122d
Show file tree
Hide file tree
Showing 11 changed files with 208 additions and 186 deletions.
2 changes: 1 addition & 1 deletion server/src/scimodom/api/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def __enter__(self) -> Ctx:
else:
file_service = get_file_service()
try:
self._tmp_file_handle = file_service.open_tmp_file_by_id(
self._tmp_file_handle = file_service.open_tmp_upload_file_by_id(
self._upload_id
)
except FileNotFoundError:
Expand Down
2 changes: 1 addition & 1 deletion server/src/scimodom/api/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def get_valid_tmp_file_id_from_request_parameter(
if not VALID_FILENAME_REGEXP.match(raw_id):
raise ClientResponseException(400, f"Bad file ID in parameter {parameter}")
file_service = get_file_service()
if not file_service.check_tmp_file_id(raw_id):
if not file_service.check_tmp_upload_file_id(raw_id):
raise ClientResponseException(
404, "Temporary file not found - your upload may have expired"
)
Expand Down
56 changes: 31 additions & 25 deletions server/src/scimodom/services/assembly.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from scimodom.config import Config
from scimodom.database.models import Assembly, Taxa
import scimodom.database.queries as queries
from scimodom.utils.operations import liftover_to_file
from scimodom.services.cross_map import get_cross_map_service
import scimodom.utils.specifications as specs
import scimodom.utils.utils as utils

Expand Down Expand Up @@ -321,42 +321,48 @@ def create_new(self):
with open(Path(parent, "release.json"), "w") as f:
json.dump(release, f, indent="\t")

def liftover(self, records: list[tuple[Any, ...]], threshold: float = 0.3) -> str:
def liftover(self, raw_file: str, threshold: float = 0.3) -> str:
"""Liftover records to current assembly.
Unmapped features are discarded.
:param records: Records to be lifted over
:type records: List of tuple of (str, ...) - Data records
:param raw_file: BED file to be lifted over
:type records: str
:param threshold: Threshold for raising LiftOverError
:type threshold: float
:returns: Files pointing to the liftedOver features
:rtype: str
"""
parent, filen = self.get_chain_path()
chain_file = Path(parent, filen).as_posix()
lifted_file, unmapped_file = liftover_to_file(records, chain_file)

def _count_lines(reader):
b = reader(1024 * 1024)
while b:
yield b
b = reader(1024 * 1024)

with open(unmapped_file, "rb") as fh:
unmapped_lines = sum(
line.count(b"\n") for line in _count_lines(fh.raw.read)
)
failed_liftover = unmapped_lines / len(records) > threshold
if failed_liftover:
raise LiftOverError
msg = (
f"{unmapped_lines} records could not be mapped and were discarded... "
"Contact the system administrator if you have questions."

raw_lines = self._count_lines(raw_file)
chain_file_dir, chain_file_name = self.get_chain_path()
chain_file = Path(chain_file_dir, chain_file_name).as_posix()
cross_map_service = get_cross_map_service()
lifted_file, unmapped_file = cross_map_service.liftover_file(
raw_file, chain_file
)
logger.warning(msg)

unmapped_lines = self._count_lines(unmapped_file)
if unmapped_lines / raw_lines > threshold:
raise LiftOverError(
f"Lift-over failed: {unmapped_lines} records of {raw_lines} could not be mapped"
)
if unmapped_lines > 0:
logger.warning(
f"{unmapped_lines} records could not be mapped and were discarded... "
"Contact the system administrator if you have questions."
)
return lifted_file

@staticmethod
def _count_lines(path):
count = 0
with open(path) as fp:
while True:
buffer = fp.read(1024 * 1024)
if len(buffer) == 0:
return count
count += buffer.count("\n")

def _get_current_name(self) -> str:
"""Get current assembly name. This methods
allows to get the assembly name for the current
Expand Down
64 changes: 20 additions & 44 deletions server/src/scimodom/services/bedtools.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,15 @@
"""

from functools import cache
from os import makedirs
import logging
from pathlib import Path
import shlex
import subprocess
from typing import Iterable, Sequence, List
from typing import Iterable, Sequence, List, Any

import pybedtools # type: ignore
from pybedtools import BedTool, create_interval_from_list

import scimodom.utils.utils as utils
from scimodom.config import Config
from scimodom.services.file import FileService, get_file_service
from scimodom.utils.bedtools_dto import (
ModificationRecord,
DataAnnotationRecord,
Expand Down Expand Up @@ -90,9 +87,8 @@ def _remove_filno(feature, n_fields: int = 9, is_closest: bool = False):


class BedToolsService:
def __init__(self, tempdir):
makedirs(tempdir, exist_ok=True)
pybedtools.helpers.set_tempdir(tempdir)
def __init__(self, file_service: FileService):
self._file_service = file_service

def annotate_data_to_records(
self,
Expand Down Expand Up @@ -285,47 +281,27 @@ def get_annotation_records(
id=f"{prefix}{intergenic_feature}", annotation_id=annotation_id
)

def liftover_to_file(
self,
records: Iterable[ModificationRecord],
chain_file: str,
unmapped: str | None = None,
chrom_id: str = "s",
) -> tuple[str, str]:
def create_temp_file_from_records(
self, records: Iterable[Sequence[Any]], sort: bool = True
) -> str:
"""Liftover records. Handles conversion to BedTool, but not from,
of the liftedOver features. A file is returned pointing
to the liftedOver features. The unmapped ones are saved as
"unmapped", or discarded.
:param records: Data records to liftover
:type records: List of tuple of (str, ...) - Data records
:param chain_file: Chain file
:type chain_file: str
:param unmapped: File to write unmapped features
:type unmapped: str or None
:param chrom_id: The style of chromosome IDs (default s).
:type chrom_id: str
:returns: Files with liftedOver and unmapped features
:rtype: tuple of (str, str)
:param records: A iterable over records which can be processed by bedtools
:type records: Iterable[Sequence[Any]]
:param sort: sort the result
:returns: Path to temporary file
:rtype: str
"""
bedtool_records = self._get_modifications_as_bedtool(records)
result = pybedtools.BedTool._tmp() # noqa
if unmapped is None:
unmapped = pybedtools.BedTool._tmp() # noqa
cmd = f"CrossMap bed --chromid {chrom_id} --unmap-file {unmapped} {chain_file} {bedtool_records.fn} {result}"

logger.debug(f"Calling: {cmd}")

# set a timeout?
try:
subprocess.run(shlex.split(cmd), check=True, capture_output=True, text=True)
except FileNotFoundError:
raise Exception("Process failed: CrossMap executable could not be found!")
except subprocess.CalledProcessError as exc:
msg = f"Process failed with {exc.stderr}"
raise Exception(msg) from exc
# except subprocess.TimeoutExpired as exc:
return result, unmapped

path = self._file_service.create_temp_file(suffix=".bed")
bedtool = pybedtools.BedTool(records)
if sort:
bedtool = bedtool.sort()
bedtool.saveas(path)
return path

def intersect(
self,
Expand Down Expand Up @@ -490,4 +466,4 @@ def b_generator():

@cache
def get_bedtools_service():
return BedToolsService(tempdir=Config.BEDTOOLS_TMP_PATH)
return BedToolsService(file_service=get_file_service())
60 changes: 60 additions & 0 deletions server/src/scimodom/services/cross_map.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import logging
from functools import cache
import shlex
from subprocess import run, CalledProcessError

from scimodom.services.file import FileService, get_file_service


logger = logging.getLogger(__name__)


class CrossMapService:
def __init__(self, file_service: FileService):
self._file_service = file_service

def liftover_file(
self,
raw_file: str,
chain_file: str,
unmapped: str | None = None,
chrom_id: str = "s",
) -> tuple[str, str]:
"""Liftover records. Handles conversion to BedTool, but not from,
of the liftedOver features. A file is returned pointing
to the liftedOver features. The unmapped ones are saved as
"unmapped", or discarded.
:param raw_file: File name of o BED file to liftover
:type raw_file: str
:param chain_file: Chain file
:type chain_file: str
:param unmapped: File to write unmapped features
:type unmapped: str or None
:param chrom_id: The style of chromosome IDs (default s).
:type chrom_id: str
:returns: Files with liftedOver and unmapped features
:rtype: tuple of (str, str)
"""
result = self._file_service.create_temp_file(suffix=".bed")
if unmapped is None:
unmapped = self._file_service.create_temp_file(suffix=".bed")
cmd = f"CrossMap bed --chromid {chrom_id} --unmap-file {unmapped} {chain_file} {raw_file} {result}"
logger.debug(f"Calling: {cmd}")

# set a timeout?
try:
run(shlex.split(cmd), check=True, capture_output=True, text=True)
except FileNotFoundError as exc:
msg = "Process failed: CrossMap executable could not be found!"
raise Exception(msg) from exc
except CalledProcessError as exc:
msg = f"Process failed with {exc.stderr}"
raise Exception(msg) from exc
# except subprocess.TimeoutExpired as exc:
return result, unmapped


@cache
def get_cross_map_service():
return CrossMapService(file_service=get_file_service())
4 changes: 3 additions & 1 deletion server/src/scimodom/services/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,9 @@ def create_dataset(self) -> None:
for record in records
]
try:
lifted_file = assembly_service.liftover(records)
bedtools_service = get_bedtools_service()
raw_file = bedtools_service.create_temp_file_from_records(records)
lifted_file = assembly_service.liftover(raw_file)
importer.reset_data_importer(lifted_file)
importer.data.parse_records()
importer.data.close() # flush
Expand Down
48 changes: 33 additions & 15 deletions server/src/scimodom/services/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@ class FileService:
BUFFER_SIZE = 1024 * 1024
VALID_FILE_ID_REGEXP = re.compile(r"\A[a-zA-Z0-9_-]{1,256}\Z")

def __init__(self, session: Session):
def __init__(self, session: Session, temp_path: str, upload_path: str):
self._db_session = session
makedirs(temp_path, exist_ok=True)
makedirs(upload_path, exist_ok=True)
self._temp_path = temp_path
self._upload_path = upload_path

# general
# uploaded files

def upload_tmp_file(self, stream, max_file_size):
fp, path = mkstemp(dir=Config.UPLOAD_PATH)
fp, path = mkstemp(dir=self._upload_path)
close(fp)
file_id = basename(path)
if not self.VALID_FILE_ID_REGEXP.match(file_id):
Expand All @@ -43,17 +47,6 @@ def upload_tmp_file(self, stream, max_file_size):
self._stream_to_file(stream, path, max_file_size, overwrite_is_ok=True)
return file_id

def open_tmp_file_by_id(self, file_id):
if not self.VALID_FILE_ID_REGEXP.match(file_id):
raise ValueError("open_tmp_file_by_id called with bad file_id")
path = join(Config.UPLOAD_PATH, file_id)
return open(path)

@staticmethod
def check_tmp_file_id(file_id: str) -> bool:
path = join(Config.UPLOAD_PATH, file_id)
return isfile(path)

def _stream_to_file(self, data_stream, path, max_size, overwrite_is_ok=False):
if exists(path) and not overwrite_is_ok:
raise Exception(
Expand All @@ -78,6 +71,27 @@ def _stream_to_file(self, data_stream, path, max_size, overwrite_is_ok=False):
except Exception as exc:
self._handle_upload_error(exc, path)

def open_tmp_upload_file_by_id(self, file_id):
if not self.VALID_FILE_ID_REGEXP.match(file_id):
raise ValueError("open_tmp_file_by_id called with bad file_id")
path = join(Config.UPLOAD_PATH, file_id)
return open(path)

@staticmethod
def check_tmp_upload_file_id(file_id: str) -> bool:
path = join(Config.UPLOAD_PATH, file_id)
return isfile(path)

# intermediate files

def create_temp_file(self, suffix="") -> str:
fp, path = mkstemp(dir=self._temp_path, suffix=suffix)
close(fp)
return path

def get_temp_path(self) -> str:
return self._temp_path

# BAM file

def create_or_update_bam_file(
Expand Down Expand Up @@ -182,4 +196,8 @@ def remove_bam_file(self, bam_file: BamFile):

@cache
def get_file_service() -> FileService:
return FileService(get_session())
return FileService(
get_session(),
temp_path=Config.BEDTOOLS_TMP_PATH,
upload_path=Config.UPLOAD_PATH,
)
Loading

0 comments on commit e1c122d

Please sign in to comment.