diff --git a/packages/static/src/rsc/defer.tsx b/packages/static/src/rsc/defer.tsx index 139956d..2c0e48a 100644 --- a/packages/static/src/rsc/defer.tsx +++ b/packages/static/src/rsc/defer.tsx @@ -7,6 +7,7 @@ import { getPayloadIDFor } from "./rscModule"; export interface DeferEntry { state: DeferEntryState; name?: string; + drainPromise?: Promise; } /** @@ -23,6 +24,7 @@ export interface DeferOptions { export interface LoadedDeferEntry extends DeferEntry { state: Exclude; + drainPromise: Promise; } type DeferEntryState = @@ -77,29 +79,23 @@ export class DeferRegistry { const stream = renderToReadableStream(state.element); const [stream1, stream2] = stream.tee(); entry.state = { state: "streaming", stream: stream1 }; - (async () => { - const chunks: string[] = []; - const decoder = new TextDecoder(); - for await (const chunk of stream2) { - chunks.push(decoder.decode(chunk, { stream: true })); - } - chunks.push(decoder.decode()); - entry.state = { - state: "ready", - data: chunks.join(""), - }; - })().catch((error) => { - entry.state = { state: "error", error }; - }); + const drainPromise = drainStream(stream2); + entry.drainPromise = drainPromise; + drainPromise.then( + (data) => { + entry.state = { state: "ready", data }; + }, + (error) => { + entry.state = { state: "error", error }; + }, + ); return entry as LoadedDeferEntry; } case "streaming": case "ready": - case "error": { + case "error": return entry as LoadedDeferEntry; - } } - state satisfies never; } has(id: string): boolean { @@ -113,11 +109,14 @@ export class DeferRegistry { async *loadAll() { const errors: unknown[] = []; - // Phase 1: Start all entries loading - const loadedEntries = Array.from( - this.#registry, - ([id, entry]) => [id, this.#loadEntry(entry), entry.name] as const, - ); + // Phase 1: Start all entries loading and collect drain promises. + // We use drain promises (which drain stream2 from tee) instead of + // draining stream1 directly, because stream1 may have been locked + // by createFromReadableStream during SSR. + const loadedEntries = Array.from(this.#registry, ([id, entry]) => { + const loaded = this.#loadEntry(entry); + return [id, loaded.drainPromise, entry.name] as const; + }); if (loadedEntries.length === 0) return; @@ -134,29 +133,12 @@ export class DeferRegistry { waiting?.(); }; - // Phase 2: Start all operations (each pushes to queue when done) - for (const [id, loadedEntry, name] of loadedEntries) { - (async () => { - try { - switch (loadedEntry.state.state) { - case "streaming": - onComplete({ - id, - data: await drainStream(loadedEntry.state.stream), - name, - }); - break; - case "ready": - onComplete({ id, data: loadedEntry.state.data, name }); - break; - case "error": - onComplete({ error: loadedEntry.state.error }); - break; - } - } catch (error) { - onComplete({ error }); - } - })(); + // Phase 2: Await drain promises + for (const [id, drainPromise, name] of loadedEntries) { + drainPromise.then( + (data) => onComplete({ id, data, name }), + (error) => onComplete({ error }), + ); } // Phase 3: Yield from queue as results arrive