Changes from all commits (26 commits)
f427898 - sketch out sync codecs + threadpool (d-v-b, Feb 18, 2026)
dbdc3d4 - Merge branch 'main' into perf/faster-codecs (d-v-b, Feb 18, 2026)
65d1230 - fix perf regressions (d-v-b, Feb 19, 2026)
e24fe7e - Merge branch 'perf/faster-codecs' of https://github.com/d-v-b/zarr-py… (d-v-b, Feb 19, 2026)
f979eaa - add partial encode / decode (d-v-b, Feb 19, 2026)
a934899 - add sync hotpath (d-v-b, Feb 19, 2026)
b53ac3e - add comments and documentation (d-v-b, Feb 19, 2026)
73ac845 - refactor sharding to allow sync (d-v-b, Feb 19, 2026)
aeecda8 - fix array spec propagation (d-v-b, Feb 19, 2026)
69172fb - fix countingdict tests (d-v-b, Feb 19, 2026)
28d0def - update design doc (d-v-b, Feb 19, 2026)
f8e39e6 - dynamic pool allocation (d-v-b, Feb 19, 2026)
b388911 - default to 1 itemsize for data types that don't declare it (d-v-b, Feb 19, 2026)
7e29ef3 - Merge branch 'main' into perf/faster-codecs (d-v-b, Feb 19, 2026)
00dde0b - Merge branch 'main' into perf/faster-codecs (d-v-b, Feb 19, 2026)
9d77ca5 - remove extra codec pipeline (d-v-b, Feb 19, 2026)
88a4875 - remove garbage (d-v-b, Feb 19, 2026)
284e5e2 - lint (d-v-b, Feb 19, 2026)
b1b876a - use protocols for new sync behavior (d-v-b, Feb 20, 2026)
6996284 - remove batch size parameter; add changelog entry (d-v-b, Feb 20, 2026)
204dda1 - prune dead code, make protocols useful (d-v-b, Feb 20, 2026)
e9db616 - restore batch size but it's only there for warnings (d-v-b, Feb 20, 2026)
01e1f73 - fix type hints, prevent thread pool leakage, make codec pipeline intr… (d-v-b, Feb 20, 2026)
fbde3af - Merge branch 'main' of https://github.com/zarr-developers/zarr-python… (d-v-b, Feb 20, 2026)
11534b0 - restore old comments / docstrings (d-v-b, Feb 20, 2026)
b40d53a - simplify threadpool management (d-v-b, Feb 20, 2026)
11 changes: 11 additions & 0 deletions changes/3715.misc.md
@@ -0,0 +1,11 @@
Added several performance optimizations to chunk encoding and decoding. Low-latency stores that do not benefit from
`async` operations can now implement synchronous IO methods, which are used when available during chunk processing.
Similarly, codecs can implement a synchronous API, which is likewise preferred when available.
These changes remove unnecessary interactions with the event loop.

The synchronous chunk processing path optionally uses a thread pool to parallelize codec work across chunks.
The pool is skipped for single-chunk operations and for pipelines that only contain cheap codecs (e.g. endian
swap, transpose, checksum).

Use of the thread pool can be disabled in the global configuration, which also exposes settings for the
minimum and maximum number of threads.
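
As a sketch of what those knobs might look like (the key names below are assumptions for illustration, not confirmed by this changelog entry; check `zarr.config.pprint()` for the names this change actually ships):

```python
import zarr

# Sketch only: these key names are hypothetical. zarr uses donfig for its
# global configuration, so settings are applied with zarr.config.set().
zarr.config.set(
    {
        "threading.enabled": False,  # disable the codec thread pool entirely
        "threading.min_workers": 1,  # assumed lower bound on pool size
        "threading.max_workers": 8,  # assumed upper bound on pool size
    }
)
```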
69 changes: 68 additions & 1 deletion src/zarr/abc/codec.py
@@ -2,7 +2,7 @@

from abc import abstractmethod
from collections.abc import Mapping
from typing import TYPE_CHECKING, Generic, TypeGuard, TypeVar
from typing import TYPE_CHECKING, Generic, Protocol, TypeGuard, TypeVar, runtime_checkable

from typing_extensions import ReadOnly, TypedDict

@@ -32,6 +32,7 @@
"CodecInput",
"CodecOutput",
"CodecPipeline",
"SupportsSyncCodec",
]

CodecInput = TypeVar("CodecInput", bound=NDBuffer | Buffer)
@@ -59,6 +60,19 @@ def _check_codecjson_v2(data: object) -> TypeGuard[CodecJSON_V2[str]]:
"""The widest type of JSON-like input that could specify a codec."""


@runtime_checkable
class SupportsSyncCodec(Protocol):
"""Protocol for codecs that support synchronous encode/decode."""

def _decode_sync(
self, chunk_data: NDBuffer | Buffer, chunk_spec: ArraySpec
) -> NDBuffer | Buffer: ...

def _encode_sync(
self, chunk_data: NDBuffer | Buffer, chunk_spec: ArraySpec
) -> NDBuffer | Buffer | None: ...


class BaseCodec(Metadata, Generic[CodecInput, CodecOutput]):
"""Generic base class for codecs.

@@ -459,6 +473,59 @@ async def write(
"""
...

# -------------------------------------------------------------------
# Fully synchronous read/write (opt-in)
#
# When a CodecPipeline subclass can run the entire read/write path
# (store IO + codec compute + buffer scatter) without touching the
# event loop, it overrides these methods and sets supports_sync_io
# to True. This lets Array selection methods bypass sync() entirely.
#
# The default implementations raise NotImplementedError.
# BatchedCodecPipeline overrides these when all codecs support sync.
# -------------------------------------------------------------------

@property
def supports_sync_io(self) -> bool:
"""Whether this pipeline can run read/write entirely on the calling thread.

True when:
- All codecs implement ``SupportsSyncCodec``
- The pipeline's read_sync/write_sync methods are implemented

Checked by ``Array._can_use_sync_path()`` to decide whether to bypass
the ``sync()`` event-loop bridge.
"""
return False

def read_sync(
self,
batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]],
out: NDBuffer,
drop_axes: tuple[int, ...] = (),
) -> None:
"""Synchronous read: fetch bytes from store, decode, scatter into out.

Runs entirely on the calling thread. Only available when
``supports_sync_io`` is True. Called by ``_get_selection_sync`` in
``array.py`` when the sync bypass is active.
"""
raise NotImplementedError

def write_sync(
self,
batch_info: Iterable[tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple, bool]],
value: NDBuffer,
drop_axes: tuple[int, ...] = (),
) -> None:
"""Synchronous write: gather from value, encode, persist to store.

Runs entirely on the calling thread. Only available when
``supports_sync_io`` is True. Called by ``_set_selection_sync`` in
``array.py`` when the sync bypass is active.
"""
raise NotImplementedError


async def _batching_helper(
func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]],
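To illustrate the protocol half of this design, here is a minimal sketch: a hypothetical `IdentityCodec` that satisfies `SupportsSyncCodec` structurally. Because the protocol is `runtime_checkable`, the pipeline can detect sync support with a plain `isinstance` check, with no inheritance or registration. The import paths for `ArraySpec` and the buffer types are assumed from zarr-python's current layout.

```python
from zarr.abc.codec import SupportsSyncCodec
from zarr.core.array_spec import ArraySpec
from zarr.core.buffer import Buffer, NDBuffer


class IdentityCodec:
    """Hypothetical codec: passes chunks through unchanged."""

    def _decode_sync(
        self, chunk_data: NDBuffer | Buffer, chunk_spec: ArraySpec
    ) -> NDBuffer | Buffer:
        return chunk_data

    def _encode_sync(
        self, chunk_data: NDBuffer | Buffer, chunk_spec: ArraySpec
    ) -> NDBuffer | Buffer | None:
        return chunk_data


# Structural check: runtime_checkable protocols match on method names,
# so no base class or registration is needed.
assert isinstance(IdentityCodec(), SupportsSyncCodec)
```

Note that `isinstance` against a runtime-checkable protocol only verifies that the method names exist, not their signatures, which is why the pipeline still gates the sync path behind `supports_sync_io`.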
27 changes: 26 additions & 1 deletion src/zarr/abc/store.py
@@ -16,7 +16,14 @@

from zarr.core.buffer import Buffer, BufferPrototype

__all__ = ["ByteGetter", "ByteSetter", "Store", "set_or_delete"]
__all__ = [
"ByteGetter",
"ByteSetter",
"Store",
"SyncByteGetter",
"SyncByteSetter",
"set_or_delete",
]


@dataclass
@@ -700,6 +707,24 @@ async def delete(self) -> None: ...
async def set_if_not_exists(self, default: Buffer) -> None: ...


@runtime_checkable
class SyncByteGetter(Protocol):
"""Protocol for stores that support synchronous byte reads."""

def get_sync(
self, prototype: BufferPrototype, byte_range: ByteRequest | None = None
) -> Buffer | None: ...


@runtime_checkable
class SyncByteSetter(SyncByteGetter, Protocol):
"""Protocol for stores that support synchronous byte reads, writes, and deletes."""

def set_sync(self, value: Buffer) -> None: ...

def delete_sync(self) -> None: ...


async def set_or_delete(byte_setter: ByteSetter, value: Buffer | None) -> None:
"""Set or delete a value in a byte setter

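The store-side protocols work the same way. A minimal sketch, assuming `ByteRequest` is importable from `zarr.abc.store` as the signatures above suggest; the `InMemoryBytes` class is hypothetical:

```python
from zarr.abc.store import ByteRequest, SyncByteSetter
from zarr.core.buffer import Buffer, BufferPrototype


class InMemoryBytes:
    """Hypothetical handle for a single key held in memory."""

    def __init__(self) -> None:
        self._value: Buffer | None = None

    def get_sync(
        self, prototype: BufferPrototype, byte_range: ByteRequest | None = None
    ) -> Buffer | None:
        # Sketch: a real implementation would honor byte_range.
        return self._value

    def set_sync(self, value: Buffer) -> None:
        self._value = value

    def delete_sync(self) -> None:
        self._value = None


# SyncByteSetter extends SyncByteGetter, so one check covers reads and writes.
assert isinstance(InMemoryBytes(), SyncByteSetter)
```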
9 changes: 7 additions & 2 deletions src/zarr/api/asynchronous.py
@@ -386,7 +386,9 @@ async def open(
is_v3_array = zarr_format == 3 and _metadata_dict.get("node_type") == "array"
if is_v3_array or zarr_format == 2:
return AsyncArray(
store_path=store_path, metadata=_metadata_dict, config=kwargs.get("config")
store_path=store_path,
metadata=_metadata_dict,
config=kwargs.get("config"),
)
except (AssertionError, FileNotFoundError, NodeTypeValidationError):
pass
@@ -1279,7 +1281,10 @@ async def open_array(
_warn_write_empty_chunks_kwarg()

try:
return await AsyncArray.open(store_path, zarr_format=zarr_format)
return await AsyncArray.open(
store_path,
zarr_format=zarr_format,
)
except FileNotFoundError as err:
if not store_path.read_only and mode in _CREATE_MODES:
overwrite = _infer_overwrite(mode)
23 changes: 12 additions & 11 deletions src/zarr/codecs/blosc.py
@@ -299,28 +299,29 @@ def _blosc_codec(self) -> Blosc:
config_dict["typesize"] = self.typesize
return Blosc.from_config(config_dict)

def _decode_sync(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer:
return as_numpy_array_wrapper(self._blosc_codec.decode, chunk_bytes, chunk_spec.prototype)

def _encode_sync(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer | None:
# Since blosc only supports host memory, we convert the input and output of the encoding
# between numpy array and buffer
return chunk_spec.prototype.buffer.from_bytes(
self._blosc_codec.encode(chunk_bytes.as_numpy_array())
)

async def _decode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> Buffer:
return await asyncio.to_thread(
as_numpy_array_wrapper, self._blosc_codec.decode, chunk_bytes, chunk_spec.prototype
)
return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec)

async def _encode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> Buffer | None:
# Since blosc only support host memory, we convert the input and output of the encoding
# between numpy array and buffer
return await asyncio.to_thread(
lambda chunk: chunk_spec.prototype.buffer.from_bytes(
self._blosc_codec.encode(chunk.as_numpy_array())
),
chunk_bytes,
)
return await asyncio.to_thread(self._encode_sync, chunk_bytes, chunk_spec)

def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
raise NotImplementedError
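The blosc change above is an instance of a general restructuring: do the compression work in plain sync methods, and keep the async entry points as thin `asyncio.to_thread` wrappers so sync callers can skip the event loop. A sketch of the same split for a hypothetical codec, with signatures simplified to `bytes` for brevity:

```python
import asyncio
import zlib


class ZlibLikeCodec:
    """Hypothetical codec using the sync-core / async-wrapper split."""

    def _decode_sync(self, data: bytes) -> bytes:
        # All the real work happens here, on the calling thread.
        return zlib.decompress(data)

    def _encode_sync(self, data: bytes) -> bytes:
        return zlib.compress(data)

    async def _decode_single(self, data: bytes) -> bytes:
        # Async callers pay one thread hop; sync callers use _decode_sync directly.
        return await asyncio.to_thread(self._decode_sync, data)

    async def _encode_single(self, data: bytes) -> bytes:
        return await asyncio.to_thread(self._encode_sync, data)
```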
18 changes: 16 additions & 2 deletions src/zarr/codecs/bytes.py
@@ -65,7 +65,7 @@ def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
)
return self

async def _decode_single(
def _decode_sync(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
@@ -88,7 +88,7 @@ async def _decode_single(
)
return chunk_array

async def _encode_single(
def _encode_sync(
self,
chunk_array: NDBuffer,
chunk_spec: ArraySpec,
@@ -109,5 +109,19 @@ async def _encode_single(
nd_array = nd_array.ravel().view(dtype="B")
return chunk_spec.prototype.buffer.from_array_like(nd_array)

async def _decode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> NDBuffer:
return self._decode_sync(chunk_bytes, chunk_spec)

async def _encode_single(
self,
chunk_array: NDBuffer,
chunk_spec: ArraySpec,
) -> Buffer | None:
return self._encode_sync(chunk_array, chunk_spec)

def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
return input_byte_length
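The key move in the encode path above is `nd_array.ravel().view(dtype="B")`: reinterpreting the typed array as raw bytes without copying. A standalone numpy round-trip of the same trick:

```python
import numpy as np

arr = np.arange(4, dtype=">u2")     # big-endian uint16, 8 bytes total
raw = arr.ravel().view(dtype="B")   # reinterpret as raw bytes (no copy for contiguous input)
restored = raw.view(dtype=">u2").reshape(arr.shape)
assert np.array_equal(arr, restored)
```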
26 changes: 16 additions & 10 deletions src/zarr/codecs/crc32c_.py
@@ -31,11 +31,7 @@ def from_dict(cls, data: dict[str, JSON]) -> Self:
def to_dict(self) -> dict[str, JSON]:
return {"name": "crc32c"}

async def _decode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> Buffer:
def _decode_sync(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer:
data = chunk_bytes.as_numpy_array()
crc32_bytes = data[-4:]
inner_bytes = data[:-4]
@@ -51,11 +47,7 @@ async def _decode_single(
)
return chunk_spec.prototype.buffer.from_array_like(inner_bytes)

async def _encode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> Buffer | None:
def _encode_sync(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer | None:
data = chunk_bytes.as_numpy_array()
# Calculate the checksum and "cast" it to a numpy array
checksum = np.array(
@@ -64,5 +56,19 @@ async def _encode_single(
# Append the checksum (as bytes) to the data
return chunk_spec.prototype.buffer.from_array_like(np.append(data, checksum.view("B")))

async def _decode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> Buffer:
return self._decode_sync(chunk_bytes, chunk_spec)

async def _encode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> Buffer | None:
return self._encode_sync(chunk_bytes, chunk_spec)

def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
return input_byte_length + 4
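A standalone sketch of the framing this codec implements: four checksum bytes appended on encode, split off and verified on decode. It assumes the `crc32c` package (the dependency the codec wraps) and little-endian storage of the checksum:

```python
import numpy as np
from crc32c import crc32c

payload = np.frombuffer(b"chunk bytes", dtype="B")

# Encode: compute the checksum and append it as 4 little-endian bytes.
checksum = np.array([crc32c(payload.tobytes())], dtype="<u4")
framed = np.append(payload, checksum.view("B"))

# Decode: split off the trailing 4 bytes and verify before trusting the data.
stored = int(framed[-4:].view("<u4")[0])
computed = crc32c(framed[:-4].tobytes())
assert stored == computed
```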
27 changes: 21 additions & 6 deletions src/zarr/codecs/gzip.py
@@ -2,6 +2,7 @@

import asyncio
from dataclasses import dataclass
from functools import cached_property
from typing import TYPE_CHECKING

from numcodecs.gzip import GZip
@@ -48,23 +49,37 @@ def from_dict(cls, data: dict[str, JSON]) -> Self:
def to_dict(self) -> dict[str, JSON]:
return {"name": "gzip", "configuration": {"level": self.level}}

# Cache the numcodecs GZip instance. GzipCodec is a frozen dataclass,
# so `level` never changes after construction, making this safe.
# This matches the pattern used by ZstdCodec._zstd_codec and
# BloscCodec._blosc_codec. Without caching, a new GZip(level) was
# created on every encode/decode call.
@cached_property
def _gzip_codec(self) -> GZip:
return GZip(self.level)

def _decode_sync(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer:
# Use the cached codec instance instead of creating GZip(self.level)
# each time. The async _decode_single delegates to this method via
# asyncio.to_thread, so both paths benefit from the cache.
return as_numpy_array_wrapper(self._gzip_codec.decode, chunk_bytes, chunk_spec.prototype)

def _encode_sync(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer | None:
return as_numpy_array_wrapper(self._gzip_codec.encode, chunk_bytes, chunk_spec.prototype)

async def _decode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> Buffer:
return await asyncio.to_thread(
as_numpy_array_wrapper, GZip(self.level).decode, chunk_bytes, chunk_spec.prototype
)
return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec)
Review comment (Contributor):

as an aside, these to_thread calls are extremely annoying; they run on an independent thread pool not the one Zarr sets up (and are thus unconstrained by any config setting).

instead we need something like this:
https://github.com/earth-mover/xpublish-tiles/blob/1a800e05617d609098bbcd1a1f5ac9bbdcb531aa/src/xpublish_tiles/lib.py#L147-L152

Reply (Contributor, Author):

yes to_thread has serious problems: python/cpython#136084. I will drop in your async_run idea!

async def _encode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> Buffer | None:
return await asyncio.to_thread(
as_numpy_array_wrapper, GZip(self.level).encode, chunk_bytes, chunk_spec.prototype
)
return await asyncio.to_thread(self._encode_sync, chunk_bytes, chunk_spec)

def compute_encoded_size(
self,
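A sketch of the `async_run` idea from the review thread above: route sync work through an executor the caller controls, rather than `asyncio.to_thread`'s default pool, so configuration-managed limits actually apply. The helper name and pool wiring follow the linked xpublish-tiles code and are assumptions, not this PR's final implementation:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Callable, TypeVar

T = TypeVar("T")

# Hypothetical: in zarr this pool would be created from the config settings.
_CODEC_POOL = ThreadPoolExecutor(max_workers=4, thread_name_prefix="zarr-codec")


async def async_run(func: Callable[..., T], *args: object, **kwargs: object) -> T:
    """Run a sync function on the shared codec pool instead of to_thread's default executor."""
    loop = asyncio.get_running_loop()
    # run_in_executor takes no kwargs, so bind them with partial.
    return await loop.run_in_executor(_CODEC_POOL, partial(func, *args, **kwargs))
```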