Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions src/murfey/client/multigrid_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
do_transfer: bool = True
dummy_dc: bool = False
force_mdoc_metadata: bool = True
rsync_restarts: List[str] = field(default_factory=lambda: [])

Check warning on line 39 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L39

Added line #L39 was not covered by tests
rsync_processes: Dict[Path, RSyncer] = field(default_factory=lambda: {})
analysers: Dict[Path, Analyser] = field(default_factory=lambda: {})
data_collection_parameters: dict = field(default_factory=lambda: {})
Expand Down Expand Up @@ -103,7 +104,11 @@
f"{self._environment.url.geturl()}/instruments/{self.instrument_name}/machine"
).json()
if destination_overrides.get(source):
destination = destination_overrides[source] + f"/{extra_directory}"
destination = (

Check warning on line 107 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L107

Added line #L107 was not covered by tests
destination_overrides[source]
if str(source) in self.rsync_restarts
else destination_overrides[source] + f"/{extra_directory}"
)
else:
for k, v in destination_overrides.items():
if Path(v).name in source.parts:
Expand Down Expand Up @@ -134,6 +139,7 @@
tag=tag,
limited=limited,
transfer=machine_data.get("data_transfer_enabled", True),
restarted=str(source) in self.rsync_restarts,
)
self.ws.send(json.dumps({"message": "refresh"}))

Expand Down Expand Up @@ -175,6 +181,7 @@
tag: str = "",
limited: bool = False,
transfer: bool = True,
restarted: bool = False,
):
log.info(f"starting rsyncer: {source}")
if self._environment:
Expand Down Expand Up @@ -238,15 +245,21 @@
),
secondary=True,
)
url = f"{str(self._environment.url.geturl())}/sessions/{str(self._environment.murfey_session)}/rsyncer"
rsyncer_data = {
"source": str(source),
"destination": destination,
"session_id": self.session_id,
"transferring": self.do_transfer or self._environment.demo,
"tag": tag,
}
requests.post(url, json=rsyncer_data)
if restarted:
restarted_url = (

Check warning on line 249 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L249

Added line #L249 was not covered by tests
f"{self.murfey_url}/sessions/{self.session_id}/rsyncer_started"
)
capture_post(restarted_url, json={"source": str(source)})

Check warning on line 252 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L252

Added line #L252 was not covered by tests
else:
url = f"{str(self._environment.url.geturl())}/sessions/{str(self._environment.murfey_session)}/rsyncer"
rsyncer_data = {

Check warning on line 255 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L254-L255

Added lines #L254 - L255 were not covered by tests
"source": str(source),
"destination": destination,
"session_id": self.session_id,
"transferring": self.do_transfer or self._environment.demo,
"tag": tag,
}
requests.post(url, json=rsyncer_data)

Check warning on line 262 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L262

Added line #L262 was not covered by tests
self._environment.watchers[source] = DirWatcher(source, settling_time=30)

if not self.analysers.get(source) and analyse:
Expand Down
6 changes: 5 additions & 1 deletion src/murfey/client/tui/screens.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,11 @@ def determine_default_destination(
_default = ""
else:
_default = destination + f"/{visit}"
return _default + f"/{extra_directory}"
return (
_default + f"/{extra_directory}"
if not _default.endswith("/")
else _default + f"{extra_directory}"
)


class InputResponse(NamedTuple):
Expand Down
18 changes: 16 additions & 2 deletions src/murfey/instrument_server/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import secrets
import time
from datetime import datetime
from functools import partial

Check warning on line 6 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L6

Added line #L6 was not covered by tests
from logging import getLogger
from pathlib import Path
from typing import Annotated, Dict, List, Optional, Union
Expand All @@ -28,7 +29,7 @@

watchers: Dict[Union[str, int], MultigridDirWatcher] = {}
rsyncers: Dict[str, RSyncer] = {}
controllers = {}
controllers: Dict[int, MultigridController] = {}

Check warning on line 32 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L32

Added line #L32 was not covered by tests
data_collection_parameters: dict = {}
tokens = {}

Expand Down Expand Up @@ -131,10 +132,17 @@
)


