Skip to content

Commit

Permalink
Merge pull request #254 from juaml/feat/workdirmanager
Browse files Browse the repository at this point in the history
[ENH]: Introduce `WorkDirManager`
  • Loading branch information
synchon authored Oct 13, 2023
2 parents a509a23 + d2f870b commit 3fd1877
Show file tree
Hide file tree
Showing 15 changed files with 535 additions and 180 deletions.
1 change: 1 addition & 0 deletions docs/changes/newsfragments/254.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Introduce :class:`junifer.pipeline.WorkDirManager` singleton class to manage working and temporary directories across the pipeline by `Synchon Mandal`_
3 changes: 3 additions & 0 deletions junifer/api/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from ..datagrabber.base import BaseDataGrabber
from ..markers.base import BaseMarker
from ..markers.collection import MarkerCollection
from ..pipeline import WorkDirManager
from ..pipeline.registry import build
from ..preprocess.base import BasePreprocessor
from ..storage.base import BaseFeatureStorage
Expand Down Expand Up @@ -115,6 +116,8 @@ def run(
# Convert str to Path
if isinstance(workdir, str):
workdir = Path(workdir)
# Initiate working directory manager
WorkDirManager(workdir)

if not isinstance(elements, list) and elements is not None:
elements = [elements]
Expand Down
29 changes: 18 additions & 11 deletions junifer/markers/falff/falff_estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,19 @@
# Federico Raimondo <[email protected]>
# License: AGPL

import shutil
import subprocess
import tempfile
import typing
from functools import lru_cache
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union

import nibabel as nib
import numpy as np
from nilearn import image as nimg
from scipy.fft import fft, fftfreq

from ...pipeline import WorkDirManager
from ...pipeline.singleton import singleton
from ...utils import logger, raise_error
from ..utils import singleton


if TYPE_CHECKING:
Expand All @@ -41,19 +39,24 @@ class ALFFEstimator:
use_afni : bool
Whether to use afni for computation. If False, will use python.
Attributes
----------
temp_dir_path : pathlib.Path
Path to the temporary directory for assets storage.
"""

def __init__(self) -> None:
self._file_path = None
# Create temporary directory for intermediate storage of assets during
# computation via afni's 3dReHo
self.temp_dir_path = Path(tempfile.mkdtemp())
# computation via afni's 3dRSFC
self.temp_dir_path = None

def __del__(self) -> None:
"""Cleanup."""
print("Cleaning up temporary directory...")
# Delete temporary directory and ignore errors for read-only files
shutil.rmtree(self.temp_dir_path, ignore_errors=True)
if self.temp_dir_path is not None:
WorkDirManager().delete_tempdir(self.temp_dir_path)

@staticmethod
def _run_afni_cmd(cmd: str) -> None:
Expand Down Expand Up @@ -123,6 +126,8 @@ def _compute_alff_afni(
If the AFNI commands fails due to some issues
"""
# Note: self.temp_dir_path is sure to exist before proceeding, so
# type checks are ignored further on.

# Save niimg to nii.gz
nifti_in_file_path = self.temp_dir_path / "input.nii"
Expand Down Expand Up @@ -162,7 +167,7 @@ def _compute_alff_afni(
self._run_afni_cmd(convert_cmd)

# Cleanup intermediate files
for fname in self.temp_dir_path.glob("temp_*"):
for fname in self.temp_dir_path.glob("temp_*"): # type: ignore
fname.unlink()

# Load niftis
Expand Down Expand Up @@ -268,6 +273,8 @@ def _compute(
fALFF map.
"""
if use_afni:
# Create new temporary directory before using AFNI
self.temp_dir_path = WorkDirManager().get_tempdir(prefix="falff")
output = self._compute_alff_afni(
data=data,
highpass=highpass,
Expand Down Expand Up @@ -318,8 +325,8 @@ def fit_transform(
# Clear the cache
self._compute.cache_clear()
# Clear temporary directory files
for file_ in self.temp_dir_path.iterdir():
file_.unlink(missing_ok=True)
if self.temp_dir_path is not None:
WorkDirManager().delete_tempdir(self.temp_dir_path)
# Set the new file path
self._file_path = bold_path
else:
Expand Down
135 changes: 80 additions & 55 deletions junifer/markers/falff/tests/test_falff_estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,44 @@
# License: AGPL

import time
from pathlib import Path

import pytest
from nibabel import Nifti1Image
from scipy.stats import pearsonr

from junifer.datareader import DefaultDataReader
from junifer.markers.falff.falff_estimator import ALFFEstimator
from junifer.pipeline import WorkDirManager
from junifer.pipeline.utils import _check_afni
from junifer.testing.datagrabbers import PartlyCloudyTestingDataGrabber
from junifer.utils import logger


def test_ALFFEstimator_cache_python() -> None:
"""Test that the cache works properly when using python."""
with PartlyCloudyTestingDataGrabber() as dg:
input = dg["sub-01"]
def test_ALFFEstimator_cache_python(tmp_path: Path) -> None:
"""Test that the cache works properly when using python.
input = DefaultDataReader().fit_transform(input)
Parameters
----------
tmp_path : pathlib.Path
The path to the test directory.
"""
# Get subject from datagrabber
with PartlyCloudyTestingDataGrabber() as dg:
subject = dg["sub-01"]
# Read data for subject
subject_data = DefaultDataReader().fit_transform(subject)
# Update workdir to current test's tmp_path
WorkDirManager().workdir = tmp_path
# Setup estimator
estimator = ALFFEstimator()

# Compute without cache
start_time = time.time()
alff, falff = estimator.fit_transform(
use_afni=False,
input_data=input["BOLD"],
input_data=subject_data["BOLD"],
highpass=0.01,
lowpass=0.1,
tr=None,
Expand All @@ -36,91 +50,92 @@ def test_ALFFEstimator_cache_python() -> None:
logger.info(f"ALFF Estimator First time: {first_time}")
assert isinstance(alff, Nifti1Image)
assert isinstance(falff, Nifti1Image)
n_files = len(list(estimator.temp_dir_path.glob("*")))
assert n_files == 0 # no files in python

# Now fit again, should be faster
# Compute again with cache, should be faster
start_time = time.time()
alff, falff = estimator.fit_transform(
use_afni=False,
input_data=input["BOLD"],
input_data=subject_data["BOLD"],
highpass=0.01,
lowpass=0.1,
tr=None,
)
second_time = time.time() - start_time
logger.info(f"ALFF Estimator Second time: {second_time}")
assert second_time < (first_time / 1000)
n_files = len(list(estimator.temp_dir_path.glob("*")))
assert n_files == 0 # no files in python

# Now change a parameter, should compute again, without clearing the
# cache
# Change a parameter and compute again without cache
start_time = time.time()
alff, falff = estimator.fit_transform(
use_afni=False,
input_data=input["BOLD"],
input_data=subject_data["BOLD"],
highpass=0.01,
lowpass=0.11,
tr=None,
)
third_time = time.time() - start_time
logger.info(f"ALFF Estimator Third time: {third_time}")
assert third_time > (first_time / 10)
n_files = len(list(estimator.temp_dir_path.glob("*")))
assert n_files == 0 # no files in python

# Now fit again with the previous params, should be fast
# Compute again with cache, should be faster
start_time = time.time()
alff, falff = estimator.fit_transform(
use_afni=False,
input_data=input["BOLD"],
input_data=subject_data["BOLD"],
highpass=0.01,
lowpass=0.1,
tr=None,
)
fourth = time.time() - start_time
logger.info(f"ALFF Estimator Fourth time: {fourth}")
assert fourth < (first_time / 1000)
n_files = len(list(estimator.temp_dir_path.glob("*")))
assert n_files == 0 # no files in python

# Now change the data, it should clear the cache
# Change the data and it should clear the cache
with PartlyCloudyTestingDataGrabber() as dg:
input = dg["sub-02"]

input = DefaultDataReader().fit_transform(input)
subject = dg["sub-02"]
# Read data for new subject
subject_data = DefaultDataReader().fit_transform(subject)

start_time = time.time()
alff, falff = estimator.fit_transform(
use_afni=False,
input_data=input["BOLD"],
input_data=subject_data["BOLD"],
highpass=0.01,
lowpass=0.1,
tr=None,
)
fifth = time.time() - start_time
logger.info(f"ALFF Estimator Fifth time: {fifth}")
assert fifth > (first_time / 10)
n_files = len(list(estimator.temp_dir_path.glob("*")))
assert n_files == 0 # no files in python


@pytest.mark.skipif(
_check_afni() is False, reason="requires afni to be in PATH"
)
def test_ALFFEstimator_cache_afni() -> None:
"""Test that the cache works properly when using afni."""
with PartlyCloudyTestingDataGrabber() as dg:
input = dg["sub-01"]
def test_ALFFEstimator_cache_afni(tmp_path: Path) -> None:
"""Test that the cache works properly when using afni.
input = DefaultDataReader().fit_transform(input)
Parameters
----------
tmp_path : pathlib.Path
The path to the test directory.
"""
# Get subject from datagrabber
with PartlyCloudyTestingDataGrabber() as dg:
subject = dg["sub-01"]
# Read data for subject
subject_data = DefaultDataReader().fit_transform(subject)
# Update workdir to current test's tmp_path
WorkDirManager().workdir = tmp_path
# Setup estimator
estimator = ALFFEstimator()

# Compute without cache
start_time = time.time()
alff, falff = estimator.fit_transform(
use_afni=True,
input_data=input["BOLD"],
input_data=subject_data["BOLD"],
highpass=0.01,
lowpass=0.1,
tr=None,
Expand All @@ -132,11 +147,11 @@ def test_ALFFEstimator_cache_afni() -> None:
n_files = len(list(estimator.temp_dir_path.glob("*")))
assert n_files == 3 # input + alff + falff

# Now fit again, should be faster
# Compute again with cache, should be faster
start_time = time.time()
alff, falff = estimator.fit_transform(
use_afni=True,
input_data=input["BOLD"],
input_data=subject_data["BOLD"],
highpass=0.01,
lowpass=0.1,
tr=None,
Expand All @@ -147,12 +162,11 @@ def test_ALFFEstimator_cache_afni() -> None:
n_files = len(list(estimator.temp_dir_path.glob("*")))
assert n_files == 3 # input + alff + falff

# Now change a parameter, should compute again, without clearing the
# cache
# Change a parameter and compute again without cache
start_time = time.time()
alff, falff = estimator.fit_transform(
use_afni=True,
input_data=input["BOLD"],
input_data=subject_data["BOLD"],
highpass=0.01,
lowpass=0.11,
tr=None,
Expand All @@ -161,13 +175,13 @@ def test_ALFFEstimator_cache_afni() -> None:
logger.info(f"ALFF Estimator Third time: {third_time}")
assert third_time > (first_time / 10)
n_files = len(list(estimator.temp_dir_path.glob("*")))
assert n_files == 5 # input + 2 * alff + 2 * falff
assert n_files == 3 # input + alff + falff

# Now fit again with the previous params, should be fast
# Compute with cache, should be faster
start_time = time.time()
alff, falff = estimator.fit_transform(
use_afni=True,
input_data=input["BOLD"],
input_data=subject_data["BOLD"],
highpass=0.01,
lowpass=0.1,
tr=None,
Expand All @@ -176,18 +190,18 @@ def test_ALFFEstimator_cache_afni() -> None:
logger.info(f"ALFF Estimator Fourth time: {fourth}")
assert fourth < (first_time / 1000)
n_files = len(list(estimator.temp_dir_path.glob("*")))
assert n_files == 5 # input + 2 * alff + 2 * falff
assert n_files == 3 # input + alff + falff

# Now change the data, it should clear the cache
# Change the data and it should clear the cache
with PartlyCloudyTestingDataGrabber() as dg:
input = dg["sub-02"]

input = DefaultDataReader().fit_transform(input)
subject = dg["sub-02"]
# Read data for new subject
subject_data = DefaultDataReader().fit_transform(subject)

start_time = time.time()
alff, falff = estimator.fit_transform(
use_afni=True,
input_data=input["BOLD"],
input_data=subject_data["BOLD"],
highpass=0.01,
lowpass=0.1,
tr=None,
Expand All @@ -202,26 +216,37 @@ def test_ALFFEstimator_cache_afni() -> None:
@pytest.mark.skipif(
_check_afni() is False, reason="requires afni to be in PATH"
)
def test_ALFFEstimator_afni_vs_python() -> None:
"""Test that the cache works properly when using afni."""
with PartlyCloudyTestingDataGrabber() as dg:
input = dg["sub-01"]
def test_ALFFEstimator_afni_vs_python(tmp_path: Path) -> None:
"""Test that the cache works properly when using afni.
input = DefaultDataReader().fit_transform(input)
Parameters
----------
tmp_path : pathlib.Path
The path to the test directory.
"""
# Get subject from datagrabber
with PartlyCloudyTestingDataGrabber() as dg:
subject = dg["sub-01"]
# Read data for subject
subject_data = DefaultDataReader().fit_transform(subject)
# Update workdir to current test's tmp_path
WorkDirManager().workdir = tmp_path
# Setup estimator
estimator = ALFFEstimator()

# Use an arbitrary TR to test the AFNI vs Python implementation
afni_alff, afni_falff = estimator.fit_transform(
use_afni=True,
input_data=input["BOLD"],
input_data=subject_data["BOLD"],
highpass=0.01,
lowpass=0.1,
tr=2.5,
)

python_alff, python_falff = estimator.fit_transform(
use_afni=False,
input_data=input["BOLD"],
input_data=subject_data["BOLD"],
highpass=0.01,
lowpass=0.1,
tr=2.5,
Expand Down
Loading

0 comments on commit 3fd1877

Please sign in to comment.