diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index e700d35a0..6ca69bed0 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -77,7 +77,7 @@ jobs:
- name: Install Murfey
run: |
set -eux
- pip install --disable-pip-version-check -e "."[cicd,client,server,developer]
+ pip install --disable-pip-version-check -e "."[cicd,server,developer]
- uses: shogo82148/actions-setup-mysql@v1
with:
diff --git a/.github/workflows/publish-version.yml b/.github/workflows/publish-version.yml
index c054c29a2..9b4e52f02 100644
--- a/.github/workflows/publish-version.yml
+++ b/.github/workflows/publish-version.yml
@@ -24,7 +24,7 @@ jobs:
- name: Check current tag
id: checkTag
run: |
- pip install --disable-pip-version-check -e "."[cicd,client,server,developer]
+ pip install --disable-pip-version-check -e "."[cicd,server,developer]
VERSION=$(python -c "import murfey; print(murfey.__version__)")
echo "newVersion=v$VERSION" >> $GITHUB_OUTPUT
diff --git a/.github/workflows/version-bump.yml b/.github/workflows/version-bump.yml
index b52691360..aeef9c2d9 100644
--- a/.github/workflows/version-bump.yml
+++ b/.github/workflows/version-bump.yml
@@ -42,7 +42,7 @@ jobs:
- name: Install package
run: |
set -eux
- pip install --disable-pip-version-check -e "."[cicd,client,server,developer]
+ pip install --disable-pip-version-check -e "."[cicd,server,developer]
- name: Run bumpversion and push tag
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
diff --git a/Dockerfiles/murfey-instrument-server b/Dockerfiles/murfey-instrument-server
index 085c451aa..91f33688e 100644
--- a/Dockerfiles/murfey-instrument-server
+++ b/Dockerfiles/murfey-instrument-server
@@ -32,7 +32,7 @@ RUN apt-get update && \
pip \
build \
importlib-metadata && \
- /venv/bin/python -m pip install /python-murfey[client]
+ /venv/bin/python -m pip install /python-murfey
# Transfer completed Murfey build to base image
diff --git a/README.md b/README.md
index e3de57436..6bcddff6e 100644
--- a/README.md
+++ b/README.md
@@ -19,7 +19,7 @@ then install using the following command.
```text
$ git clone git@github.com:DiamondLightSource/python-murfey.git
$ cd python-murfey
-$ pip install -e .[client,server,developer]
+$ pip install -e .[server,developer]
```
The packages included under the `[developer]` installation key contain some helpful tools to aid you with developing Murfey further:
@@ -43,7 +43,7 @@ $ murfey.server
and connect the client with
```text
-$ murfey --server http://127.0.0.1:8000
+$ murfey.instrument_server --port 8000
```
You can also install a client on a remote machine. This machine only needs to have
diff --git a/pyproject.toml b/pyproject.toml
index 250a9b734..10c318a26 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -44,14 +44,12 @@ dependencies = [
cicd = [
"pytest-cov", # Used for generating PyTest coverage reports
]
-client = [
- "websocket-client",
-]
developer = [
"bump-my-version", # Version control
"ipykernel", # Enable interactive coding with VS Code and Jupyter Notebook
"pre-commit", # Formatting, linting, type checking, etc.
"pytest", # Test code functionality
+ "pytest-asyncio", # For testing async functions
"pytest-mock", # Additional mocking tools for unit tests
]
server = [
diff --git a/src/murfey/bootstrap/__main__.py b/src/murfey/bootstrap/__main__.py
index 109bd3a9e..393567652 100644
--- a/src/murfey/bootstrap/__main__.py
+++ b/src/murfey/bootstrap/__main__.py
@@ -144,7 +144,7 @@ def _download_to_file(url: str, outfile: str):
murfey_hostname,
"-i",
f"{murfey_base}{url_path_for('bootstrap.pypi', 'get_pypi_index')}",
- "murfey[client]",
+ "murfey",
]
)
if result.returncode:
diff --git a/src/murfey/client/customlogging.py b/src/murfey/client/customlogging.py
deleted file mode 100644
index 567cf7b7e..000000000
--- a/src/murfey/client/customlogging.py
+++ /dev/null
@@ -1,28 +0,0 @@
-from __future__ import annotations
-
-import json
-import logging
-
-logger = logging.getLogger("murfey.client.customlogging")
-
-
-class CustomHandler(logging.Handler):
- def __init__(self, callback):
- """Set up a handler instance, record the callback function."""
- super().__init__()
- self._callback = callback
-
- def prepare(self, record):
- self.format(record)
- record_dict = record.__dict__
- record_dict["type"] = "log"
- try:
- return json.dumps(record_dict)
- except TypeError:
- return json.dumps({str(k): str(v) for k, v in record_dict.items()})
-
- def emit(self, record):
- try:
- self._callback(self.prepare(record))
- except Exception:
- self.handleError(record)
diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py
index c4dc5f684..302521cec 100644
--- a/src/murfey/client/multigrid_control.py
+++ b/src/murfey/client/multigrid_control.py
@@ -1,4 +1,3 @@
-import json
import logging
import subprocess
import threading
@@ -10,7 +9,6 @@
from typing import Dict, List, Optional
from urllib.parse import urlparse
-import murfey.client.websocket
from murfey.client.analyser import Analyser
from murfey.client.context import ensure_dcg_exists
from murfey.client.contexts.spa import SPAModularContext
@@ -97,11 +95,6 @@ def __post_init__(self):
self.rsync_processes = self.rsync_processes or {}
self.analysers = self.analysers or {}
- self.ws = murfey.client.websocket.WSApp(
- server=self.murfey_url,
- register_client=False,
- )
-
# Calculate the time offset between the client and the server
current_time = datetime.now()
server_timestamp = capture_get(
@@ -182,17 +175,6 @@ def clean_up_once_dormant(self, running_threads: list[threading.Thread]):
if not success:
log.warning(f"Could not delete database data for {self.session_id}")
- # Send message to frontend to trigger a refresh
- self.ws.send(
- json.dumps(
- {
- "message": "refresh",
- "target": "sessions",
- "instrument_name": self.instrument_name,
- }
- )
- )
-
# Mark as dormant
self.dormant = True
@@ -293,15 +275,6 @@ def _start_rsyncer_multigrid(
transfer=machine_data.get("data_transfer_enabled", True),
restarted=str(source) in self.rsync_restarts,
)
- self.ws.send(
- json.dumps(
- {
- "message": "refresh",
- "target": "rsyncer",
- "session_id": self.session_id,
- }
- )
- )
def _rsyncer_stopped(self, source: Path, explicit_stop: bool = False):
if explicit_stop:
diff --git a/src/murfey/client/update.py b/src/murfey/client/update.py
index ce7732488..0f1fe6a7c 100644
--- a/src/murfey/client/update.py
+++ b/src/murfey/client/update.py
@@ -78,7 +78,7 @@ def install_murfey(api_base: ParseResult, version: str) -> bool:
path=f"{proxy_path}{url_path_for('bootstrap.pypi', 'get_pypi_index')}",
query="",
).geturl(),
- f"murfey[client]=={version}",
+ f"murfey=={version}",
]
)
return result.returncode == 0
diff --git a/src/murfey/client/websocket.py b/src/murfey/client/websocket.py
deleted file mode 100644
index 417120837..000000000
--- a/src/murfey/client/websocket.py
+++ /dev/null
@@ -1,191 +0,0 @@
-from __future__ import annotations
-
-import json
-import logging
-import queue
-import threading
-import time
-import urllib.parse
-import uuid
-from typing import Optional
-
-import websocket
-
-from murfey.client.instance_environment import MurfeyInstanceEnvironment
-from murfey.util.api import url_path_for
-
-log = logging.getLogger("murfey.client.websocket")
-
-
-class WSApp:
- environment: MurfeyInstanceEnvironment | None = None
-
- def __init__(
- self, *, server: str, id: int | str | None = None, register_client: bool = True
- ):
- self.id = str(uuid.uuid4()) if id is None else id
- log.info(f"Opening websocket connection for Client {self.id}")
- websocket.enableTrace(False)
-
- # Parse server URL and get proxy path used, if any
- url = urllib.parse.urlparse(server)._replace(
- scheme="wss" if server.startswith("https") else "ws"
- )
- proxy_path = url.path.rstrip("/")
-
- self._address = url.geturl()
- self._alive = True
- self._ready = False
- self._send_queue: queue.Queue[Optional[str]] = queue.Queue()
- self._receive_queue: queue.Queue[Optional[str]] = queue.Queue()
-
- # Construct the websocket URL
- # Prepend the proxy path to the new URL path
- # It will evaluate to "" if nothing's there, and starts with "/" if present
- ws_url = (
- url._replace(
- path=f"{proxy_path}{url_path_for('websocket.ws', 'websocket_endpoint', client_id=self.id)}"
- ).geturl()
- if register_client
- else url._replace(
- path=f"{proxy_path}{url_path_for('websocket.ws', 'websocket_connection_endpoint', client_id=self.id)}"
- ).geturl()
- )
- self._ws = websocket.WebSocketApp(
- ws_url,
- on_close=self.on_close,
- on_message=self.on_message,
- on_open=self.on_open,
- on_error=self.on_error,
- )
- self._ws_thread = threading.Thread(
- target=self._run_websocket_event_loop,
- daemon=True,
- name="websocket-connection",
- )
- self._ws_thread.start()
- self._feeder_thread = threading.Thread(
- target=self._send_queue_feeder, daemon=True, name="websocket-send-queue"
- )
- self._feeder_thread.start()
- self._receiver_thread = threading.Thread(
- target=self._receive_msgs, daemon=True, name="websocket-receive-queue"
- )
- self._receiver_thread.start()
- log.info("making wsapp")
-
- def __repr__(self):
- if self.alive:
- if self._ready:
- status = "connected"
- else:
- status = "connecting"
- else:
- status = "closed"
- return f""
-
- @property
- def alive(self):
- return self._alive and self._ws_thread.is_alive()
-
- def _run_websocket_event_loop(self):
- backoff = 0
- while True:
- attempt_start = time.perf_counter()
- connection_failure = self._ws.run_forever(ping_interval=30, ping_timeout=10)
- if not connection_failure:
- break
- if (time.perf_counter() - attempt_start) < 5:
- # rapid connection cycling
- backoff = min(120, backoff * 2 + 1)
- else:
- backoff = 0
- time.sleep(backoff)
- log.info("Websocket connection closed")
- self._alive = False
-
- def _send_queue_feeder(self):
- log.debug("Websocket send-queue-feeder thread starting")
- while self.alive:
- element = self._send_queue.get()
- if element is None:
- self._send_queue.task_done()
- continue
- while not self._ready:
- time.sleep(0.3)
- try:
- self._ws.send(element)
- except Exception:
- log.error("Error sending message through websocket", exc_info=True)
- self._send_queue.task_done()
- log.debug("Websocket send-queue-feeder thread stopped")
-
- def _receive_msgs(self):
- while self.alive:
- element = self._receive_queue.get()
- if element is None:
- self._send_queue.task_done()
- continue
- while not self._ready:
- time.sleep(0.3)
- try:
- self._handle_msg(element)
- except json.decoder.JSONDecodeError:
- pass
- self._receive_queue.task_done()
-
- def close(self):
- log.info("Closing websocket connection")
- if self._feeder_thread.is_alive():
- self._send_queue.join()
- self._alive = False
- if self._feeder_thread.is_alive():
- self._send_queue.put(None)
- self._feeder_thread.join()
- self._receiver_thread.join()
- try:
- self._ws.close()
- except Exception:
- log.error("Error closing websocket connection", exc_info=True)
-
- def on_message(self, ws: websocket.WebSocketApp, message: str):
- self._receive_queue.put(message)
-
- def _handle_msg(self, message: str):
- data = json.loads(message)
- if data.get("message") == "state-update":
- self._register_id(data["attribute"], data["value"])
- elif data.get("message") == "state-update-partial":
- self._register_id_partial(data["attribute"], data["value"])
-
- def _register_id(self, attribute: str, value):
- if self.environment and hasattr(self.environment, attribute):
- setattr(self.environment, attribute, value)
-
- def _register_id_partial(self, attribute: str, value):
- if self.environment and hasattr(self.environment, attribute):
- if isinstance(value, dict):
- new_value = {**getattr(self.environment, attribute), **value}
- setattr(
- self.environment,
- attribute,
- new_value,
- )
-
- def on_error(self, ws: websocket.WebSocketApp, error: websocket.WebSocketException):
- log.error(str(error))
-
- def on_close(self, ws: websocket.WebSocketApp, close_status_code, close_msg):
- self._ready = False
- if close_status_code or close_msg:
- log.debug(f"Websocket closed (code={close_status_code}, msg={close_msg})")
- else:
- log.debug("Websocket closed")
-
- def on_open(self, ws: websocket.WebSocketApp):
- log.info("Opened connection")
- self._ready = True
-
- def send(self, message: str):
- if self.alive:
- self._send_queue.put_nowait(message)
diff --git a/src/murfey/instrument_server/__init__.py b/src/murfey/instrument_server/__init__.py
index 41aca780f..f3c0e01d0 100644
--- a/src/murfey/instrument_server/__init__.py
+++ b/src/murfey/instrument_server/__init__.py
@@ -25,9 +25,9 @@ def start_instrument_server():
from rich.logging import RichHandler
import murfey
- import murfey.client.websocket
- from murfey.client.customlogging import CustomHandler
- from murfey.util import LogFilter
+ from murfey.util.api import url_path_for
+ from murfey.util.client import read_config
+ from murfey.util.logging import HTTPSHandler, LogFilter
parser = argparse.ArgumentParser(description="Start the Murfey server")
parser.add_argument(
@@ -55,22 +55,23 @@ def start_instrument_server():
logging.getLogger("fastapi").addHandler(rich_handler)
logging.getLogger("uvicorn").addHandler(rich_handler)
- # Create a websocket app to connect to the backend
- ws = murfey.client.websocket.WSApp(
- server=read_config().get("Murfey", "server", fallback=""),
- register_client=False,
+ # Construct URL for the HTTPS log handler
+ client_config = dict(read_config()["Murfey"])
+ murfey_server_url = client_config["server"].rstrip("/")
+ logger_url = (
+ f"{murfey_server_url}{url_path_for('api.logging.router', 'forward_logs')}"
)
# Forward DEBUG levels logs and above from Murfey to the backend
- murfey_ws_handler = CustomHandler(ws.send)
- murfey_ws_handler.setLevel(logging.DEBUG)
- logging.getLogger("murfey").addHandler(murfey_ws_handler)
+ murfey_https_handler = HTTPSHandler(endpoint_url=logger_url)
+ murfey_https_handler.setLevel(logging.DEBUG)
+ logging.getLogger("murfey").addHandler(murfey_https_handler)
# Forward only INFO level logs and above for other packages
- other_ws_handler = CustomHandler(ws.send)
- other_ws_handler.setLevel(logging.INFO)
- logging.getLogger("fastapi").addHandler(other_ws_handler)
- logging.getLogger("uvicorn").addHandler(other_ws_handler)
+ other_https_handler = HTTPSHandler(endpoint_url=logger_url)
+ other_https_handler.setLevel(logging.INFO)
+ logging.getLogger("fastapi").addHandler(other_https_handler)
+ logging.getLogger("uvicorn").addHandler(other_https_handler)
logger.info(
f"Starting Murfey server version {murfey.__version__}, listening on {args.host}:{args.port}"
diff --git a/src/murfey/server/api/hub.py b/src/murfey/server/api/hub.py
index f17db4a1c..c0a74d05a 100644
--- a/src/murfey/server/api/hub.py
+++ b/src/murfey/server/api/hub.py
@@ -7,7 +7,7 @@
from murfey.util.config import get_machine_config
-logger = getLogger("murfey.api.hub")
+logger = getLogger("murfey.server.api.hub")
config = get_machine_config()
diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py
index 9069535ce..d71e412a2 100644
--- a/src/murfey/server/api/instrument.py
+++ b/src/murfey/server/api/instrument.py
@@ -36,7 +36,7 @@
tags=["Instrument Server"],
)
-log = logging.getLogger("murfey.server.instrument")
+log = logging.getLogger("murfey.server.api.instrument")
lock = asyncio.Lock()
diff --git a/src/murfey/server/api/logging.py b/src/murfey/server/api/logging.py
new file mode 100644
index 000000000..fde075d72
--- /dev/null
+++ b/src/murfey/server/api/logging.py
@@ -0,0 +1,35 @@
+import json
+import logging
+from datetime import datetime
+from typing import Any
+
+from fastapi import APIRouter, Request
+
+logger = logging.getLogger("murfey.server.api.logging")
+
+router = APIRouter(
+ prefix="/logging",
+ tags=["Logging"],
+)
+
+
+@router.post("/logs")
+async def forward_logs(request: Request):
+ """
+ Receives a list of stringified JSON log records from the instrument server,
+ unpacks them, and forwards them through the handlers set up on the backend.
+ """
+
+ data: list[str] = await request.json()
+ for line in data:
+ log_data: dict[str, Any] = json.loads(line)
+ logger_name = log_data["name"]
+ log_data.pop("msecs", None)
+ log_data.pop("relativeCreated", None)
+ client_timestamp = log_data.pop("created", 0)
+ if client_timestamp:
+ log_data["client_time"] = datetime.fromtimestamp(
+ client_timestamp
+ ).isoformat()
+ log_data["client_host"] = request.client.host if request.client else None
+ logging.getLogger(logger_name).handle(logging.makeLogRecord(log_data))
diff --git a/src/murfey/server/api/websocket.py b/src/murfey/server/api/websocket.py
index b0a012acf..bbabf68b3 100644
--- a/src/murfey/server/api/websocket.py
+++ b/src/murfey/server/api/websocket.py
@@ -1,6 +1,5 @@
from __future__ import annotations
-import asyncio
import json
import logging
from datetime import datetime
@@ -9,7 +8,6 @@
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from sqlmodel import Session, select
-import murfey.server.prometheus as prom
from murfey.server.murfey_db import get_murfey_db_session
from murfey.util import sanitise
from murfey.util.db import ClientEnvironment
@@ -17,7 +15,7 @@
T = TypeVar("T")
ws = APIRouter(prefix="/ws", tags=["Websocket"])
-log = logging.getLogger("murfey.server.websocket")
+log = logging.getLogger("murfey.server.api.websocket")
class ConnectionManager:
@@ -69,31 +67,6 @@ async def broadcast(self, message: str):
manager = ConnectionManager()
-@ws.websocket("/test/{client_id}")
-async def websocket_endpoint(websocket: WebSocket, client_id: int):
- await manager.connect(websocket, client_id)
- await manager.broadcast(f"Client {client_id} joined")
- try:
- while True:
- data = await websocket.receive_text()
- try:
- json_data: dict = json.loads(data)
- if json_data["type"] == "log": # and isinstance(json_data, dict)
- json_data.pop("type")
- await forward_log(json_data, websocket)
- except Exception:
- await manager.broadcast(f"Client #{client_id} sent message {data}")
- except WebSocketDisconnect:
- log.info(f"Disconnecting Client {int(sanitise(str(client_id)))}")
- murfey_db = next(get_murfey_db_session())
- client_env = murfey_db.exec(
- select(ClientEnvironment).where(ClientEnvironment.client_id == client_id)
- ).one()
- prom.monitoring_switch.labels(visit=client_env.visit).set(0)
- manager.disconnect(client_id)
- await manager.broadcast(f"Client #{client_id} disconnected")
-
-
@ws.websocket("/connect/{client_id}")
async def websocket_connection_endpoint(
websocket: WebSocket,
@@ -120,17 +93,6 @@ async def websocket_connection_endpoint(
await manager.broadcast(f"Client #{client_id} disconnected")
-async def check_connections(active_connections: list[WebSocket]):
- log.info("Checking connections")
- for connection in active_connections:
- log.info("Checking response")
- try:
- await asyncio.wait_for(connection.receive(), timeout=10)
- except asyncio.TimeoutError:
- log.info(f"Disconnecting Client {connection[0]}")
- manager.disconnect(connection[0], connection[1])
-
-
async def forward_log(logrecord: dict[str, Any], websocket: WebSocket):
record_name = logrecord["name"]
logrecord.pop("msecs", None)
@@ -142,26 +104,8 @@ async def forward_log(logrecord: dict[str, Any], websocket: WebSocket):
logging.getLogger(record_name).handle(logging.makeLogRecord(logrecord))
-@ws.delete("/test/{client_id}")
-async def close_ws_connection(client_id: int):
- murfey_db: Session = next(get_murfey_db_session())
- client_env = murfey_db.exec(
- select(ClientEnvironment).where(ClientEnvironment.client_id == client_id)
- ).one()
- client_env.connected = False
- visit_name = client_env.visit
- murfey_db.add(client_env)
- murfey_db.commit()
- murfey_db.close()
- client_id_str = str(client_id).replace("\r\n", "").replace("\n", "")
- log.info(f"Disconnecting {client_id_str}")
- manager.disconnect(client_id)
- prom.monitoring_switch.labels(visit=visit_name).set(0)
- await manager.broadcast(f"Client #{client_id} disconnected")
-
-
@ws.delete("/connect/{client_id}")
-async def close_unrecorded_ws_connection(client_id: int | str):
+async def close_websocket_connection(client_id: int | str):
client_id_str = str(client_id).replace("\r\n", "").replace("\n", "")
log.info(f"Disconnecting {client_id_str}")
manager.disconnect(client_id)
diff --git a/src/murfey/server/main.py b/src/murfey/server/main.py
index a65896aa0..613546bfd 100644
--- a/src/murfey/server/main.py
+++ b/src/murfey/server/main.py
@@ -18,6 +18,7 @@
import murfey.server.api.file_io_instrument
import murfey.server.api.hub
import murfey.server.api.instrument
+import murfey.server.api.logging
import murfey.server.api.mag_table
import murfey.server.api.processing_parameters
import murfey.server.api.prometheus
@@ -77,6 +78,8 @@ class Settings(BaseSettings):
app.include_router(murfey.server.api.instrument.router)
+app.include_router(murfey.server.api.logging.router)
+
app.include_router(murfey.server.api.mag_table.router)
app.include_router(murfey.server.api.session_control.router)
diff --git a/src/murfey/server/run.py b/src/murfey/server/run.py
index 26eb75bc0..aedbd660f 100644
--- a/src/murfey/server/run.py
+++ b/src/murfey/server/run.py
@@ -16,8 +16,8 @@
import murfey.server
from murfey.server.feedback import feedback_listen
from murfey.server.ispyb import TransportManager
-from murfey.util import LogFilter
from murfey.util.config import get_microscope, get_security_config
+from murfey.util.logging import LogFilter
logger = logging.getLogger("murfey.server.run")
diff --git a/src/murfey/templates/bootstrap.html b/src/murfey/templates/bootstrap.html
index 2dab84d6f..fcb7f05f7 100644
--- a/src/murfey/templates/bootstrap.html
+++ b/src/murfey/templates/bootstrap.html
@@ -2,7 +2,7 @@
%} {% block content %}
Bootstrapping Instructions
1. Setting Up a POSIX Environment
-Installing MSYS2
+A. Installing MSYS2
MSYS2 is a POSIX environment which provides extensive compiler support for the
more modern programming languages used by Murfey's package dependencies.
@@ -14,9 +14,14 @@
Installing MSYS2
mirror, then run it using the default settings. This will install MSYS2 to
- C:\msys64.
+
+ C:\msys64 .
-Setting Up the MSYS2 Package Manager (If Network-Restricted)
+B. Setting Up MSYS2
+i. Setting Up the MSYS2 Package Manager (If Network-Restricted)
By default, MSYS2 comes with preset lists of mirrors and servers that it
installs its packages from. On a network-restricted PC, these will need to be
@@ -36,7 +41,7 @@
Setting Up the MSYS2 Package Manager (If Network-Restricted)
>
C:\msys64\etc\pacman.d
-Installing Dependencies
+ii. Installing Dependencies
MSYS2 comes with multiple environments, but UCRT64 is the most modern one. In
order for the Murfey client to be able to install and run its dependencies
@@ -69,7 +74,7 @@
Installing Dependencies
>pacman -Ss <package-name>
-Configuring the Rust Package Manager (If Network-Restricted)
+iii. Configuring the Rust Package Manager (If Network-Restricted)
Many newer Python packages now have dependencies written in Rust that allow
them to operate more efficiently. MSYS2 supports the compilation and
@@ -132,30 +137,6 @@
Configuring the Rust Package Manager (If Network-Restricted)
instead.
-Running MSYS2 Through Command Prompt
-
- In order to run Murfey via the terminal, MSYS2 will have to be run through
- Window's Command Prompt terminal, as there is an ongoing bug with MSYS2's
- pre-packaged terminal that prevents mouse interaction with interactive apps in
- the terminal.
-
-
- To do so, simply right-click on your desktop and navigate to
- New > Shortcut. When prompted for the location of the item, enter
- the following into the text box:
-
-
- cmd.exe /k "C:\msys64\msys2_shell.cmd -defterm -no-start -ucrt64 -shell bash"
-
-
- After naming the shortcut, click Finish to create the shortcut. This will run
- a UCRT64 instance of MSYS2 through the Command Prompt terminal that starts you
- off in MSYS2's default home directory. You can proceed to customise the
- shortcut icon to taste.
-
-
2. Setting Up Python
Once Python and
@@ -180,25 +161,11 @@
A. (Optional) Setting Up a Virtual Environment
B. Installing Murfey
You can install Murfey in the Python environment (the base one or a virtual
- environment) in either the Cygwin or UCRT64 terminal using the following
- commands:
+ environment) in the UCRT64 terminal using the following commands:
- $ pip install murfey[client] --index-url {{ request.url.scheme }}://{{ netloc }}{{ proxy_path }}/pypi/index --trusted-host {{ netloc }}
+ $ pip install murfey --index-url {{ request.url.scheme }}://{{ netloc }}{{ proxy_path }}/pypi/index --trusted-host {{ netloc }}
-
- If you wish to install the client-side dependencies needed to run Murfey via
- the web UI, replace
- murfey[client]
- with
- murfey[client,instrument-server].
-
{% endblock %}
diff --git a/src/murfey/util/__init__.py b/src/murfey/util/__init__.py
index 98370e5d0..fc3e878d2 100644
--- a/src/murfey/util/__init__.py
+++ b/src/murfey/util/__init__.py
@@ -96,42 +96,6 @@ def wait(self):
self.thread.join()
-class LogFilter(logging.Filter):
- """A filter to limit messages going to Graylog"""
-
- def __repr__(self):
- return ""
-
- def __init__(self):
- super().__init__()
- self._filter_levels = {
- "murfey": logging.DEBUG,
- "ispyb": logging.DEBUG,
- "zocalo": logging.DEBUG,
- "uvicorn": logging.INFO,
- "fastapi": logging.INFO,
- "starlette": logging.INFO,
- "sqlalchemy": logging.INFO,
- }
-
- @staticmethod
- def install() -> LogFilter:
- logfilter = LogFilter()
- root_logger = logging.getLogger()
- for handler in root_logger.handlers:
- handler.addFilter(logfilter)
- return logfilter
-
- def filter(self, record: logging.LogRecord) -> bool:
- logger_name = record.name
- while True:
- if logger_name in self._filter_levels:
- return record.levelno >= self._filter_levels[logger_name]
- if "." not in logger_name:
- return False
- logger_name = logger_name.rsplit(".", maxsplit=1)[0]
-
-
def safe_run(
func: Callable,
args: list | tuple = [],
diff --git a/src/murfey/util/logging.py b/src/murfey/util/logging.py
new file mode 100644
index 000000000..acab9c406
--- /dev/null
+++ b/src/murfey/util/logging.py
@@ -0,0 +1,155 @@
+from __future__ import annotations
+
+import json
+import logging
+import threading
+import time
+from queue import Empty, Queue
+
+import requests
+
+
+class LogFilter(logging.Filter):
+ """A filter to limit messages going to Graylog"""
+
+ def __repr__(self):
+ return ""
+
+ def __init__(self):
+ super().__init__()
+ self._filter_levels = {
+ "murfey": logging.DEBUG,
+ "ispyb": logging.DEBUG,
+ "zocalo": logging.DEBUG,
+ "uvicorn": logging.INFO,
+ "fastapi": logging.INFO,
+ "starlette": logging.INFO,
+ "sqlalchemy": logging.INFO,
+ }
+
+ @staticmethod
+ def install() -> LogFilter:
+ logfilter = LogFilter()
+ root_logger = logging.getLogger()
+ for handler in root_logger.handlers:
+ handler.addFilter(logfilter)
+ return logfilter
+
+ def filter(self, record: logging.LogRecord) -> bool:
+ logger_name = record.name
+ while True:
+ if logger_name in self._filter_levels:
+ return record.levelno >= self._filter_levels[logger_name]
+ if "." not in logger_name:
+ return False
+ logger_name = logger_name.rsplit(".", maxsplit=1)[0]
+
+
+class HTTPSHandler(logging.Handler):
+ """
+ A log handler collects log messages and posts them in batches to the backend
+ FastAPI server using HTTPS POST.
+ """
+
+ def __init__(
+ self,
+ endpoint_url: str,
+ min_batch: int = 5,
+ max_batch: int = 50,
+ min_interval: float = 0.5,
+ max_interval: float = 2.0,
+ max_retry: int = 5,
+ timeout: int = 3,
+ token: str = "",
+ ):
+ super().__init__()
+ self.endpoint_url = endpoint_url
+ self.queue: Queue = Queue()
+ self._stop_event = threading.Event()
+ self.min_batch = min_batch
+ self.max_batch = max_batch
+ self.min_interval = min_interval
+ self.max_interval = max_interval
+ self.max_retry = max_retry
+ self.timeout = timeout
+ self.token = token
+
+ self.log_times: list[
+ float
+ ] = [] # Timestamps of recent logs for rate estimation
+ self.thread = threading.Thread(target=self._worker, daemon=True)
+ self.thread.start()
+
+ def emit(self, record: logging.LogRecord):
+ """
+ Formats the log and puts it on a queue for submission to the backend server
+ """
+ try:
+ log_entry = self.format_record(record)
+ self.queue.put(log_entry)
+ self.log_times.append(time.time())
+ except Exception:
+ self.handleError(record)
+
+ def format_record(self, record: logging.LogRecord):
+ """
+ Packages the log record as a JSON-formatted string
+ """
+ self.format(record)
+ log_data = record.__dict__.copy()
+ log_data["type"] = "log"
+ return json.dumps(log_data)
+
+ def _worker(self):
+ """
+ The worker function when the handler is run as a thread.
+ """
+
+ batch: list[str] = []
+ last_flush = time.time()
+
+ while not self._stop_event.is_set():
+ try:
+ log_entry = self.queue.get(timeout=0.05)
+ batch.append(log_entry)
+ # If the queue is empty, check back again
+ except Empty:
+ time.sleep(1)
+ continue
+
+ # Calculate logging rate based on past second
+ now = time.time()
+ self.log_times = [t for t in self.log_times if now - t <= 1.0]
+ log_rate = len(self.log_times)
+
+ # Adjust batch size and flush interval
+ batch_size = min(max(self.min_batch, log_rate), self.max_batch)
+ flush_interval = max(
+ self.min_interval, min(self.max_interval, 1 / max(log_rate, 1))
+ )
+
+ # Flush if batch is ready
+ if batch and (
+ len(batch) >= batch_size or now - last_flush >= flush_interval
+ ):
+ self._send_batch(batch)
+ batch = []
+ last_flush = now
+
+ # Flush remaining logs on shutdown
+ if batch:
+ self._send_batch(batch)
+
+ def _send_batch(self, batch: list[str]):
+ for attempt in range(0, self.max_retry):
+ try:
+ response = requests.post(self.endpoint_url, json=batch, timeout=5)
+ if response.status_code == 200:
+ return
+ except requests.RequestException:
+ time.sleep(2 ** (attempt + 1) * 0.1) # Exponential backoff
+
+ def close(self):
+ self._stop_event.set()
+ self.thread.join()
+ super().close()
diff --git a/src/murfey/util/route_manifest.yaml b/src/murfey/util/route_manifest.yaml
index 2f8cd36a3..6fdeaf47f 100644
--- a/src/murfey/util/route_manifest.yaml
+++ b/src/murfey/util/route_manifest.yaml
@@ -692,6 +692,12 @@ murfey.server.api.instrument.router:
type: int
methods:
- GET
+murfey.server.api.logging.router:
+ - path: /logging/logs
+ function: forward_logs
+ path_params: []
+ methods:
+ - POST
murfey.server.api.mag_table.router:
- path: /mag_table/mag_table/
function: get_mag_table
@@ -1249,27 +1255,14 @@ murfey.server.api.session_info.tomo_router:
methods:
- GET
murfey.server.api.websocket.ws:
- - path: /ws/test/{client_id}
- function: websocket_endpoint
- path_params:
- - name: client_id
- type: int
- methods: []
- path: /ws/connect/{client_id}
function: websocket_connection_endpoint
path_params:
- name: client_id
type: int | str
methods: []
- - path: /ws/test/{client_id}
- function: close_ws_connection
- path_params:
- - name: client_id
- type: int
- methods:
- - DELETE
- path: /ws/connect/{client_id}
- function: close_unrecorded_ws_connection
+ function: close_websocket_connection
path_params:
- name: client_id
type: int | str
diff --git a/tests/instrument_server/test_init.py b/tests/instrument_server/test_init.py
index e575ecea0..50015bae5 100644
--- a/tests/instrument_server/test_init.py
+++ b/tests/instrument_server/test_init.py
@@ -1,5 +1,7 @@
+import logging
import sys
from typing import Optional
+from unittest.mock import MagicMock
from urllib.parse import urlparse
import pytest
@@ -14,6 +16,7 @@
from murfey.instrument_server import check_for_updates, start_instrument_server
from murfey.server.api.bootstrap import pypi as pypi_router, version as version_router
from murfey.util.api import url_path_for
+from murfey.util.logging import HTTPSHandler
# Set up a test router with only the essential endpoints
app = FastAPI()
@@ -132,20 +135,33 @@ def test_check_for_updates(
@pytest.mark.parametrize("test_params", start_instrument_server_test_matrix)
def test_start_instrument_server(
- mocker: MockerFixture, test_params: tuple[Optional[str], Optional[int]]
+ mocker: MockerFixture,
+ mock_client_configuration,
+ test_params: tuple[Optional[str], Optional[int]],
):
# Unpack test params
host, port = test_params
+ # Patch the 'read_config' function
+ _ = mocker.patch(
+ "murfey.util.client.read_config", return_value=mock_client_configuration
+ )
+
+ # Mock the HTTPSHandler (test it separately in a unit test)
+ mock_https_handler_instance = MagicMock()
+ mock_https_handler_instance.level = logging.INFO
+ mock_https_handler_instance.setLevel.return_value = None
+ mock_https_handler = mocker.patch(
+ "murfey.util.logging.HTTPSHandler",
+ spec=HTTPSHandler,
+ )
+ mock_https_handler.return_value = mock_https_handler_instance
+
# Patch the Uvicorn Server instance
mock_server = mocker.patch("uvicorn.Server")
# Disable 'run'; we just want to confirm it's called correctly
mock_server.run.return_value = lambda: None
- # Patch the websocket instance
- mock_wsapp = mocker.patch("murfey.client.websocket.WSApp")
- mock_wsapp.return_value = mocker.Mock() # Disable functionality
-
# Construct the expected Uvicorn Config object and save it as a dict
expected_config = vars(
uvicorn.Config(
diff --git a/tests/server/api/test_logging_api.py b/tests/server/api/test_logging_api.py
new file mode 100644
index 000000000..0d46012cb
--- /dev/null
+++ b/tests/server/api/test_logging_api.py
@@ -0,0 +1,89 @@
+import json
+import logging
+import time
+from datetime import datetime
+from typing import Any
+from unittest.mock import AsyncMock, MagicMock
+
+import pytest
+from pytest_mock import MockerFixture
+
+import murfey
+from murfey.server.api.logging import forward_logs
+
+
+@pytest.mark.asyncio
+async def test_forward_logs(
+ mocker: MockerFixture,
+):
+ # Create example log messages
+ message_list = [
+ json.dumps(
+ {
+ "name": f"murfey.{module_name}",
+ "msg": "Starting Murfey server version {murfey.__version__}, listening on 0.0.0.0:8000",
+ "args": [],
+ "levelname": levelname,
+ "levelno": levelno,
+ "pathname": f"{murfey.__file__}/{module_name}/__init__.py",
+ "filename": "__init__.py",
+ "module": "__init__",
+ "exc_info": None,
+ "exc_text": None,
+ "stack_info": None,
+ "lineno": 76,
+ "funcName": f"start_{module_name}",
+ "created": time.time(),
+ "msecs": 930.0,
+ "relativeCreated": 1379.8329830169678,
+ "thread": time.time_ns(),
+ "threadName": "MainThread",
+ "processName": "MainProcess",
+ "process": time.time_ns(),
+ "message": f"Starting Murfey server version {murfey.__version__}, listening on 0.0.0.0:8000",
+ "type": "log",
+ }
+ )
+ for module_name, levelname, levelno in (
+ ("module_1", "DEBUG", logging.DEBUG),
+ ("module_2", "INFO", logging.INFO),
+ ("module_3", "WARNING", logging.WARNING),
+ ("module_4", "ERROR", logging.ERROR),
+ )
+ ]
+
+ # Create a mock request to pass to the function
+ mock_request = MagicMock()
+ mock_request.json = AsyncMock(return_value=message_list)
+
+ # Mock the logging module
+ mock_logging = mocker.patch("murfey.server.api.logging.logging")
+
+ # Mock the 'getLogger()' and 'handle()' functions
+ mock_logger = MagicMock()
+ mock_logger.handle.return_value = None
+ mock_logging.getLogger.return_value = mock_logger
+
+ # Run the function and check that the results are as expected
+ await forward_logs(mock_request)
+
+ # Check that the correct logger name was called.
+ for i, message in enumerate(message_list):
+ # Process the message as in the actual function
+ log_data: dict[str, Any] = json.loads(message)
+ logger_name = log_data["name"]
+ log_data.pop("msecs", None)
+ log_data.pop("relativeCreated", None)
+ client_timestamp = log_data.pop("created", 0)
+ if client_timestamp:
+ log_data["client_time"] = datetime.fromtimestamp(
+ client_timestamp
+ ).isoformat()
+ log_data["client_host"] = None # No host, as function is being tested directly
+
+ # Check that messages are unpacked and handled in sequence
+ mock_logging.getLogger.call_args_list[i][0][0] == logger_name
+ mock_logger.handle.call_args_list[i][0][0] == logging.makeLogRecord(log_data)
+
+ # Check that 'handle' was called for each message
+ assert mock_logger.handle.call_count == len(message_list)
diff --git a/tests/util/test_logging_util.py b/tests/util/test_logging_util.py
new file mode 100644
index 000000000..596ae4118
--- /dev/null
+++ b/tests/util/test_logging_util.py
@@ -0,0 +1,71 @@
+import time
+from unittest import mock
+from unittest.mock import MagicMock
+
+import pytest
+from fastapi import Response
+from pytest_mock import MockerFixture
+
+from murfey.util.logging import HTTPSHandler
+
+https_handler_test_matrix = (
+ # Num messages | Status code
+ (10, 200),
+ (10, 404),
+)
+
+
+@pytest.mark.parametrize("test_params", https_handler_test_matrix)
+def test_https_handler(
+ mocker: MockerFixture,
+ mock_client_configuration,
+ test_params: tuple[int, int],
+):
+ # Unpack test params
+ num_messages, status_code = test_params
+
+ # Mock the imported 'requests' module and the HTTPX response
+ mock_response = MagicMock(spec=Response)
+ mock_response.status_code = status_code
+ mock_requests = mocker.patch("murfey.util.logging.requests")
+ mock_requests.post.return_value = mock_response
+
+ # Import logger and set up a logger object
+ from logging import getLogger
+
+ # Initialise the logger with URL from mock client config
+ client_config = dict(mock_client_configuration["Murfey"])
+ server_url = client_config["server"]
+ https_handler = HTTPSHandler(
+ endpoint_url=server_url,
+ min_batch=5,
+ max_batch=10,
+ min_interval=0.5,
+ max_interval=1.0,
+ max_retry=1,
+ )
+
+ logger = getLogger("tests.util.test_logging")
+ logger.setLevel(10)
+ logger.addHandler(https_handler)
+ for i in range(num_messages):
+ # Test all the logging levels
+ if i % 4 == 0:
+ logger.debug("This is a debug log")
+ if i % 4 == 1:
+ logger.info("This is an info log")
+ if i % 4 == 2:
+ logger.warning("This is a warning log")
+ if i % 4 == 3:
+ logger.error("This is an error log")
+
+ # Let it run in the background before checking for the expected calls
+ time.sleep(1)
+ mock_requests.post.assert_called_with(
+ server_url,
+ json=mock.ANY,
+ timeout=5,
+ )
+
+ # Close the handler thread
+ https_handler.close()