@router.get("/sessions/{session_id}/check_token")
def check_token(session_id: MurfeySessionID):
return {"token_valid": True}

Check warning on line 137 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L135-L137

Added lines #L135 - L137 were not covered by tests


@router.post("/sessions/{session_id}/multigrid_watcher")
def start_multigrid_watcher(
session_id: MurfeySessionID, watcher_spec: MultigridWatcherSpec
):
if controllers.get(session_id) is not None:
return {"success": True}

Check warning on line 145 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L145

Added line #L145 was not covered by tests
label = watcher_spec.label
controllers[session_id] = MultigridController(
[],
Expand All @@ -148,6 +156,7 @@
_machine_config=watcher_spec.configuration.dict(),
token=tokens.get(session_id, "token"),
data_collection_parameters=data_collection_parameters.get(label, {}),
rsync_restarts=watcher_spec.rsync_restarts,
)
watcher_spec.source.mkdir(exist_ok=True)
machine_config = requests.get(
Expand All @@ -161,7 +170,12 @@
watcher_spec.configuration.dict(),
skip_existing_processing=watcher_spec.skip_existing_processing,
)
watchers[session_id].subscribe(controllers[session_id]._start_rsyncer_multigrid)
watchers[session_id].subscribe(

Check warning on line 173 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L173

Added line #L173 was not covered by tests
partial(
controllers[session_id]._start_rsyncer_multigrid,
destination_overrides=watcher_spec.destination_overrides,
)
)
watchers[session_id].start()
return {"success": True}

Expand Down
22 changes: 22 additions & 0 deletions src/murfey/server/api/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,24 @@
return success


@router.get("/instruments/{instrument_name}/sessions/{session_id}/active")
async def check_if_session_is_active(instrument_name: str, session_id: int):
if instrument_server_tokens.get(session_id) is None:
return {"active": False}
async with lock:
async with aiohttp.ClientSession() as session:
machine_config = get_machine_config(instrument_name=instrument_name)[

Check warning on line 84 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L81-L84

Added lines #L81 - L84 were not covered by tests
instrument_name
]
async with session.get(

Check warning on line 87 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L87

Added line #L87 was not covered by tests
f"{machine_config.instrument_server_url}/sessions/{int(sanitise(str(session_id)))}/check_token",
headers={
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"
},
) as response:
return {"active": response.status == 200}

Check warning on line 93 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L93

Added line #L93 was not covered by tests


@router.post("/sessions/{session_id}/multigrid_watcher")
async def start_multigrid_watcher(
session_id: MurfeySessionID, watcher_spec: MultigridWatcherSetup, db=murfey_db
Expand Down Expand Up @@ -109,6 +127,10 @@
"label": visit,
"instrument_name": instrument_name,
"skip_existing_processing": watcher_spec.skip_existing_processing,
"destination_overrides": {
str(k): v for k, v in watcher_spec.destination_overrides.items()
},
"rsync_restarts": watcher_spec.rsync_restarts,
},
headers={
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"
Expand Down
3 changes: 3 additions & 0 deletions src/murfey/util/instrument_models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from pathlib import Path
from typing import Dict, List

Check warning on line 2 in src/murfey/util/instrument_models.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/util/instrument_models.py#L2

Added line #L2 was not covered by tests

from pydantic import BaseModel

Expand All @@ -12,3 +13,5 @@
visit: str
instrument_name: str
skip_existing_processing: bool = False
destination_overrides: Dict[Path, str] = {}
rsync_restarts: List[str] = []

Check warning on line 17 in src/murfey/util/instrument_models.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/util/instrument_models.py#L16-L17

Added lines #L16 - L17 were not covered by tests
2 changes: 2 additions & 0 deletions src/murfey/util/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ class PostInfo(BaseModel):
class MultigridWatcherSetup(BaseModel):
source: Path
skip_existing_processing: bool = False
destination_overrides: Dict[Path, str] = {}
rsync_restarts: List[str] = []


class CurrentGainRef(BaseModel):
Expand Down