diff --git a/src/murfey/server/ispyb.py b/src/murfey/server/ispyb.py index f2c0cd801..0085f81bc 100644 --- a/src/murfey/server/ispyb.py +++ b/src/murfey/server/ispyb.py @@ -153,7 +153,11 @@ def do_insert_atlas(self, record: Atlas): return {"success": False, "return_value": None} def do_update_atlas( - self, atlas_id: int, atlas_image: str, pixel_size: float, slot: int + self, + atlas_id: int, + atlas_image: str, + pixel_size: float, + slot: int | None, ): try: with ISPyBSession() as db: @@ -190,34 +194,14 @@ def do_insert_grid_square( grid_square_parameters.readout_area_x / grid_square_parameters.thumbnail_size_x ) - grid_square_parameters.height = ( - int(grid_square_parameters.height / 7.8) - if grid_square_parameters.height - else None - ) - grid_square_parameters.width = ( - int(grid_square_parameters.width / 7.8) - if grid_square_parameters.width - else None - ) - grid_square_parameters.x_location = ( - int(grid_square_parameters.x_location / 7.8) - if grid_square_parameters.x_location - else None - ) - grid_square_parameters.y_location = ( - int(grid_square_parameters.y_location / 7.8) - if grid_square_parameters.y_location - else None - ) record = GridSquare( atlasId=atlas_id, gridSquareLabel=grid_square_id, gridSquareImage=grid_square_parameters.image, - pixelLocationX=grid_square_parameters.x_location, - pixelLocationY=grid_square_parameters.y_location, - height=grid_square_parameters.height, - width=grid_square_parameters.width, + pixelLocationX=grid_square_parameters.x_location_scaled, + pixelLocationY=grid_square_parameters.y_location_scaled, + height=grid_square_parameters.height_scaled, + width=grid_square_parameters.width_scaled, angle=grid_square_parameters.angle, stageLocationX=grid_square_parameters.x_stage_position, stageLocationY=grid_square_parameters.y_stage_position, @@ -242,7 +226,7 @@ def do_update_grid_square( ): try: with ISPyBSession() as db: - grid_square = ( + grid_square: GridSquare = ( db.query(GridSquare) 
.filter(GridSquare.gridSquareId == grid_square_id) .one() @@ -258,18 +242,18 @@ def do_update_grid_square( ) if grid_square_parameters.image: grid_square.gridSquareImage = grid_square_parameters.image - if grid_square_parameters.x_location: - grid_square.pixelLocationX = int( - grid_square_parameters.x_location / 7.8 + if grid_square_parameters.x_location_scaled: + grid_square.pixelLocationX = ( + grid_square_parameters.x_location_scaled ) - if grid_square_parameters.y_location: - grid_square.pixelLocationY = int( - grid_square_parameters.y_location / 7.8 + if grid_square_parameters.y_location_scaled: + grid_square.pixelLocationY = ( + grid_square_parameters.y_location_scaled ) - if grid_square_parameters.height is not None: - grid_square.height = int(grid_square_parameters.height / 7.8) - if grid_square_parameters.width is not None: - grid_square.width = int(grid_square_parameters.width / 7.8) + if grid_square_parameters.height_scaled is not None: + grid_square.height = grid_square_parameters.height_scaled + if grid_square_parameters.width_scaled is not None: + grid_square.width = grid_square_parameters.width_scaled if grid_square_parameters.angle: grid_square.angle = grid_square_parameters.angle if grid_square_parameters.x_stage_position: diff --git a/src/murfey/util/db.py b/src/murfey/util/db.py index cee30b0ea..801ac5e46 100644 --- a/src/murfey/util/db.py +++ b/src/murfey/util/db.py @@ -230,7 +230,7 @@ class CLEMImageMetadata(SQLModel, table=True): # type: ignore class CLEMImageSeries(SQLModel, table=True): # type: ignore """ - Database recording the individual files associated with a series, which are to be + Database recording the files and metadata associated with a series, which are to be processed together as a group. These files could stem from a parent LIF file, or have been compiled together from individual TIFF files. 
""" @@ -239,6 +239,7 @@ class CLEMImageSeries(SQLModel, table=True): # type: ignore series_name: str = Field( index=True ) # Name of the series, as determined from the metadata + search_string: Optional[str] = Field(default=None) # Path for globbing with session: Optional["Session"] = Relationship( back_populates="image_series" @@ -247,6 +248,22 @@ class CLEMImageSeries(SQLModel, table=True): # type: ignore foreign_key="session.id", default=None, unique=False ) + # Type of data (atlas/overview or grid square) + data_type: Optional[str] = Field(default=None) # "atlas" or "grid_square" + + # Link to data collection group + data_collection_group: Optional["DataCollectionGroup"] = Relationship( + back_populates="clem_image_series" + ) + dcg_id: Optional[int] = Field(foreign_key="datacollectiongroup.id", default=None) + dcg_name: Optional[str] = Field(default=None) + + # Link to grid squares + grid_square: Optional["GridSquare"] = Relationship( + back_populates="clem_image_series" + ) + grid_square_id: Optional[int] = Field(foreign_key="gridsquare.id", default=None) + # The parent LIF file this series originates from, if any parent_lif: Optional["CLEMLIFFile"] = Relationship( back_populates="child_series", @@ -270,18 +287,27 @@ class CLEMImageSeries(SQLModel, table=True): # type: ignore default=None, ) - # Databases of the image stacks that comprise this series + # Image stack entries that are part of this series child_stacks: List["CLEMImageStack"] = Relationship( back_populates="parent_series", sa_relationship_kwargs={"cascade": "delete"}, ) # One to many + number_of_members: Optional[int] = Field(default=None) - # Process checklist for series - number_of_members: int = ( - 0 # Expected number of image stacks belonging to this series - ) + # Shape and resolution information + pixels_x: Optional[int] = Field(default=None) + pixels_y: Optional[int] = Field(default=None) + pixel_size: Optional[float] = Field(default=None) + units: Optional[str] = Field(default=None) + + 
# Extent of the imaged area in real space + x0: Optional[float] = Field(default=None) + x1: Optional[float] = Field(default=None) + y0: Optional[float] = Field(default=None) + y1: Optional[float] = Field(default=None) + + # Composite images composite_created: bool = False # Has a composite image been created? - composite_image: Optional[str] = None # Full path to composite image class CLEMImageStack(SQLModel, table=True): # type: ignore @@ -389,6 +415,10 @@ class DataCollectionGroup(SQLModel, table=True): # type: ignore back_populates="data_collection_group", sa_relationship_kwargs={"cascade": "delete"}, ) + clem_image_series: List["CLEMImageSeries"] = Relationship( + back_populates="data_collection_group", + sa_relationship_kwargs={"cascade": "delete"}, + ) notification_parameters: List["NotificationParameter"] = Relationship( back_populates="data_collection_group", sa_relationship_kwargs={"cascade": "delete"}, @@ -591,6 +621,9 @@ class GridSquare(SQLModel, table=True): # type: ignore pixel_size: Optional[float] = None image: str = "" session: Optional[Session] = Relationship(back_populates="grid_squares") + clem_image_series: List["CLEMImageSeries"] = Relationship( + back_populates="grid_square", sa_relationship_kwargs={"cascade": "delete"} + ) foil_holes: List["FoilHole"] = Relationship( back_populates="grid_square", sa_relationship_kwargs={"cascade": "delete"} ) diff --git a/src/murfey/util/models.py b/src/murfey/util/models.py index 64f9cd4f4..c1ae42a56 100644 --- a/src/murfey/util/models.py +++ b/src/murfey/util/models.py @@ -128,7 +128,9 @@ class Base(BaseModel): class GridSquareParameters(BaseModel): tag: str x_location: Optional[float] = None + x_location_scaled: Optional[int] = None y_location: Optional[float] = None + y_location_scaled: Optional[int] = None x_stage_position: Optional[float] = None y_stage_position: Optional[float] = None readout_area_x: Optional[int] = None @@ -136,7 +138,9 @@ class GridSquareParameters(BaseModel): thumbnail_size_x: 
Optional[int] = None thumbnail_size_y: Optional[int] = None height: Optional[int] = None + height_scaled: Optional[int] = None width: Optional[int] = None + width_scaled: Optional[int] = None pixel_size: Optional[float] = None image: str = "" angle: Optional[float] = None diff --git a/src/murfey/workflows/clem/register_align_and_merge_results.py b/src/murfey/workflows/clem/register_align_and_merge_results.py index fe52058b6..fb5f563aa 100644 --- a/src/murfey/workflows/clem/register_align_and_merge_results.py +++ b/src/murfey/workflows/clem/register_align_and_merge_results.py @@ -87,7 +87,6 @@ def register_align_and_merge_result( session_id=session_id, series_name=result.series_name, ) - clem_img_series.composite_image = str(result.composite_image) clem_img_series.composite_created = True murfey_db.add(clem_img_series) murfey_db.commit() diff --git a/src/murfey/workflows/clem/register_preprocessing_results.py b/src/murfey/workflows/clem/register_preprocessing_results.py index 148223984..e79da1d0c 100644 --- a/src/murfey/workflows/clem/register_preprocessing_results.py +++ b/src/murfey/workflows/clem/register_preprocessing_results.py @@ -11,21 +11,16 @@ import logging import re import traceback +from importlib.metadata import entry_points from pathlib import Path from typing import Literal, Optional from pydantic import BaseModel from sqlmodel import Session, select +import murfey.util.db as MurfeyDB from murfey.server import _transport_object -from murfey.util.db import ( - CLEMImageMetadata, - CLEMImageSeries, - CLEMImageStack, - CLEMLIFFile, - CLEMTIFFFile, - Session as MurfeySession, -) +from murfey.util.models import GridSquareParameters from murfey.util.processing_params import ( default_clem_align_and_merge_parameters as processing_params, ) @@ -53,7 +48,388 @@ class CLEMPreprocessingResult(BaseModel): units: str pixel_size: float resolution: float - extent: list[float] + extent: list[float] # [x0, x1, y0, y1] + + +def _register_clem_image_series( + 
session_id: int, + result: CLEMPreprocessingResult, + murfey_db: Session, +): + clem_img_series: MurfeyDB.CLEMImageSeries = get_db_entry( + db=murfey_db, + table=MurfeyDB.CLEMImageSeries, + session_id=session_id, + series_name=result.series_name, + ) + clem_metadata: MurfeyDB.CLEMImageMetadata = get_db_entry( + db=murfey_db, + table=MurfeyDB.CLEMImageMetadata, + session_id=session_id, + file_path=result.metadata, + ) + # Register and link parent LIF file if present + if result.parent_lif is not None: + clem_lif_file: MurfeyDB.CLEMLIFFile = get_db_entry( + db=murfey_db, + table=MurfeyDB.CLEMLIFFile, + session_id=session_id, + file_path=result.parent_lif, + ) + clem_img_series.parent_lif = clem_lif_file + clem_metadata.parent_lif = clem_lif_file + + # Link and commit series and metadata tables + clem_img_series.associated_metadata = clem_metadata + murfey_db.add_all([clem_img_series, clem_metadata]) + murfey_db.commit() + + # Iteratively register the output image stacks + for c, (channel, output_file) in enumerate(result.output_files.items()): + clem_img_stk: MurfeyDB.CLEMImageStack = get_db_entry( + db=murfey_db, + table=MurfeyDB.CLEMImageStack, + session_id=session_id, + file_path=output_file, + ) + + # Link associated metadata + clem_img_stk.associated_metadata = clem_metadata + clem_img_stk.parent_series = clem_img_series + clem_img_stk.channel_name = channel + if result.parent_lif is not None: + clem_img_stk.parent_lif = clem_lif_file + murfey_db.add(clem_img_stk) + murfey_db.commit() + + # Register and link parent TIFF files if present + if result.parent_tiffs: + seed_file = result.parent_tiffs[channel][0] + if c == 0: + # Load list of files to register from seed file + series_identifier = seed_file.stem.split("--")[0] + "--" + tiff_list = list(seed_file.parent.glob(f"{series_identifier}--")) + + # Load TIFF files by colour channel if "--C" in file stem + match = re.search(r"--C[\d]{2,3}", seed_file.stem) + tiff_file_subset = [ + file + for file in tiff_list + 
if file.stem.startswith(series_identifier) + and (match.group(0) in file.stem if match else True) + ] + tiff_file_subset.sort() + + # Register TIFF file subset + clem_tiff_files = [] + for file in tiff_file_subset: + clem_tiff_file: MurfeyDB.CLEMTIFFFile = get_db_entry( + db=murfey_db, + table=MurfeyDB.CLEMTIFFFile, + session_id=session_id, + file_path=file, + ) + + # Link associated metadata + clem_tiff_file.associated_metadata = clem_metadata + clem_tiff_file.child_series = clem_img_series + clem_tiff_file.child_stack = clem_img_stk + + clem_tiff_files.append(clem_tiff_file) + + murfey_db.add_all(clem_tiff_files) + murfey_db.commit() + + # Add metadata for this series + clem_img_series.search_string = str(output_file.parent / "*tiff") + clem_img_series.data_type = ( + "atlas" if "Overview_" in result.series_name else "grid_square" + ) + clem_img_series.number_of_members = result.number_of_members + clem_img_series.pixels_x = result.pixels_x + clem_img_series.pixels_y = result.pixels_y + clem_img_series.pixel_size = result.pixel_size + clem_img_series.units = result.units + clem_img_series.x0 = result.extent[0] + clem_img_series.x1 = result.extent[1] + clem_img_series.y0 = result.extent[2] + clem_img_series.y1 = result.extent[3] + murfey_db.add(clem_img_series) + murfey_db.commit() + murfey_db.close() + + logger.info(f"CLEM preprocessing results registered for {result.series_name!r} ") + + +def _register_dcg_and_atlas( + session_id: int, + instrument_name: str, + visit_name: str, + result: CLEMPreprocessingResult, + murfey_db: Session, +): + # Determine variables to register data collection group and atlas with + proposal_code = "".join(char for char in visit_name.split("-")[0] if char.isalpha()) + proposal_number = "".join( + char for char in visit_name.split("-")[0] if char.isdigit() + ) + visit_number = visit_name.split("-")[-1] + + # Generate name/tag for data collection group based on series name + dcg_name = result.series_name.split("--")[0] + if 
result.series_name.split("--")[1].isdigit(): + dcg_name += f"--{result.series_name.split('--')[1]}" + + # Determine values for atlas + if "Overview_" in result.series_name: # These are atlas datasets + output_file = list(result.output_files.values())[0] + atlas_name = str(output_file.parent / "*.tiff") + atlas_pixel_size = result.pixel_size + else: + atlas_name = "" + atlas_pixel_size = 0.0 + + if dcg_search := murfey_db.exec( + select(MurfeyDB.DataCollectionGroup) + .where(MurfeyDB.DataCollectionGroup.session_id == session_id) + .where(MurfeyDB.DataCollectionGroup.tag == dcg_name) + ).all(): + dcg_entry = dcg_search[0] + # Update atlas if registering atlas dataset + # and data collection group already exists + if "Overview_" in result.series_name: + atlas_message = { + "session_id": session_id, + "dcgid": dcg_entry.id, + "atlas_id": dcg_entry.atlas_id, + "atlas": atlas_name, + "atlas_pixel_size": atlas_pixel_size, + "sample": dcg_entry.sample, + } + if entry_point_result := entry_points( + group="murfey.workflows", name="atlas_update" + ): + (workflow,) = entry_point_result + _ = workflow.load()( + message=atlas_message, + murfey_db=murfey_db, + ) + else: + logger.warning("No workflow found for 'atlas_update'") + else: + # Register data collection group and placeholder for the atlas + dcg_message = { + "microscope": instrument_name, + "proposal_code": proposal_code, + "proposal_number": proposal_number, + "visit_number": visit_number, + "session_id": session_id, + "tag": dcg_name, + "experiment_type": "experiment", + "experiment_type_id": None, + "atlas": atlas_name, + "atlas_pixel_size": atlas_pixel_size, + "sample": None, + } + if entry_point_result := entry_points( + group="murfey.workflows", name="data_collection_group" + ): + (workflow,) = entry_point_result + # Register grid square + _ = workflow.load()( + message=dcg_message, + murfey_db=murfey_db, + ) + else: + logger.warning("No workflow found for 'data_collection_group'") + + # Store data collection 
group id in CLEM image series table + dcg_entry = murfey_db.exec( + select(MurfeyDB.DataCollectionGroup) + .where(MurfeyDB.DataCollectionGroup.session_id == session_id) + .where(MurfeyDB.DataCollectionGroup.tag == dcg_name) + ).one() + + clem_img_series: MurfeyDB.CLEMImageSeries = get_db_entry( + db=murfey_db, + table=MurfeyDB.CLEMImageSeries, + session_id=session_id, + series_name=result.series_name, + ) + clem_img_series.dcg_id = dcg_entry.id + clem_img_series.dcg_name = dcg_entry.tag + murfey_db.add(clem_img_series) + murfey_db.commit() + murfey_db.close() + + +def _register_grid_square( + session_id: int, + result: CLEMPreprocessingResult, + murfey_db: Session, +): + # Skip this step if no transport manager object is configured + if _transport_object is None: + logger.error("Unable to find transport manager") + return + # Load all entries for the current data collection group + dcg_name = result.series_name.split("--")[0] + if result.series_name.split("--")[1].isdigit(): + dcg_name += f"--{result.series_name.split('--')[1]}" + + # Check if an atlas has been registered + if atlas_search := murfey_db.exec( + select(MurfeyDB.CLEMImageSeries) + .where(MurfeyDB.CLEMImageSeries.session_id == session_id) + .where(MurfeyDB.CLEMImageSeries.dcg_name == dcg_name) + .where(MurfeyDB.CLEMImageSeries.data_type == "atlas") + ).all(): + atlas_entry = atlas_search[0] + else: + logger.info( + f"No atlas has been registered for data collection group {dcg_name!r} yet" + ) + return + + # Check if there are CLEM entries to register + if clem_img_series_to_register := murfey_db.exec( + select(MurfeyDB.CLEMImageSeries) + .where(MurfeyDB.CLEMImageSeries.session_id == session_id) + .where(MurfeyDB.CLEMImageSeries.dcg_name == dcg_name) + .where(MurfeyDB.CLEMImageSeries.data_type == "grid_square") + ): + if ( + atlas_entry.x0 is not None + and atlas_entry.x1 is not None + and atlas_entry.y0 is not None + and atlas_entry.y1 is not None + and atlas_entry.pixels_x is not None + and 
atlas_entry.pixels_y is not None + ): + atlas_width_real = atlas_entry.x1 - atlas_entry.x0 + atlas_height_real = atlas_entry.y1 - atlas_entry.y0 + else: + logger.warning("Atlas entry not populated with required values") + return + + for clem_img_series in clem_img_series_to_register: + if ( + clem_img_series.x0 is not None + and clem_img_series.x1 is not None + and clem_img_series.y0 is not None + and clem_img_series.y1 is not None + ): + # Find pixel corresponding to image midpoint on atlas + x_mid_real = ( + 0.5 * (clem_img_series.x0 + clem_img_series.x1) - atlas_entry.x0 + ) + x_mid_px = int(x_mid_real / atlas_width_real * atlas_entry.pixels_x) + y_mid_real = ( + 0.5 * (clem_img_series.y0 + clem_img_series.y1) - atlas_entry.y0 + ) + y_mid_px = int(y_mid_real / atlas_height_real * atlas_entry.pixels_y) + + # Find the number of pixels in width and height the image corresponds to on the atlas + width_scaled = int( + (clem_img_series.x1 - clem_img_series.x0) + / atlas_width_real + * atlas_entry.pixels_x + ) + height_scaled = int( + (clem_img_series.y1 - clem_img_series.y0) + / atlas_height_real + * atlas_entry.pixels_y + ) + else: + logger.warning( + f"Image series {clem_img_series.series_name!r} not populated with required values" + ) + continue + + # Populate grid square Pydantic model + grid_square_params = GridSquareParameters( + tag=dcg_name, + x_location=clem_img_series.x0, + x_location_scaled=x_mid_px, + y_location=clem_img_series.y0, + y_location_scaled=y_mid_px, + height=clem_img_series.pixels_x, + height_scaled=height_scaled, + width=clem_img_series.pixels_y, + width_scaled=width_scaled, + x_stage_position=clem_img_series.x0, + y_stage_position=clem_img_series.y0, + pixel_size=clem_img_series.pixel_size, + image=clem_img_series.search_string, + ) + # Register or update the grid square entry as required + if grid_square_result := murfey_db.exec( + select(MurfeyDB.GridSquare) + .where(MurfeyDB.GridSquare.name == clem_img_series.id) + 
.where(MurfeyDB.GridSquare.tag == grid_square_params.tag) + .where(MurfeyDB.GridSquare.session_id == session_id) + ).all(): + # Update existing grid square entry on Murfey + grid_square_entry = grid_square_result[0] + grid_square_entry.x_location = grid_square_params.x_location + grid_square_entry.y_location = grid_square_params.y_location + grid_square_entry.x_stage_position = grid_square_params.x_stage_position + grid_square_entry.y_stage_position = grid_square_params.y_stage_position + grid_square_entry.readout_area_x = grid_square_params.readout_area_x + grid_square_entry.readout_area_y = grid_square_params.readout_area_y + grid_square_entry.thumbnail_size_x = grid_square_params.thumbnail_size_x + grid_square_entry.thumbnail_size_y = grid_square_params.thumbnail_size_y + grid_square_entry.pixel_size = grid_square_params.pixel_size + grid_square_entry.image = grid_square_params.image + + # Update existing entry on ISPyB + _transport_object.do_update_grid_square( + grid_square_id=grid_square_entry.id, + grid_square_parameters=grid_square_params, + ) + else: + # Look up data collection group for current series + dcg_entry = murfey_db.exec( + select(MurfeyDB.DataCollectionGroup) + .where(MurfeyDB.DataCollectionGroup.session_id == session_id) + .where(MurfeyDB.DataCollectionGroup.tag == grid_square_params.tag) + ).one() + # Register to ISPyB + grid_square_ispyb_result = _transport_object.do_insert_grid_square( + atlas_id=dcg_entry.atlas_id, + grid_square_id=clem_img_series.id, + grid_square_parameters=grid_square_params, + ) + # Register to Murfey + grid_square_entry = MurfeyDB.GridSquare( + id=grid_square_ispyb_result.get("return_value", None), + name=clem_img_series.id, + session_id=session_id, + tag=grid_square_params.tag, + x_location=grid_square_params.x_location, + y_location=grid_square_params.y_location, + x_stage_position=grid_square_params.x_stage_position, + y_stage_position=grid_square_params.y_stage_position, + 
readout_area_x=grid_square_params.readout_area_x, + readout_area_y=grid_square_params.readout_area_y, + thumbnail_size_x=grid_square_params.thumbnail_size_x, + thumbnail_size_y=grid_square_params.thumbnail_size_y, + pixel_size=grid_square_params.pixel_size, + image=grid_square_params.image, + ) + murfey_db.add(grid_square_entry) + murfey_db.commit() + + # Add grid square ID to existing CLEM image series entry + clem_img_series.grid_square_id = grid_square_entry.id + murfey_db.add(clem_img_series) + murfey_db.commit() + else: + logger.info( + f"No grid squares to register for data collection group {dcg_name!r} yet" + ) + murfey_db.close() + return def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool]: @@ -82,99 +458,24 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool # Outer try-finally block for tidying up database-related section of function try: - # Register items in database if not already present try: - clem_img_series: CLEMImageSeries = get_db_entry( - db=murfey_db, - table=CLEMImageSeries, - session_id=session_id, - series_name=result.series_name, + # Load current session from database + murfey_session = murfey_db.exec( + select(MurfeyDB.Session).where(MurfeyDB.Session.id == session_id) + ).one() + except Exception: + logger.error( + "Exception encountered when loading Murfey session information: \n", + f"{traceback.format_exc()}", ) - clem_metadata: CLEMImageMetadata = get_db_entry( - db=murfey_db, - table=CLEMImageMetadata, + return {"success": False, "requeue": False} + try: + # Register items in Murfey database + _register_clem_image_series( session_id=session_id, - file_path=result.metadata, - ) - # Register and link parent LIF file if present - if result.parent_lif is not None: - clem_lif_file: CLEMLIFFile = get_db_entry( - db=murfey_db, - table=CLEMLIFFile, - session_id=session_id, - file_path=result.parent_lif, - ) - clem_img_series.parent_lif = clem_lif_file - clem_metadata.parent_lif = 
clem_lif_file - - # Link and commit series and metadata tables first - clem_img_series.associated_metadata = clem_metadata - clem_img_series.number_of_members = result.number_of_members - murfey_db.add_all([clem_img_series, clem_metadata]) - murfey_db.commit() - - # Iteratively register the output image stacks - for c, (channel, output_file) in enumerate(result.output_files.items()): - clem_img_stk: CLEMImageStack = get_db_entry( - db=murfey_db, - table=CLEMImageStack, - session_id=session_id, - file_path=output_file, - ) - - # Link associated metadata - clem_img_stk.associated_metadata = clem_metadata - clem_img_stk.parent_series = clem_img_series - clem_img_stk.channel_name = channel - if result.parent_lif is not None: - clem_img_stk.parent_lif = clem_lif_file - murfey_db.add(clem_img_stk) - murfey_db.commit() - - # Register and link parent TIFF files if present - if result.parent_tiffs: - seed_file = result.parent_tiffs[channel][0] - if c == 0: - # Load list of files to register from seed file - series_identifier = seed_file.stem.split("--")[0] + "--" - tiff_list = list( - seed_file.parent.glob(f"{series_identifier}--") - ) - - # Load TIFF files by colour channel if "--C" in file stem - match = re.search(r"--C[\d]{2,3}", seed_file.stem) - tiff_file_subset = [ - file - for file in tiff_list - if file.stem.startswith(series_identifier) - and (match.group(0) in file.stem if match else True) - ] - tiff_file_subset.sort() - - # Register TIFF file subset - clem_tiff_files = [] - for file in tiff_file_subset: - clem_tiff_file: CLEMTIFFFile = get_db_entry( - db=murfey_db, - table=CLEMTIFFFile, - session_id=session_id, - file_path=file, - ) - - # Link associated metadata - clem_tiff_file.associated_metadata = clem_metadata - clem_tiff_file.child_series = clem_img_series - clem_tiff_file.child_stack = clem_img_stk - - clem_tiff_files.append(clem_tiff_file) - - murfey_db.add_all(clem_tiff_files) - murfey_db.commit() - - logger.info( - f"CLEM preprocessing results 
registered for {result.series_name!r} " + result=result, + murfey_db=murfey_db, ) - except Exception: logger.error( "Exception encountered when registering CLEM preprocessing result for " @@ -182,29 +483,43 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool f"{traceback.format_exc()}" ) return {"success": False, "requeue": False} + try: + # Register data collection group and atlas in ISPyB + _register_dcg_and_atlas( + session_id=session_id, + instrument_name=murfey_session.instrument_name, + visit_name=murfey_session.visit, + result=result, + murfey_db=murfey_db, + ) + except Exception: + # Log error but allow workflow to proceed + logger.error( + "Exception encountered when registering data collection group for CLEM workflow " + f"using {result.series_name!r}: \n" + f"{traceback.format_exc()}" + ) - # Load instrument name try: - instrument_name = ( - murfey_db.exec( - select(MurfeySession).where(MurfeySession.id == session_id) - ) - .one() - .instrument_name + # Register CLEM image series as grid squares + _register_grid_square( + session_id=session_id, + result=result, + murfey_db=murfey_db, ) except Exception: + # Log error but allow workflow to proceed logger.error( - f"Error requesting data from database for {result.series_name!r} series: \n" + f"Exception encountered when registering grid square for {result.series_name}: \n" f"{traceback.format_exc()}" ) - return {"success": False, "requeue": False} # Construct list of files to use for image alignment and merging steps image_combos_to_process = [ list(result.output_files.values()) # Composite image of all channels ] - # Create additional job for fluorescent-only composite image if fluorescent channels are present if ("gray" in result.output_files.keys()) and len(result.output_files) > 1: + # Create additional fluorescent-only composite image image_combos_to_process.append( [ file @@ -212,13 +527,21 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, 
bool if channel != "gray" ] ) + # Create additional bright field-only image + image_combos_to_process.append( + [ + file + for channel, file in result.output_files.items() + if channel == "gray" + ] + ) # Request for image alignment and processing for the requested combinations for image_combo in image_combos_to_process: try: submit_cluster_request( session_id=session_id, - instrument_name=instrument_name, + instrument_name=murfey_session.instrument_name, series_name=result.series_name, images=image_combo, metadata=result.metadata, diff --git a/src/murfey/workflows/spa/flush_spa_preprocess.py b/src/murfey/workflows/spa/flush_spa_preprocess.py index ffbb6eb31..9f067f683 100644 --- a/src/murfey/workflows/spa/flush_spa_preprocess.py +++ b/src/murfey/workflows/spa/flush_spa_preprocess.py @@ -43,6 +43,16 @@ def register_grid_square( grid_square_params: GridSquareParameters, murfey_db: Session, ): + # Calculate scaled down version of the image for registration to ISPyB first + if grid_square_params.x_location is not None: + grid_square_params.x_location_scaled = int(grid_square_params.x_location / 7.8) + if grid_square_params.y_location is not None: + grid_square_params.y_location_scaled = int(grid_square_params.y_location / 7.8) + if grid_square_params.height is not None: + grid_square_params.height_scaled = int(grid_square_params.height / 7.8) + if grid_square_params.width is not None: + grid_square_params.width_scaled = int(grid_square_params.width / 7.8) + try: grid_square = murfey_db.exec( select(GridSquare) diff --git a/tests/workflows/clem/test_register_preprocessing_results.py b/tests/workflows/clem/test_register_preprocessing_results.py new file mode 100644 index 000000000..06f7a5a09 --- /dev/null +++ b/tests/workflows/clem/test_register_preprocessing_results.py @@ -0,0 +1,319 @@ +from pathlib import Path +from typing import Any +from unittest.mock import MagicMock + +import ispyb.sqlalchemy as ISPyBDB +import pytest +from pytest_mock import MockerFixture +from 
sqlalchemy import select as sa_select +from sqlalchemy.orm.session import Session as SQLAlchemySession +from sqlmodel import select as sm_select +from sqlmodel.orm.session import Session as SQLModelSession + +import murfey.util.db as MurfeyDB +from murfey.workflows.clem.register_preprocessing_results import ( + _register_clem_image_series, + _register_dcg_and_atlas, + _register_grid_square, + run, +) +from tests.conftest import ExampleVisit, get_or_create_db_entry + +visit_name = f"{ExampleVisit.proposal_code}{ExampleVisit.proposal_number}-{ExampleVisit.visit_number}" +processed_dir_name = "processed" +grid_name = "Grid_1" +colors = ("gray", "green", "red") + + +@pytest.fixture +def rsync_basepath(tmp_path: Path): + return tmp_path / "data" + + +def generate_preprocessing_messages( + rsync_basepath: Path, + session_id: int, +): + # Make directory to where data for current grid is stored + visit_dir = rsync_basepath / "2020" / visit_name + processed_dir = visit_dir / processed_dir_name + grid_dir = processed_dir / grid_name + grid_dir.mkdir(parents=True, exist_ok=True) + + # Construct all the datasets to be tested + datasets: list[tuple[Path, bool, bool, tuple[int, int], float, list[float]]] = [ + ( + grid_dir / "Overview_1" / "Image_1", + False, + True, + (2400, 2400), + 1e-6, + [0.002, 0.0044, 0.002, 0.0044], + ) + ] + # Add on metadata for a few grid squares + datasets.extend( + [ + ( + grid_dir / "TileScan_1" / f"Position_{n}", + True, + False, + (2048, 2048), + 1.6e-7, + [0.003, 0.00332768, 0.003, 0.00332768], + ) + for n in range(5) + ] + ) + + messages: list[dict[str, Any]] = [] + for dataset in datasets: + # Unpack items from list of dataset parameters + series_path = dataset[0] + series_name = str(series_path.relative_to(processed_dir)).replace("/", "--") + metadata = series_path / "metadata" / f"{series_path.stem}.xml" + metadata.parent.mkdir(parents=True, exist_ok=True) + metadata.touch(exist_ok=True) + output_files = {color: str(series_path / 
f"{color}.tiff") for color in colors} + for output_file in output_files.values(): + Path(output_file).touch(exist_ok=True) + is_stack = dataset[1] + is_montage = dataset[2] + shape = dataset[3] + pixel_size = dataset[4] + extent = dataset[5] + + message = { + "session_id": session_id, + "result": { + "series_name": series_name, + "number_of_members": 3, + "is_stack": is_stack, + "is_montage": is_montage, + "output_files": output_files, + "metadata": str(metadata), + "parent_lif": None, + "parent_tiffs": {}, + "pixels_x": shape[0], + "pixels_y": shape[1], + "units": "m", + "pixel_size": pixel_size, + "resolution": 1 / pixel_size, + "extent": extent, + }, + } + messages.append(message) + return messages + + +@pytest.mark.skip +def test_register_clem_image_series(): + assert _register_clem_image_series + + +@pytest.mark.skip +def test_register_dcg_and_atlas(): + assert _register_dcg_and_atlas + + +@pytest.mark.skip +def test_register_grid_square(): + assert _register_grid_square + + +def test_run( + mocker: MockerFixture, + rsync_basepath: Path, +): + # Mock the MurfeyDB connection + mock_murfey_session_entry = MagicMock() + mock_murfey_session_entry.instrument_name = ExampleVisit.instrument_name + mock_murfey_session_entry.visit = visit_name + mock_murfey_db = MagicMock() + mock_murfey_db.exec().return_value.one.return_value = mock_murfey_session_entry + + # Mock the registration helper functions + mock_register_clem_series = mocker.patch( + "murfey.workflows.clem.register_preprocessing_results._register_clem_image_series" + ) + mock_register_dcg_and_atlas = mocker.patch( + "murfey.workflows.clem.register_preprocessing_results._register_dcg_and_atlas" + ) + mock_register_grid_square = mocker.patch( + "murfey.workflows.clem.register_preprocessing_results._register_grid_square" + ) + + # Mock the align and merge workflow call + mock_align_and_merge_call = mocker.patch( + "murfey.workflows.clem.register_preprocessing_results.submit_cluster_request" + ) + + 
preprocessing_messages = generate_preprocessing_messages( + rsync_basepath=rsync_basepath, + session_id=ExampleVisit.murfey_session_id, + ) + for message in preprocessing_messages: + result = run( + message=message, + murfey_db=mock_murfey_db, + ) + assert result == {"success": True} + assert mock_register_clem_series.call_count == len(preprocessing_messages) + assert mock_register_dcg_and_atlas.call_count == len(preprocessing_messages) + assert mock_register_grid_square.call_count == len(preprocessing_messages) + assert mock_align_and_merge_call.call_count == len(preprocessing_messages) * len( + colors + ) + + +test_matrix = ( + # Reverse order of list + (False,), + (True,), +) + + +@pytest.mark.parametrize("test_params", test_matrix) +def test_run_with_db( + mocker: MockerFixture, + rsync_basepath: Path, + mock_ispyb_credentials, + murfey_db_session: SQLModelSession, + ispyb_db_session: SQLAlchemySession, + test_params: tuple[bool], +): + # Unpack test params + (shuffle_message,) = test_params + + # Create a session to insert for this test + murfey_session: MurfeyDB.Session = get_or_create_db_entry( + murfey_db_session, + MurfeyDB.Session, + lookup_kwargs={ + "id": ExampleVisit.murfey_session_id + 1, + "name": visit_name, + "visit": visit_name, + "instrument_name": ExampleVisit.instrument_name, + }, + ) + + # Mock the ISPyB connection where the TransportManager class is located + mock_security_config = MagicMock() + mock_security_config.ispyb_credentials = mock_ispyb_credentials + mocker.patch( + "murfey.server.ispyb.get_security_config", + return_value=mock_security_config, + ) + mocker.patch( + "murfey.server.ispyb.ISPyBSession", + return_value=ispyb_db_session, + ) + + # Mock the ISPYB connection when registering data collection group + mocker.patch( + "murfey.workflows.register_data_collection_group.ISPyBSession", + return_value=ispyb_db_session, + ) + + # Mock out the machine config used in the helper sanitisation function + mock_get_machine_config = 
mocker.patch("murfey.workflows.clem.get_machine_config") + mock_machine_config = MagicMock() + mock_machine_config.rsync_basepath = rsync_basepath + mock_get_machine_config.return_value = { + ExampleVisit.instrument_name: mock_machine_config, + } + + # Mock the align and merge workflow call + mock_align_and_merge_call = mocker.patch( + "murfey.workflows.clem.register_preprocessing_results.submit_cluster_request" + ) + + # Patch the TransportManager object in the workflows called + from murfey.server.ispyb import TransportManager + + mocker.patch( + "murfey.workflows.clem.register_preprocessing_results._transport_object", + new=TransportManager("PikaTransport"), + ) + mocker.patch( + "murfey.workflows.register_data_collection_group._transport_object", + new=TransportManager("PikaTransport"), + ) + mocker.patch( + "murfey.workflows.register_atlas_update._transport_object", + new=TransportManager("PikaTransport"), + ) + + # Run the function + preprocessing_messages = generate_preprocessing_messages( + rsync_basepath=rsync_basepath, + session_id=murfey_session.id, + ) + if shuffle_message: + preprocessing_messages.reverse() + for message in preprocessing_messages: + result = run( + message=message, + murfey_db=murfey_db_session, + ) + assert result == {"success": True} + + # Each message should call the align-and-merge workflow thrice + # if gray and colour channels are both present + assert mock_align_and_merge_call.call_count == len(preprocessing_messages) * len( + colors + ) + + # Both databases should have entries for data collection group, and grid squares + # ISPyB database should additionally have an atlas entry + murfey_dcg_search = murfey_db_session.exec( + sm_select(MurfeyDB.DataCollectionGroup).where( + MurfeyDB.DataCollectionGroup.session_id == murfey_session.id + ) + ).all() + assert len(murfey_dcg_search) == 1 + murfey_gs_search = murfey_db_session.exec( + sm_select(MurfeyDB.GridSquare).where( + MurfeyDB.GridSquare.session_id == murfey_session.id + ) + 
).all() + assert len(murfey_gs_search) == len(preprocessing_messages) - 1 + + murfey_dcg = murfey_dcg_search[0] + ispyb_dcg_search = ( + ispyb_db_session.execute( + sa_select(ISPyBDB.DataCollectionGroup).where( + ISPyBDB.DataCollectionGroup.dataCollectionGroupId == murfey_dcg.id + ) + ) + .scalars() + .all() + ) + assert len(ispyb_dcg_search) == 1 + + ispyb_dcg = ispyb_dcg_search[0] + ispyb_atlas_search = ( + ispyb_db_session.execute( + sa_select(ISPyBDB.Atlas).where( + ISPyBDB.Atlas.dataCollectionGroupId == ispyb_dcg.dataCollectionGroupId + ) + ) + .scalars() + .all() + ) + assert len(ispyb_atlas_search) == 1 + + ispyb_atlas = ispyb_atlas_search[0] + ispyb_gs_search = ( + ispyb_db_session.execute( + sa_select(ISPyBDB.GridSquare).where( + ISPyBDB.GridSquare.atlasId == ispyb_atlas.atlasId + ) + ) + .scalars() + .all() + ) + assert len(ispyb_gs_search) == len(preprocessing_messages) - 1 + + murfey_db_session.close() + ispyb_db_session.close()