diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index 78b61614..7bd2d4b7 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -468,7 +468,13 @@ def rsync_result(update: RSyncerUpdate): session_id=self._environment.murfey_session, data=rsyncer_data, ) - self._environment.watchers[source] = DirWatcher(source, settling_time=30) + self._environment.watchers[source] = DirWatcher( + source, + settling_time=30, + substrings_blacklist=self._machine_config.get( + "substrings_blacklist", {"directories": [], "files": []} + ), + ) if not self.analysers.get(source) and analyse: log.info(f"Starting analyser for {source}") diff --git a/src/murfey/client/watchdir.py b/src/murfey/client/watchdir.py index 4fead797..07bbebcc 100644 --- a/src/murfey/client/watchdir.py +++ b/src/murfey/client/watchdir.py @@ -32,6 +32,7 @@ def __init__( path: str | os.PathLike, settling_time: float = 60, appearance_time: float | None = None, + substrings_blacklist: dict[str, dict] = {}, transfer_all: bool = True, status_bar: StatusBar | None = None, ): @@ -42,6 +43,7 @@ def __init__( self._statusbar = status_bar self.settling_time = settling_time self._appearance_time = appearance_time + self._substrings_blacklist = substrings_blacklist self._transfer_all = transfer_all self._modification_overwrite: float | None = None self._init_time: float = time.time() @@ -128,7 +130,7 @@ def scan(self, modification_time: float | None = None, transfer_all: bool = Fals settling_time=scan_completion ) - # Create a list of files sroted based on their timestamps + # Create a list of files sorted based on their timestamps files_for_transfer = [] time_ordered_file_candidates = sorted( self._file_candidates, @@ -150,8 +152,9 @@ def scan(self, modification_time: float | None = None, transfer_all: bool = Fals continue if ( - self._file_candidates[x].settling_time + self.settling_time # type: ignore - < time.time() + 
current_file_settling_time := self._file_candidates[x].settling_time + ) is not None and ( + current_file_settling_time + self.settling_time < time.time() ): try: file_stat = os.stat(x) @@ -252,7 +255,14 @@ def _scan_directory( raise for entry in directory_contents: entry_name = os.path.join(path, entry.name) - if entry.is_dir() and ( + # Skip any directories with matching blacklisted substrings + if entry.is_dir() and any( + char in entry.name + for char in self._substrings_blacklist.get("directories", []) + ): + log.debug(f"Skipping blacklisted directory {str(entry.name)!r}") + continue + elif entry.is_dir() and ( modification_time is None or entry.stat().st_ctime >= modification_time ): result.update(self._scan_directory(entry_name)) @@ -260,7 +270,13 @@ def _scan_directory( # Exclude textual log if "textual" in str(entry): continue - + # Exclude files with blacklisted substrings + if any( + char in entry.name + for char in self._substrings_blacklist.get("files", []) + ): + log.debug(f"Skipping blacklisted file {str(entry.name)!r}") + continue # Get file statistics and append file to dictionary try: file_stat = entry.stat() diff --git a/src/murfey/util/config.py b/src/murfey/util/config.py index 86862de1..a4aa6220 100644 --- a/src/murfey/util/config.py +++ b/src/murfey/util/config.py @@ -48,6 +48,10 @@ class MachineConfig(BaseModel): # type: ignore analyse_created_directories: list[str] = [] gain_reference_directory: Optional[Path] = None eer_fractionation_file_template: str = "" + substrings_blacklist: dict[str, list] = { + "directories": [], + "files": [], + } # Data transfer setup ------------------------------------------------------------- # Rsync setup diff --git a/src/murfey/util/processing_params.py b/src/murfey/util/processing_params.py index cdb23991..a40ba987 100644 --- a/src/murfey/util/processing_params.py +++ b/src/murfey/util/processing_params.py @@ -54,14 +54,18 @@ def cryolo_model_path(visit: str, instrument_name: str) -> Path: return 
machine_config.default_model -class CLEMAlignAndMergeParameters(BaseModel): +class CLEMProcessingParameters(BaseModel): + # Atlas vs GridSquare registration threshold + atlas_threshold: float = 0.0015 # in m + + # Image alignment and merging-specific parameters crop_to_n_frames: Optional[int] = 50 align_self: Literal["enabled", ""] = "enabled" flatten: Literal["mean", "min", "max", ""] = "mean" align_across: Literal["enabled", ""] = "enabled" -default_clem_align_and_merge_parameters = CLEMAlignAndMergeParameters() +default_clem_processing_parameters = CLEMProcessingParameters() class SPAParameters(BaseModel): diff --git a/src/murfey/workflows/clem/register_preprocessing_results.py b/src/murfey/workflows/clem/register_preprocessing_results.py index ccbc99b7..2cb19e0e 100644 --- a/src/murfey/workflows/clem/register_preprocessing_results.py +++ b/src/murfey/workflows/clem/register_preprocessing_results.py @@ -22,7 +22,7 @@ from murfey.server import _transport_object from murfey.util.models import GridSquareParameters from murfey.util.processing_params import ( - default_clem_align_and_merge_parameters as processing_params, + default_clem_processing_parameters as processing_params, ) from murfey.workflows.clem import get_db_entry from murfey.workflows.clem.align_and_merge import submit_cluster_request @@ -51,6 +51,14 @@ class CLEMPreprocessingResult(BaseModel): extent: list[float] # [x0, x1, y0, y1] +def _is_clem_atlas(result: CLEMPreprocessingResult): + # If an image has a width/height of at least 1.5 mm, it should qualify as an atlas + return ( + max(result.pixels_x * result.pixel_size, result.pixels_y * result.pixel_size) + >= processing_params.atlas_threshold + ) + + def _register_clem_image_series( session_id: int, result: CLEMPreprocessingResult, @@ -142,9 +150,7 @@ def _register_clem_image_series( # Add metadata for this series clem_img_series.search_string = str(output_file.parent / "*tiff") - clem_img_series.data_type = ( - "atlas" if "Overview_" in 
result.series_name else "grid_square" - ) + clem_img_series.data_type = "atlas" if _is_clem_atlas(result) else "grid_square" clem_img_series.number_of_members = result.number_of_members clem_img_series.pixels_x = result.pixels_x clem_img_series.pixels_y = result.pixels_y @@ -181,7 +187,7 @@ def _register_dcg_and_atlas( dcg_name += f"--{result.series_name.split('--')[1]}" # Determine values for atlas - if "Overview_" in result.series_name: # These are atlas datasets + if _is_clem_atlas(result): output_file = list(result.output_files.values())[0] atlas_name = str(output_file.parent / "*.tiff") atlas_pixel_size = result.pixel_size @@ -197,7 +203,7 @@ def _register_dcg_and_atlas( dcg_entry = dcg_search[0] # Update atlas if registering atlas dataset # and data collection group already exists - if "Overview_" in result.series_name: + if _is_clem_atlas(result): atlas_message = { "session_id": session_id, "dcgid": dcg_entry.id, diff --git a/tests/client/test_watchdir.py b/tests/client/test_watchdir.py new file mode 100644 index 00000000..c1ada825 --- /dev/null +++ b/tests/client/test_watchdir.py @@ -0,0 +1,180 @@ +import os +import queue +import threading +from pathlib import Path + +import pytest + +from murfey.client.watchdir import DirWatcher +from tests.conftest import ExampleVisit + + +def test_dirwatcher_initialises(tmp_path: Path): + # Check that the DirWatcher initialises with the default attributes + watcher = DirWatcher(path=str(tmp_path)) + assert watcher._basepath == os.fspath(str(tmp_path)) + assert watcher._lastscan == {} + assert watcher._file_candidates == {} + assert watcher._statusbar is None + assert watcher.settling_time == 60 + assert watcher._appearance_time is None + assert watcher._substrings_blacklist == {} + assert watcher._transfer_all is True + assert watcher._modification_overwrite is None + assert isinstance(watcher._init_time, float) + assert isinstance(watcher.queue, queue.Queue) + assert isinstance(watcher.thread, threading.Thread) + assert 
watcher._stopping is False + assert watcher._halt_thread is False + + # Check that the string representation is as expected + assert str(watcher) == f"" + + +@pytest.fixture +def clem_visit_dir(tmp_path: Path): + visit_name = f"{ExampleVisit.proposal_code}{ExampleVisit.proposal_number}-{ExampleVisit.visit_number}" + visit_dir = tmp_path / "clem" / "data" / "2025" / visit_name + visit_dir.mkdir(parents=True, exist_ok=True) + return visit_dir + + +@pytest.fixture +def clem_test_files(clem_visit_dir: Path): + # Create test files for the DirWatcher to scan + file_list: list[Path] = [] + project_dir = clem_visit_dir / "images" / "test_grid" + + # Example atlas collection + for s in range(20): + file_list.append( + project_dir + / "Overview 1" + / "Image 1" + / f"Image 1--Stage{str(s).zfill(2)}.tif" + ) + file_list.append( + project_dir / "Overview 1" / "Image 1" / "Metadata" / "Image 1.xlif" + ) + + # Example image stack collection + for c in range(3): + for z in range(10): + file_list.append( + project_dir + / "TileScan 1" + / "Position 1" + / f"Position 1--C{str(c).zfill(2)}--Z{str(z).zfill(2)}.tif" + ) + file_list.append( + project_dir / "TileScan 1" / "Position 1" / "Metadata" / "Position 1.xlif" + ) + + # Create all files and directories specified + for file in file_list: + if not file.parent.exists(): + file.parent.mkdir(parents=True) + if not file.exists(): + file.touch() + return sorted(file_list) + + +@pytest.fixture +def clem_junk_files(clem_visit_dir: Path): + # Create junk files that are to be blacklisted from the CLEM workflow + file_list: list[Path] = [] + project_dir = clem_visit_dir / "images" / "test_grid" + + # Create junk atlas data + for n in range(5): + for s in range(20): + file_list.append( + project_dir + / "Image 1" + / f"Image 1_pmd_{n}" + / f"Image 1_pmd_{n}--Stage{str(s).zfill(2)}.tif" + ) + file_list.append( + project_dir / "Image 1" / f"Image 1_pmd_{n}" / "Metadata" / "Image 1.xlif" + ) + + # Create junk image data + for n in range(5): + for 
c in range(3): + for z in range(10): + file_list.append( + project_dir + / "Position 1" + / f"Position 1_pmd_{n}" + / f"Position 1_pmd_{n}--C{str(c).zfill(2)}--Z{str(z).zfill(2)}.tif" + ) + file_list.append( + project_dir + / "Position 1" + / f"Position 1_pmd_{n}" + / "Metadata" + / "Position 1.xlif" + ) + + # Create remaining junk files + for file_path in ( + "1.xlef", + "Metadata/IOManagerConfiguation.xlif", + "Metadata/Overview 1.xlcf", + "Metadata/TileScan 1.xlcf", + "Overview 1/Image 1/Image 1_histo.lof", + "TileScan 1/Position 1/Position 1_histo.lof", + "Overview 1/Image 1/Metadata/Image 1_histo.xlif", + "TileScan 1/Position 1/Metadata/Position 1_histo.xlif", + ): + file_list.append(project_dir / file_path) + + # Create files and directories + for file in file_list: + if not file.parent.exists(): + file.parent.mkdir(parents=True) + if not file.exists(): + file.touch() + return sorted(file_list) + + +scan_directory_params_matrix: tuple[tuple[str, dict[str, list[str]]], ...] = ( + # Workflow type | Substrings blacklist + ( + "clem", + { + "directories": [ + "_pmd_", + ], + "files": [ + ".xlef", + ".xlcf", + "_histo.lof", + "_histo.xlif", + "IOManagerConfiguation.xlif", + ], + }, + ), +) + + +@pytest.mark.parametrize("test_params", scan_directory_params_matrix) +def test_scan_directory( + clem_visit_dir: Path, + clem_test_files: list[Path], + clem_junk_files: list[Path], + test_params: tuple[str, dict[str, list[str]]], +): + # Unpack test params + workflow_type, substrings_blacklist = test_params + + # Initialise different watchers based on the workflow to test and run the scan + if workflow_type == "clem": + watcher = DirWatcher( + path=str(clem_visit_dir), + substrings_blacklist=substrings_blacklist, + ) + result = watcher._scan_directory() + + # Check that the result does not contain the junk files + assert [str(file) for file in clem_test_files] == sorted(result.keys())