Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.13
21 changes: 21 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[project]
name = "parallelization-benchmarks"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
authors = [
{ name = "Egor Marin", email = "me@marinegor.dev" }
]
requires-python = ">=3.13"
dependencies = [
"mdanalysis[analysis,parallel]==2.10.0",
]

[build-system]
requires = ["uv_build>=0.8.15,<0.9.0"]
build-backend = "uv_build"

[dependency-groups]
k8s = [
"dask-kubernetes>=2025.7.0",
]
2 changes: 2 additions & 0 deletions src/parallelization_benchmarks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def hello() -> str:
return "Hello from parallelization-benchmarks!"
77 changes: 77 additions & 0 deletions src/parallelization_benchmarks/k8s.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import MDAnalysis as mda
from dask.distributed import Client, get_client
from MDAnalysis.analysis.backends import BackendBase
from MDAnalysis.analysis.dssp import DSSP
from dask_kubernetes.operator import KubeCluster
from dataclasses import dataclass
from dask.delayed import delayed

from typing import NamedTuple, Literal


class Resources(NamedTuple):
memory: str
cpu: str

def as_dict(self):
return {'memory': self.memory, 'cpu': self.cpu}

@dataclass(frozen=True)
class Config:
"""
Example:
--------

```python
config = {
"name": "dask-cluster-2",
"image": "<redacted>",
"n_workers": 7,
"resources":{"requests": {"memory": "2Gi", "cpu": "800m"},
"limits": {"memory": "4Gi", "cpu": "1"}}
}
```
"""

name: str
image: str
n_workers: int
resources: dict[Literal['requests', 'limits'], Resources]

def as_dict(self):
resources = {k:v.as_dict() for k, v in self.resources.items()}
return {'name': self.name, 'image': self.image, 'n_workers': self.n_workers, 'resources': resources}


def start_cluster(config: Config) -> KubeCluster:
return KubeCluster(**config.as_dict())

def get_client_for(cluster: KubeCluster) -> Client:
return Client(cluster)


class DistributedBackend(BackendBase):
def __init__(self, n_workers):
super().__init__(n_workers)

def assign_client(self, client):
self.client = client

def apply(self, func, computations):
return self.client.compute([delayed(func)(c) for c in computations], sync=True)


if __name__ == '__main__':
u = mda.Universe("YiiP_lipids.gro.gz", "YiiP_lipids.xtc")
backend = DistributedBackend(n_workers=5)
config = ...
cluster = start_cluster(config)
client = get_client(cluster)
backend.assign_client(client)

# test computation
tasks = [delayed(lambda x: x + 1)(i) for i in range(11)]
print(f"{client.compute(tasks, sync=True)=}")

# actual MDAnalysis computation
print(DSSP(u).run(backend=backend, unsupported_backend=True).results)
Loading