5 changes: 5 additions & 0 deletions .changeset/fix-infinite-query-peek-ahead.md
@@ -0,0 +1,5 @@
---
'@tanstack/react-db': patch
---

Fix `useLiveInfiniteQuery` peek-ahead detection for `hasNextPage`. The initial query now correctly requests `pageSize + 1` items to detect whether additional pages exist, matching the behavior of subsequent page loads.
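
For context, a minimal sketch (not part of this changeset) of how a peek-ahead limit translates into `hasNextPage`: the query requests `pageSize + 1` rows, and the extra row, if present, only signals that another page exists and is not rendered. The helper and names below (`splitPeekAhead`, `rows`) are illustrative, not the hook's internals.

```ts
// Illustrative sketch only — `splitPeekAhead` is a hypothetical helper,
// not part of @tanstack/react-db.
function splitPeekAhead<T>(rows: Array<T>, pageSize: number) {
  // The query was issued with `limit(pageSize + 1)`.
  const hasNextPage = rows.length > pageSize
  // The peek-ahead row is dropped from the rendered page.
  const page = hasNextPage ? rows.slice(0, pageSize) : rows
  return { page, hasNextPage }
}
```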
11 changes: 11 additions & 0 deletions packages/db/src/query/live/collection-config-builder.ts
@@ -336,13 +336,24 @@ export class CollectionConfigBuilder<

// Always run the graph if subscribed (eager execution)
if (syncState.subscribedToAllCollections) {
let callbackCalled = false
while (syncState.graph.pendingWork()) {
syncState.graph.run()
// Flush accumulated changes after each graph step to commit them as one transaction.
// This ensures intermediate join states (like null on one side) don't cause
// duplicate key errors when the full join result arrives in the same step.
syncState.flushPendingChanges?.()
callback?.()
callbackCalled = true
}

// Call the callback at least once if it wasn't already called (no pending work).
// This is important for lazy loading scenarios where:
// 1. setWindow() increases the limit and needs to trigger loadMoreIfNeeded
// 2. An async loadSubset completes and we need to check if more data is needed
// Without this, the callback would never be called if the graph has no work.
if (!callbackCalled) {
callback?.()
}

// On the initial run, we may need to do an empty commit to ensure that
6 changes: 5 additions & 1 deletion packages/react-db/src/useLiveInfiniteQuery.ts
@@ -184,10 +184,14 @@ export function useLiveInfiniteQuery<TContext extends Context>(

// Create a live query with initial limit and offset
// Either pass collection directly or wrap query function
// Use pageSize + 1 for peek-ahead detection (to know if there are more pages)
const queryResult = isCollection
? useLiveQuery(queryFnOrCollection)
: useLiveQuery(
(q) => queryFnOrCollection(q).limit(pageSize).offset(0),
(q) =>
queryFnOrCollection(q)
.limit(pageSize + 1)
.offset(0),
deps,
)

325 changes: 323 additions & 2 deletions packages/react-db/tests/useLiveInfiniteQuery.test.tsx
@@ -629,7 +629,7 @@ describe(`useLiveInfiniteQuery`, () => {
{
pageSize: 10,
initialPageParam: 0,
getNextPageParam: (lastPage, allPages, lastPageParam) =>
getNextPageParam: (lastPage, _allPages, lastPageParam) =>
lastPage.length === 10 ? lastPageParam + 1 : undefined,
},
)
@@ -838,7 +838,7 @@ describe(`useLiveInfiniteQuery`, () => {
{
pageSize: 10,
initialPageParam: 100,
getNextPageParam: (lastPage, allPages, lastPageParam) =>
getNextPageParam: (lastPage, _allPages, lastPageParam) =>
lastPage.length === 10 ? lastPageParam + 1 : undefined,
},
)
@@ -987,6 +987,327 @@ describe(`useLiveInfiniteQuery`, () => {
expect(result.current.isFetchingNextPage).toBe(false)
})

it(`should request limit+1 (peek-ahead) from loadSubset for hasNextPage detection`, async () => {
// Verifies that useLiveInfiniteQuery requests pageSize+1 items from loadSubset
// to detect whether there are more pages available (peek-ahead strategy)
const PAGE_SIZE = 10
const allPosts = createMockPosts(PAGE_SIZE) // Exactly PAGE_SIZE posts

// Track all loadSubset calls to inspect the limit parameter
const loadSubsetCalls: Array<LoadSubsetOptions> = []

const collection = createCollection<Post>({
id: `peek-ahead-limit-test`,
getKey: (post: Post) => post.id,
syncMode: `on-demand`,
startSync: true,
sync: {
sync: ({ markReady, begin, write, commit }) => {
markReady()

return {
loadSubset: (opts: LoadSubsetOptions) => {
// Record the call for later inspection
loadSubsetCalls.push({ ...opts })

// Return posts based on limit
const postsToReturn = opts.limit
? allPosts.slice(0, opts.limit)
: allPosts

begin()
for (const post of postsToReturn) {
write({
type: `insert`,
value: post,
})
}
commit()

return true // Synchronous load
},
}
},
},
})

const { result } = renderHook(() => {
return useLiveInfiniteQuery(
(q) =>
q
.from({ posts: collection })
.orderBy(({ posts: p }) => p.createdAt, `desc`),
{
pageSize: PAGE_SIZE,
getNextPageParam: (lastPage) =>
lastPage.length === PAGE_SIZE ? lastPage.length : undefined,
},
)
})

await waitFor(() => {
expect(result.current.isReady).toBe(true)
})

// Find the loadSubset call with a limit (the initial page load)
const callWithLimit = loadSubsetCalls.find(
(call) => call.limit !== undefined,
)
expect(callWithLimit).toBeDefined()
expect(callWithLimit!.limit).toBe(PAGE_SIZE + 1)

// With exactly PAGE_SIZE posts, hasNextPage should be false (no peek-ahead item returned)
expect(result.current.hasNextPage).toBe(false)
expect(result.current.data).toHaveLength(PAGE_SIZE)
})

it(`should work with on-demand collection and fetch multiple pages`, async () => {
// This test verifies end-to-end behavior of useLiveInfiniteQuery with an
// on-demand collection where ALL data comes from loadSubset (no initial data).
// This simulates the real Electric on-demand scenario.
const PAGE_SIZE = 10
const allPosts = createMockPosts(25) // 25 posts = 2 full pages + 5 items

// Track loadSubset calls
const loadSubsetCalls: Array<LoadSubsetOptions> = []

const collection = createCollection<Post>({
id: `on-demand-e2e-test`,
getKey: (post: Post) => post.id,
syncMode: `on-demand`,
startSync: true,
// Enable auto-indexing (critical for lazy loading to work)
autoIndex: `eager`,
sync: {
sync: ({ markReady, begin, write, commit }) => {
// NO initial data - collection starts empty
// This matches Electric on-demand behavior
markReady()

return {
loadSubset: (opts: LoadSubsetOptions) => {
loadSubsetCalls.push({ ...opts })

// Sort by createdAt descending (matching the query's orderBy)
let filtered = [...allPosts].sort(
(a, b) => b.createdAt - a.createdAt,
)

// Handle cursor-based pagination
if (opts.cursor) {
const { whereFrom } = opts.cursor
const whereFromFn =
createFilterFunctionFromExpression(whereFrom)
filtered = filtered.filter(whereFromFn)
}

// Apply limit
if (opts.limit !== undefined) {
filtered = filtered.slice(0, opts.limit)
}

begin()
for (const post of filtered) {
write({
type: `insert`,
value: post,
})
}
commit()

return true // Synchronous load
},
}
},
},
})

const { result } = renderHook(() => {
return useLiveInfiniteQuery(
(q) =>
q
.from({ posts: collection })
.orderBy(({ posts: p }) => p.createdAt, `desc`),
{
pageSize: PAGE_SIZE,
getNextPageParam: (lastPage) =>
lastPage.length === PAGE_SIZE ? lastPage.length : undefined,
},
)
})

await waitFor(() => {
expect(result.current.isReady).toBe(true)
})

// Page 1: 10 items, hasNextPage should be true (25 total items)
expect(result.current.pages).toHaveLength(1)
expect(result.current.data).toHaveLength(PAGE_SIZE)
expect(result.current.hasNextPage).toBe(true)

// Verify first page has correct items (posts 1-10, sorted by createdAt desc)
expect(result.current.data[0]!.id).toBe(`1`)
expect(result.current.data[9]!.id).toBe(`10`)

// Fetch page 2
act(() => {
result.current.fetchNextPage()
})

await waitFor(() => {
expect(result.current.pages).toHaveLength(2)
})

// Verify loadSubset was called again for page 2
expect(loadSubsetCalls.length).toBeGreaterThan(1)

// Page 2: should have 20 items total, hasNextPage should be true
expect(result.current.data).toHaveLength(20)
expect(result.current.hasNextPage).toBe(true)

// Verify second page has correct items (posts 11-20)
expect(result.current.pages[1]![0]!.id).toBe(`11`)
expect(result.current.pages[1]![9]!.id).toBe(`20`)

// Fetch page 3 (partial page)
act(() => {
result.current.fetchNextPage()
})

await waitFor(() => {
expect(result.current.pages).toHaveLength(3)
})

// Page 3: should have 25 items total (5 on last page), hasNextPage should be false
expect(result.current.data).toHaveLength(25)
expect(result.current.pages[2]).toHaveLength(5)
expect(result.current.hasNextPage).toBe(false)

// Verify third page has correct items (posts 21-25)
expect(result.current.pages[2]![0]!.id).toBe(`21`)
expect(result.current.pages[2]![4]!.id).toBe(`25`)
})

it(`should work with on-demand collection with async loadSubset`, async () => {
// This test mimics the real Electric on-demand scenario more closely:
// - Collection starts completely empty (no initial data)
// - ALL data comes from loadSubset which is ASYNC
// - Tests that subsequent pages are fetched correctly
const PAGE_SIZE = 10
const allPosts = createMockPosts(25)

// Track loadSubset calls
const loadSubsetCalls: Array<LoadSubsetOptions> = []

const collection = createCollection<Post>({
id: `on-demand-async-test`,
getKey: (post: Post) => post.id,
syncMode: `on-demand`,
startSync: true,
autoIndex: `eager`,
sync: {
sync: ({ markReady, begin, write, commit }) => {
// Collection starts empty - matches Electric on-demand
markReady()

return {
loadSubset: (opts: LoadSubsetOptions) => {
loadSubsetCalls.push({ ...opts })

// Sort by createdAt descending
let filtered = [...allPosts].sort(
(a, b) => b.createdAt - a.createdAt,
)

// Handle cursor-based pagination
if (opts.cursor) {
const { whereFrom } = opts.cursor
const whereFromFn =
createFilterFunctionFromExpression(whereFrom)
filtered = filtered.filter(whereFromFn)
}

// Apply limit
if (opts.limit !== undefined) {
filtered = filtered.slice(0, opts.limit)
}

// Return a Promise to simulate async network request
return new Promise<void>((resolve) => {
setTimeout(() => {
begin()
for (const post of filtered) {
write({
type: `insert`,
value: post,
})
}
commit()
resolve()
}, 10)
})
},
}
},
},
})

const { result } = renderHook(() => {
return useLiveInfiniteQuery(
(q) =>
q
.from({ posts: collection })
.orderBy(({ posts: p }) => p.createdAt, `desc`),
{
pageSize: PAGE_SIZE,
getNextPageParam: (lastPage) =>
lastPage.length === PAGE_SIZE ? lastPage.length : undefined,
},
)
})

await waitFor(() => {
expect(result.current.isReady).toBe(true)
})

// Wait for initial data to load
await waitFor(() => {
expect(result.current.data).toHaveLength(PAGE_SIZE)
})

// Page 1 loaded
expect(result.current.pages).toHaveLength(1)
expect(result.current.hasNextPage).toBe(true)

const initialCallCount = loadSubsetCalls.length

// Fetch page 2
act(() => {
result.current.fetchNextPage()
})

// Should be fetching
expect(result.current.isFetchingNextPage).toBe(true)

// Wait for page 2 data to actually load (not just loadedPageCount to increment)
// The async loadSubset takes 10ms to resolve, so we need to wait for the data
await waitFor(
() => {
expect(result.current.data).toHaveLength(20)
},
{ timeout: 500 },
)

// Verify pages structure
expect(result.current.pages).toHaveLength(2)

// CRITICAL: Verify loadSubset was called again for page 2
expect(loadSubsetCalls.length).toBeGreaterThan(initialCallCount)

// Verify hasNextPage
expect(result.current.hasNextPage).toBe(true)
})

it(`should track isFetchingNextPage when async loading is triggered`, async () => {
// Define all data upfront
const allPosts = createMockPosts(30)