8 changes: 7 additions & 1 deletion src/murfey/client/multigrid_control.py
@@ -468,7 +468,13 @@ def rsync_result(update: RSyncerUpdate):
session_id=self._environment.murfey_session,
data=rsyncer_data,
)
self._environment.watchers[source] = DirWatcher(source, settling_time=30)
self._environment.watchers[source] = DirWatcher(
source,
settling_time=30,
substrings_blacklist=self._machine_config.get(
"substrings_blacklist", {"directories": [], "files": []}
),
)

if not self.analysers.get(source) and analyse:
log.info(f"Starting analyser for {source}")
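For context, this hunk forwards the machine config's `substrings_blacklist` (falling back to an empty blacklist) into each watcher. A minimal sketch of that wiring, with hypothetical config values and path:

```python
from murfey.client.watchdir import DirWatcher

# Hypothetical machine-config dict; only the key used by this hunk is shown
machine_config = {
    "substrings_blacklist": {"directories": ["_pmd_"], "files": [".xlef"]}
}

watcher = DirWatcher(
    "/dls/visit/cm12345-1",  # hypothetical source directory
    settling_time=30,
    # Fall back to an empty blacklist when the config omits the field
    substrings_blacklist=machine_config.get(
        "substrings_blacklist", {"directories": [], "files": []}
    ),
)
```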
26 changes: 21 additions & 5 deletions src/murfey/client/watchdir.py
@@ -32,6 +32,7 @@ def __init__(
path: str | os.PathLike,
settling_time: float = 60,
appearance_time: float | None = None,
substrings_blacklist: dict[str, list] = {},
transfer_all: bool = True,
status_bar: StatusBar | None = None,
):
@@ -42,6 +43,7 @@ def __init__(
self._statusbar = status_bar
self.settling_time = settling_time
self._appearance_time = appearance_time
self._substrings_blacklist = substrings_blacklist
self._transfer_all = transfer_all
self._modification_overwrite: float | None = None
self._init_time: float = time.time()
@@ -128,7 +130,7 @@ def scan(self, modification_time: float | None = None, transfer_all: bool = Fals
settling_time=scan_completion
)

# Create a list of files sroted based on their timestamps
# Create a list of files sorted based on their timestamps
files_for_transfer = []
time_ordered_file_candidates = sorted(
self._file_candidates,
@@ -150,8 +152,9 @@ def scan(self, modification_time: float | None = None, transfer_all: bool = Fals
continue

if (
self._file_candidates[x].settling_time + self.settling_time # type: ignore
< time.time()
current_file_settling_time := self._file_candidates[x].settling_time
) is not None and (
current_file_settling_time + self.settling_time < time.time()
):
try:
file_stat = os.stat(x)
@@ -252,15 +255,28 @@ def _scan_directory(
raise
for entry in directory_contents:
entry_name = os.path.join(path, entry.name)
if entry.is_dir() and (
# Skip any directories with matching blacklisted substrings
if entry.is_dir() and any(
substring in entry.name
for substring in self._substrings_blacklist.get("directories", [])
):
log.debug(f"Skipping blacklisted directory {str(entry.name)!r}")
continue
elif entry.is_dir() and (
modification_time is None or entry.stat().st_ctime >= modification_time
):
result.update(self._scan_directory(entry_name))
else:
# Exclude textual log
if "textual" in str(entry):
continue

# Exclude files with blacklisted substrings
if any(
substring in entry.name
for substring in self._substrings_blacklist.get("files", [])
):
log.debug(f"Skipping blacklisted file {str(entry.name)!r}")
continue
# Get file statistics and append file to dictionary
try:
file_stat = entry.stat()
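The `scan()` change swaps an unguarded addition (which needed a `# type: ignore`) for a walrus-operator `None` check. A standalone sketch of the pattern, with made-up timestamps:

```python
import time

# Hypothetical per-file settling timestamps; None means "not yet recorded"
file_candidates = {"a.tif": time.time() - 120.0, "b.tif": None}
settling_time = 60.0

for x in file_candidates:
    # Bind the lookup once and only compare when it is not None; the old
    # expression added settling_time to a possibly-None value
    if (
        current_file_settling_time := file_candidates[x]
    ) is not None and current_file_settling_time + settling_time < time.time():
        print(f"{x} has settled and is ready for transfer")
```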
4 changes: 4 additions & 0 deletions src/murfey/util/config.py
@@ -48,6 +48,10 @@ class MachineConfig(BaseModel): # type: ignore
analyse_created_directories: list[str] = []
gain_reference_directory: Optional[Path] = None
eer_fractionation_file_template: str = ""
substrings_blacklist: dict[str, list] = {
"directories": [],
"files": [],
}

# Data transfer setup -------------------------------------------------------------
# Rsync setup
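The real `MachineConfig` has many other fields, so a stripped-down stand-in is enough to show the shape and default of the new field (illustrative substrings only; see the test matrix below for the CLEM set):

```python
from pydantic import BaseModel

# A stand-in for MachineConfig: this only mirrors the new field's
# declaration and default, nothing else from the real model
class _ConfigSketch(BaseModel):
    substrings_blacklist: dict[str, list] = {
        "directories": [],
        "files": [],
    }

cfg = _ConfigSketch(
    substrings_blacklist={"directories": ["_pmd_"], "files": [".xlef"]}
)
print(cfg.substrings_blacklist["files"])  # ['.xlef']
```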
8 changes: 6 additions & 2 deletions src/murfey/util/processing_params.py
@@ -54,14 +54,18 @@ def cryolo_model_path(visit: str, instrument_name: str) -> Path:
return machine_config.default_model


class CLEMAlignAndMergeParameters(BaseModel):
class CLEMProcessingParameters(BaseModel):
# Atlas vs GridSquare registration threshold
atlas_threshold: float = 0.0015 # in m

# Image alignment and merging-specific parameters
crop_to_n_frames: Optional[int] = 50
align_self: Literal["enabled", ""] = "enabled"
flatten: Literal["mean", "min", "max", ""] = "mean"
align_across: Literal["enabled", ""] = "enabled"


default_clem_align_and_merge_parameters = CLEMAlignAndMergeParameters()
default_clem_processing_parameters = CLEMProcessingParameters()


class SPAParameters(BaseModel):
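Downstream code reads the defaults through the renamed alias; a quick sketch of what that exposes, assuming only the fields visible in this diff:

```python
from murfey.util.processing_params import (
    default_clem_processing_parameters as params,
)

print(params.atlas_threshold)   # 0.0015 (metres, i.e. 1.5 mm)
print(params.crop_to_n_frames)  # 50
print(params.flatten)           # "mean"
```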
18 changes: 12 additions & 6 deletions src/murfey/workflows/clem/register_preprocessing_results.py
@@ -22,7 +22,7 @@
from murfey.server import _transport_object
from murfey.util.models import GridSquareParameters
from murfey.util.processing_params import (
default_clem_align_and_merge_parameters as processing_params,
default_clem_processing_parameters as processing_params,
)
from murfey.workflows.clem import get_db_entry
from murfey.workflows.clem.align_and_merge import submit_cluster_request
@@ -51,6 +51,14 @@ class CLEMPreprocessingResult(BaseModel):
extent: list[float] # [x0, x1, y0, y1]


def _is_clem_atlas(result: CLEMPreprocessingResult):
# If an image has a width/height of at least 1.5 mm, it should qualify as an atlas
return (
max(result.pixels_x * result.pixel_size, result.pixels_y * result.pixel_size)
>= processing_params.atlas_threshold
)


def _register_clem_image_series(
session_id: int,
result: CLEMPreprocessingResult,
@@ -142,9 +150,7 @@ def _register_clem_image_series(

# Add metadata for this series
clem_img_series.search_string = str(output_file.parent / "*tiff")
clem_img_series.data_type = (
"atlas" if "Overview_" in result.series_name else "grid_square"
)
clem_img_series.data_type = "atlas" if _is_clem_atlas(result) else "grid_square"
clem_img_series.number_of_members = result.number_of_members
clem_img_series.pixels_x = result.pixels_x
clem_img_series.pixels_y = result.pixels_y
@@ -181,7 +187,7 @@ def _register_dcg_and_atlas(
dcg_name += f"--{result.series_name.split('--')[1]}"

# Determine values for atlas
if "Overview_" in result.series_name: # These are atlas datasets
if _is_clem_atlas(result):
output_file = list(result.output_files.values())[0]
atlas_name = str(output_file.parent / "*.tiff")
atlas_pixel_size = result.pixel_size
@@ -197,7 +203,7 @@
dcg_entry = dcg_search[0]
# Update atlas if registering atlas dataset
# and data collection group already exists
if "Overview_" in result.series_name:
if _is_clem_atlas(result):
atlas_message = {
"session_id": session_id,
"dcgid": dcg_entry.id,
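`_is_clem_atlas()` replaces the old `"Overview_"` name check with a physical-size test: an image whose larger dimension spans at least `atlas_threshold` (1.5 mm by default) is treated as an atlas. A worked example with made-up dimensions:

```python
atlas_threshold = 0.0015  # m; the default from CLEMProcessingParameters

# Hypothetical image: 2048 x 1024 px at 1 µm per pixel
pixels_x, pixels_y = 2048, 1024
pixel_size = 1.0e-6  # m

# Largest physical extent: 2048 * 1e-6 = 2.048e-3 m = 2.048 mm
extent = max(pixels_x * pixel_size, pixels_y * pixel_size)
print(extent >= atlas_threshold)  # True -> registered as an atlas
```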
180 changes: 180 additions & 0 deletions tests/client/test_watchdir.py
@@ -0,0 +1,180 @@
import os
import queue
import threading
from pathlib import Path

import pytest

from murfey.client.watchdir import DirWatcher
from tests.conftest import ExampleVisit


def test_dirwatcher_initialises(tmp_path: Path):
# Check that the DirWatcher initialises with the default attributes
watcher = DirWatcher(path=str(tmp_path))
assert watcher._basepath == os.fspath(str(tmp_path))
assert watcher._lastscan == {}
assert watcher._file_candidates == {}
assert watcher._statusbar is None
assert watcher.settling_time == 60
assert watcher._appearance_time is None
assert watcher._substrings_blacklist == {}
assert watcher._transfer_all is True
assert watcher._modification_overwrite is None
assert isinstance(watcher._init_time, float)
assert isinstance(watcher.queue, queue.Queue)
assert isinstance(watcher.thread, threading.Thread)
assert watcher._stopping is False
assert watcher._halt_thread is False

# Check that the string representation is as expected
assert str(watcher) == f"<DirWatcher ({os.fspath(str(tmp_path))})>"


@pytest.fixture
def clem_visit_dir(tmp_path: Path):
visit_name = f"{ExampleVisit.proposal_code}{ExampleVisit.proposal_number}-{ExampleVisit.visit_number}"
visit_dir = tmp_path / "clem" / "data" / "2025" / visit_name
visit_dir.mkdir(parents=True, exist_ok=True)
return visit_dir


@pytest.fixture
def clem_test_files(clem_visit_dir: Path):
# Create test files for the DirWatcher to scan
file_list: list[Path] = []
project_dir = clem_visit_dir / "images" / "test_grid"

# Example atlas collection
for s in range(20):
file_list.append(
project_dir
/ "Overview 1"
/ "Image 1"
/ f"Image 1--Stage{str(s).zfill(2)}.tif"
)
file_list.append(
project_dir / "Overview 1" / "Image 1" / "Metadata" / "Image 1.xlif"
)

# Example image stack collection
for c in range(3):
for z in range(10):
file_list.append(
project_dir
/ "TileScan 1"
/ "Position 1"
/ f"Position 1--C{str(c).zfill(2)}--Z{str(z).zfill(2)}.tif"
)
file_list.append(
project_dir / "TileScan 1" / "Position 1" / "Metadata" / "Position 1.xlif"
)

# Create all files and directories specified
for file in file_list:
if not file.parent.exists():
file.parent.mkdir(parents=True)
if not file.exists():
file.touch()
return sorted(file_list)


@pytest.fixture
def clem_junk_files(clem_visit_dir: Path):
# Create junk files that are to be blacklisted from the CLEM workflow
file_list: list[Path] = []
project_dir = clem_visit_dir / "images" / "test_grid"

# Create junk atlas data
for n in range(5):
for s in range(20):
file_list.append(
project_dir
/ "Image 1"
/ f"Image 1_pmd_{n}"
/ f"Image 1_pmd_{n}--Stage{str(s).zfill(2)}.tif"
)
file_list.append(
project_dir / "Image 1" / f"Image 1_pmd_{n}" / "Metadata" / "Image 1.xlif"
)

# Create junk image data
for n in range(5):
for c in range(3):
for z in range(10):
file_list.append(
project_dir
/ "Position 1"
/ f"Position 1_pmd_{n}"
/ f"Position 1_pmd_{n}--C{str(c).zfill(2)}--Z{str(z).zfill(2)}.tif"
)
file_list.append(
project_dir
/ "Position 1"
/ f"Position 1_pmd_{n}"
/ "Metadata"
/ "Position 1.xlif"
)

# Create remaining junk files
for file_path in (
"1.xlef",
"Metadata/IOManagerConfiguation.xlif",
"Metadata/Overview 1.xlcf",
"Metadata/TileScan 1.xlcf",
"Overview 1/Image 1/Image 1_histo.lof",
"TileScan 1/Position 1/Position 1_histo.lof",
"Overview 1/Image 1/Metadata/Image 1_histo.xlif",
"TileScan 1/Position 1/Metadata/Position 1_histo.xlif",
):
file_list.append(project_dir / file_path)

# Create files and directories
for file in file_list:
if not file.parent.exists():
file.parent.mkdir(parents=True)
if not file.exists():
file.touch()
return sorted(file_list)


scan_directory_params_matrix: tuple[tuple[str, dict[str, list[str]]], ...] = (
# Workflow type | Substrings blacklist
(
"clem",
{
"directories": [
"_pmd_",
],
"files": [
".xlef",
".xlcf",
"_histo.lof",
"_histo.xlif",
"IOManagerConfiguation.xlif",
],
},
),
)


@pytest.mark.parametrize("test_params", scan_directory_params_matrix)
def test_scan_directory(
clem_visit_dir: Path,
clem_test_files: list[Path],
clem_junk_files: list[Path],
test_params: tuple[str, dict[str, list[str]]],
):
# Unpack test params
workflow_type, substrings_blacklist = test_params

# Initialise different watchers based on the workflow to test and run the scan
if workflow_type == "clem":
watcher = DirWatcher(
path=str(clem_visit_dir),
substrings_blacklist=substrings_blacklist,
)
result = watcher._scan_directory()

# Check that the result does not contain the junk files
assert [str(file) for file in clem_test_files] == sorted(result.keys())
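To exercise just this module programmatically (a plain `pytest tests/client/test_watchdir.py` from the repo root is equivalent):

```python
import pytest

# Run only the new test module and exit with pytest's status code
raise SystemExit(pytest.main(["tests/client/test_watchdir.py"]))
```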