sketch out sync codecs + threadpool #3715
d-v-b wants to merge 26 commits into zarr-developers:main from
Conversation
docs/design/sync-bypass.md
Outdated
```diff
@@ -0,0 +1,228 @@
+# Design: Fully Synchronous Read/Write Bypass
```
performance impact ranges from "good" to "amazing" so I think we want to learn from this PR. IMO this is NOT a merge candidate but rather should function as a proof-of-concept for what we can get if we rethink our current codec API. Some key points:
|
|
the current performance improvements are without any parallelism. I'm adding that now.

the latest commit adds thread-based parallelism to the synchronous codec pipeline. we compute an estimated compute cost based on the chunk size, codecs, and operation (encode / decode), and use that estimate to choose a parallelism strategy, ranging from no threads to full use of a thread pool.

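A minimal sketch of that kind of cost-based strategy selection (all names and constants below are hypothetical placeholders for illustration, not the PR's actual values):

```python
# Hypothetical constants; the real weights would be tuned from benchmarks.
POOL_OVERHEAD_NS = 50_000  # rough cost of dispatching one task to the pool
CODEC_DECODE_NS_PER_BYTE = {"GzipCodec": 8.0, "ZstdCodec": 1.0, "BytesCodec": 0.0}


def estimate_chunk_cost_ns(chunk_nbytes: int, codec_names: list[str]) -> float:
    """Estimate the decode cost of one chunk from per-codec ns/byte weights."""
    return sum(CODEC_DECODE_NS_PER_BYTE.get(n, 1.0) * chunk_nbytes for n in codec_names)


def choose_strategy(
    n_chunks: int, chunk_nbytes: int, codec_names: list[str], max_workers: int = 8
) -> int:
    """Return 0 to run inline, otherwise the number of pool workers to use."""
    per_chunk_ns = estimate_chunk_cost_ns(chunk_nbytes, codec_names)
    if n_chunks < 2 or per_chunk_ns < POOL_OVERHEAD_NS:
        # dispatch overhead would dominate; decode synchronously inline
        return 0
    return min(n_chunks, max_workers)
```

The idea is the same as described above: cheap codecs on small chunks never touch the pool, while expensive codecs on many large chunks fan out to a bounded number of workers.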
marking this as not a draft, because I think we should actually consider merging it.

i added a changelog entry and made a breaking change: removal of the

This is extremely hard to review at the moment. Can we look at a new PR with just one affected codec (Zstd?) please?

the changes here aren't really made at the granularity of a single codec. We have new codec pipeline behavior, which requires new methods on stores AND codecs. When the codec pipeline identifies that all the codecs AND the store support the fast path, then it uses the fast path. So breaking that apart is difficult.
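As a sketch of that gating logic (the attribute and class names here are hypothetical; the PR's actual method names may differ), the pipeline probes every participant and only takes the synchronous path when all of them opt in:

```python
def can_use_fast_path(store: object, codecs: list[object]) -> bool:
    """Take the synchronous fast path only if the store and every codec opt in."""
    return getattr(store, "supports_sync", False) and all(
        getattr(c, "supports_sync", False) for c in codecs
    )


# Minimal stand-ins to show the shape of the check:
class FakeStore:
    supports_sync = True


class FakeCodec:
    def __init__(self, sync: bool) -> None:
        self.supports_sync = sync
```

A single codec (or store) that lacks synchronous support silently routes the whole pipeline back to the existing async path, which is why the change cannot easily be scoped to one codec.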
```diff
-        return await asyncio.to_thread(
-            as_numpy_array_wrapper, GZip(self.level).decode, chunk_bytes, chunk_spec.prototype
-        )
+        return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec)
```
as an aside, these to_thread calls are extremely annoying; they run on an independent thread pool, not the one Zarr sets up (and are thus unconstrained by any config setting).
instead we need something like this:
https://github.com/earth-mover/xpublish-tiles/blob/1a800e05617d609098bbcd1a1f5ac9bbdcb531aa/src/xpublish_tiles/lib.py#L147-L152
yes to_thread has serious problems: python/cpython#136084. I will drop in your async_run idea!
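A rough sketch of such a helper, assuming the pattern from the linked code: run the work on an explicitly managed executor via `loop.run_in_executor`, so the pool (and its size) is governed by the application's config rather than by `asyncio.to_thread`'s default executor.

```python
import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor

# Application-managed pool; in Zarr this would be sized from the config.
_POOL = ThreadPoolExecutor(max_workers=4, thread_name_prefix="codec")


async def async_run(func, /, *args, **kwargs):
    """Run func(*args, **kwargs) on the shared pool and await the result."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_POOL, functools.partial(func, *args, **kwargs))
```

One caveat with this sketch: unlike `asyncio.to_thread`, `run_in_executor` does not automatically propagate `contextvars` to the worker thread, so code relying on context-local state would need to copy the context explicitly.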
src/zarr/core/codec_pipeline.py
Outdated
```python
_CODEC_DECODE_NS_PER_BYTE: dict[str, float] = {
    # Near-zero cost — just reshaping/copying/checksumming
    "BytesCodec": 0,
    "Crc32cCodec": 0,
    "TransposeCodec": 0,
    "VLenUTF8Codec": 0,
    "VLenBytesCodec": 0,
    # Medium cost — fast C codecs, GIL released
    "ZstdCodec": 1,
    "BloscCodec": 0.5,
    # High cost — slower C codecs, GIL released
    "GzipCodec": 8,
}

_CODEC_ENCODE_NS_PER_BYTE: dict[str, float] = {
    # Near-zero cost — just reshaping/copying/checksumming
    "BytesCodec": 0,
    "Crc32cCodec": 0,
    "TransposeCodec": 0,
    "VLenUTF8Codec": 0,
    "VLenBytesCodec": 0,
    # Medium cost — fast C codecs, GIL released
    "ZstdCodec": 3,
    "BloscCodec": 2,
    # High cost — slower C codecs, GIL released
    "GzipCodec": 50,
}
```
@dcherian here's the estimated cost of running each codec in the encode and decode path
mkitti left a comment
Could we adjust work estimates based on codec parameters?
src/zarr/core/codec_pipeline.py
Outdated
```python
    "VLenUTF8Codec": 0,
    "VLenBytesCodec": 0,
    # Medium cost — fast C codecs, GIL released
    "ZstdCodec": 3,
```
Can we adjust by compression level? Compression level -1000 is different from compression level 22 in terms of time.
yes we could put this in the model. we would have to take some data first of course
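A hypothetical way to fold the level into the model; the scaling factors below are invented for illustration and would need the measurements discussed above:

```python
_ZSTD_BASE_ENCODE_NS_PER_BYTE = 3.0  # baseline weight at a mid-range level


def zstd_encode_ns_per_byte(level: int) -> float:
    """Scale the per-byte encode estimate by compression level (illustrative)."""
    if level <= 0:
        # negative "fast" levels trade compression ratio for speed
        return _ZSTD_BASE_ENCODE_NS_PER_BYTE * 0.3
    # crude linear growth toward the slow high levels (zstd maxes out at 22)
    return _ZSTD_BASE_ENCODE_NS_PER_BYTE * (1 + level / 4)
```

The real relationship between level and throughput is nonlinear and data-dependent, so a fitted curve per codec would likely replace this linear stand-in.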
```python
    if per_chunk_ns < _POOL_OVERHEAD_NS and min_workers == 0:
        # Only use the pool when at least one codec does real work
        # and the chunks are large enough to offset dispatch overhead.
        has_expensive = any(type(c).__name__ not in _CHEAP_CODECS for c in codecs)
```
uhh... can we not isinstance because of cyclic imports or something?
```python
_MIN_CHUNK_NBYTES_FOR_POOL = 100_000  # 100 KB


def _choose_workers(n_chunks: int, chunk_nbytes: int, codecs: Iterable[Codec]) -> int:
```
Can this be def _use_thread_pool(...)->bool instead?
```diff
-def _get_pool(max_workers: int) -> ThreadPoolExecutor:
-    """Get a thread pool with at most *max_workers* threads."""
+def _get_pool() -> ThreadPoolExecutor:
```
hard to see why this had to change but... i'm not opposed to it.
```python
    """Get the module-level thread pool, creating it lazily."""
    global _pool
    if _pool is None:
        max_workers: int = config.get("threading.codec_workers").get("max") or os.cpu_count() or 4
```
this is duplicated in `_choose_workers`; doesn't donfig have a way to do runtime defaults?
This is a work in progress with all the heavy lifting done by claude. The goal is to improve the performance of our codecs by avoiding overhead in `to_thread` and other async machinery. At the moment we have deadlocks in some of the array tests, but I am opening this now as a draft to see if the benchmarks show anything promising.