Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 27 additions & 45 deletions packages/static/src/rsc/defer.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { getPayloadIDFor } from "./rscModule";
export interface DeferEntry {
state: DeferEntryState;
name?: string;
drainPromise?: Promise<string>;
}

/**
Expand All @@ -23,6 +24,7 @@ export interface DeferOptions {

export interface LoadedDeferEntry extends DeferEntry {
state: Exclude<DeferEntryState, { state: "pending" }>;
drainPromise: Promise<string>;
}

type DeferEntryState =
Expand Down Expand Up @@ -77,29 +79,23 @@ export class DeferRegistry {
const stream = renderToReadableStream<ReactNode>(state.element);
const [stream1, stream2] = stream.tee();
entry.state = { state: "streaming", stream: stream1 };
(async () => {
const chunks: string[] = [];
const decoder = new TextDecoder();
for await (const chunk of stream2) {
chunks.push(decoder.decode(chunk, { stream: true }));
}
chunks.push(decoder.decode());
entry.state = {
state: "ready",
data: chunks.join(""),
};
})().catch((error) => {
entry.state = { state: "error", error };
});
const drainPromise = drainStream(stream2);
entry.drainPromise = drainPromise;
drainPromise.then(
(data) => {
entry.state = { state: "ready", data };
},
(error) => {
entry.state = { state: "error", error };
},
);
return entry as LoadedDeferEntry;
}
case "streaming":
case "ready":
case "error": {
case "error":
return entry as LoadedDeferEntry;
}
}
state satisfies never;
}

has(id: string): boolean {
Expand All @@ -113,11 +109,14 @@ export class DeferRegistry {
async *loadAll() {
const errors: unknown[] = [];

// Phase 1: Start all entries loading
const loadedEntries = Array.from(
this.#registry,
([id, entry]) => [id, this.#loadEntry(entry), entry.name] as const,
);
// Phase 1: Start all entries loading and collect drain promises.
// We use drain promises (which drain stream2 from tee) instead of
// draining stream1 directly, because stream1 may have been locked
// by createFromReadableStream during SSR.
const loadedEntries = Array.from(this.#registry, ([id, entry]) => {
const loaded = this.#loadEntry(entry);
return [id, loaded.drainPromise, entry.name] as const;
});

if (loadedEntries.length === 0) return;

Expand All @@ -134,29 +133,12 @@ export class DeferRegistry {
waiting?.();
};

// Phase 2: Start all operations (each pushes to queue when done)
for (const [id, loadedEntry, name] of loadedEntries) {
(async () => {
try {
switch (loadedEntry.state.state) {
case "streaming":
onComplete({
id,
data: await drainStream(loadedEntry.state.stream),
name,
});
break;
case "ready":
onComplete({ id, data: loadedEntry.state.data, name });
break;
case "error":
onComplete({ error: loadedEntry.state.error });
break;
}
} catch (error) {
onComplete({ error });
}
})();
// Phase 2: Await drain promises
for (const [id, drainPromise, name] of loadedEntries) {
drainPromise.then(
(data) => onComplete({ id, data, name }),
(error) => onComplete({ error }),
);
}

// Phase 3: Yield from queue as results arrive
Expand Down