sketch out sync codecs + threadpool #3715
The first file in the diff is a new changelog entry:

Added several performance optimizations to chunk encoding and decoding. Low-latency stores that do not benefit from `async` operations can now implement synchronous IO methods, which are used when available during chunk processing. Similarly, codecs can implement a synchronous API, which is likewise used when available. These changes remove unnecessary interactions with the event loop.

The synchronous chunk processing path optionally uses a thread pool to parallelize codec work across chunks. The pool is skipped for single-chunk operations and for pipelines that only contain cheap codecs (e.g. endian swap, transpose, checksum).

Use of the thread pool can be disabled in the global configuration. The minimum and maximum number of threads can also be set via the configuration.
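A rough sketch of the configuration and the skip heuristic described above. The key names and the helper function below are assumptions for illustration; the actual names are not visible in this excerpt. `zarr.config.set()` is zarr-python's existing donfig-based global configuration.

    import zarr

    # Hypothetical keys -- the real ones are not shown in this diff.
    zarr.config.set({
        "threading.codec_pool.enabled": False,  # turn the codec thread pool off
        "threading.codec_pool.min_workers": 2,  # lower bound on pool size
        "threading.codec_pool.max_workers": 8,  # upper bound on pool size
    })

    # Illustration of the skip heuristic: the pool only pays off when there
    # are several chunks and at least one expensive codec in the pipeline.
    def should_use_pool(n_chunks: int, has_expensive_codec: bool) -> bool:
        return n_chunks > 1 and has_expensive_codec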
The second changed file is the gzip codec module. The import hunk adds one line, the `cached_property` import; the rest is existing context:

    import asyncio
    from dataclasses import dataclass
    from functools import cached_property
    from typing import TYPE_CHECKING

    from numcodecs.gzip import GZip
@@ -48,23 +49,37 @@ def from_dict(cls, data: dict[str, JSON]) -> Self: | |
| def to_dict(self) -> dict[str, JSON]: | ||
| return {"name": "gzip", "configuration": {"level": self.level}} | ||
|
|
||
| # Cache the numcodecs GZip instance. GzipCodec is a frozen dataclass, | ||
| # so `level` never changes after construction, making this safe. | ||
| # This matches the pattern used by ZstdCodec._zstd_codec and | ||
| # BloscCodec._blosc_codec. Without caching, a new GZip(level) was | ||
| # created on every encode/decode call. | ||
| @cached_property | ||
| def _gzip_codec(self) -> GZip: | ||
| return GZip(self.level) | ||
|
|
||
| def _decode_sync(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer: | ||
| # Use the cached codec instance instead of creating GZip(self.level) | ||
| # each time. The async _decode_single delegates to this method via | ||
| # asyncio.to_thread, so both paths benefit from the cache. | ||
| return as_numpy_array_wrapper(self._gzip_codec.decode, chunk_bytes, chunk_spec.prototype) | ||
|
|
||
| def _encode_sync(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer | None: | ||
| return as_numpy_array_wrapper(self._gzip_codec.encode, chunk_bytes, chunk_spec.prototype) | ||
|
|
||
| async def _decode_single( | ||
| self, | ||
| chunk_bytes: Buffer, | ||
| chunk_spec: ArraySpec, | ||
| ) -> Buffer: | ||
| return await asyncio.to_thread( | ||
| as_numpy_array_wrapper, GZip(self.level).decode, chunk_bytes, chunk_spec.prototype | ||
| ) | ||
| return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec) | ||
|
A reviewer commented on this hunk:

Contributor: as an aside, these instead we need something like this:

Contributor (author): yes
`_encode_single` gets the same treatment, delegating to `_encode_sync`; the trailing `compute_encoded_size` lines are unchanged context:

    async def _encode_single(
        self,
        chunk_bytes: Buffer,
        chunk_spec: ArraySpec,
    ) -> Buffer | None:
        return await asyncio.to_thread(self._encode_sync, chunk_bytes, chunk_spec)

    def compute_encoded_size(
        self,
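The pattern in this file (a cached numcodecs instance plus sync methods that the async methods offload to via `asyncio.to_thread`) can be sketched standalone. The class below is an illustrative mock, not zarr-python's actual `GzipCodec`:

    import asyncio
    from dataclasses import dataclass
    from functools import cached_property

    from numcodecs.gzip import GZip


    @dataclass(frozen=True)
    class MiniGzipCodec:
        level: int = 5

        # Frozen dataclass => `level` never changes, so the numcodecs GZip
        # object can be built once and reused for every chunk.
        @cached_property
        def _gzip(self) -> GZip:
            return GZip(self.level)

        # Synchronous path: callable from a sync pipeline or a thread-pool
        # worker with no event-loop involvement.
        def _decode_sync(self, data: bytes) -> bytes:
            return bytes(self._gzip.decode(data))

        def _encode_sync(self, data: bytes) -> bytes:
            return bytes(self._gzip.encode(data))

        # Async path: offload the CPU-bound sync method to a thread so the
        # event loop is not blocked.
        async def _decode_single(self, data: bytes) -> bytes:
            return await asyncio.to_thread(self._decode_sync, data)

        async def _encode_single(self, data: bytes) -> bytes:
            return await asyncio.to_thread(self._encode_sync, data)


    async def main() -> None:
        codec = MiniGzipCodec(level=1)
        raw = b"some chunk bytes" * 100
        encoded = await codec._encode_single(raw)
        assert await codec._decode_single(encoded) == raw
        # The sync path gives the same result without touching the event loop.
        assert codec._decode_sync(encoded) == raw


    asyncio.run(main())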