Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.a2a.server.tasks.PushNotificationConfigStore;
import io.a2a.spec.ListTaskPushNotificationConfigParams;
import io.a2a.spec.ListTaskPushNotificationConfigResult;
import io.a2a.util.PageToken;
import io.a2a.spec.PushNotificationConfig;
import io.a2a.spec.TaskPushNotificationConfig;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -79,18 +80,17 @@ public ListTaskPushNotificationConfigResult getInfo(ListTaskPushNotificationConf
String taskId = params.id();
LOGGER.debug("Retrieving PushNotificationConfigs for Task '{}' with params: pageSize={}, pageToken={}",
taskId, params.pageSize(), params.pageToken());

// Parse pageToken once at the beginning
PageToken pageToken = PageToken.fromString(params.pageToken());

try {
StringBuilder queryBuilder = new StringBuilder("SELECT c FROM JpaPushNotificationConfig c WHERE c.id.taskId = :taskId");

if (params.pageToken() != null && !params.pageToken().isEmpty()) {
String[] tokenParts = params.pageToken().split(":", 2);
if (tokenParts.length == 2) {
if (pageToken != null) {
// Keyset pagination: get tasks where timestamp < tokenTimestamp OR (timestamp = tokenTimestamp AND id > tokenId)
// All tasks have timestamps (TaskStatus canonical constructor ensures this)
queryBuilder.append(" AND (COALESCE(c.createdAt, :nullSentinel) < :tokenTimestamp OR (COALESCE(c.createdAt, :nullSentinel) = :tokenTimestamp AND c.id.configId > :tokenId))");
} else {
// Based on the comments in the test case, if the pageToken is invalid start from the beginning.
}
}

queryBuilder.append(" ORDER BY COALESCE(c.createdAt, :nullSentinel) DESC, c.id.configId ASC");
Expand All @@ -99,22 +99,9 @@ public ListTaskPushNotificationConfigResult getInfo(ListTaskPushNotificationConf
query.setParameter("taskId", taskId);
query.setParameter("nullSentinel", NULL_TIMESTAMP_SENTINEL);

if (params.pageToken() != null && !params.pageToken().isEmpty()) {
String[] tokenParts = params.pageToken().split(":", 2);
if (tokenParts.length == 2) {
try {
long timestampMillis = Long.parseLong(tokenParts[0]);
String tokenId = tokenParts[1];

Instant tokenTimestamp = Instant.ofEpochMilli(timestampMillis);
query.setParameter("tokenTimestamp", tokenTimestamp);
query.setParameter("tokenId", tokenId);
} catch (NumberFormatException e) {
// Malformed timestamp in pageToken
throw new io.a2a.spec.InvalidParamsError(null,
"Invalid pageToken format: timestamp must be numeric milliseconds", null);
}
}
if (pageToken != null) {
query.setParameter("tokenTimestamp", pageToken.timestamp());
query.setParameter("tokenId", pageToken.id());
}

int pageSize = params.getEffectivePageSize();
Expand All @@ -128,7 +115,7 @@ public ListTaskPushNotificationConfigResult getInfo(ListTaskPushNotificationConf
jpaConfigsPage = jpaConfigsPage.subList(0, pageSize);
JpaPushNotificationConfig lastConfig = jpaConfigsPage.get(jpaConfigsPage.size() - 1);
Instant timestamp = lastConfig.getCreatedAt() != null ? lastConfig.getCreatedAt() : NULL_TIMESTAMP_SENTINEL;
nextPageToken = timestamp.toEpochMilli() + ":" + lastConfig.getId().getConfigId();
nextPageToken = new PageToken(timestamp, lastConfig.getId().getConfigId()).toString();
}

List<PushNotificationConfig> configs = jpaConfigsPage.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,15 +352,12 @@ public void testPaginationWithInvalidToken() {
// Create 5 configs
createSamples(taskId, 5);

// Request with invalid pageToken - JPA implementation behavior is to start from beginning
// Request with invalid pageToken - should throw InvalidParamsError for invalid format
ListTaskPushNotificationConfigParams params = new ListTaskPushNotificationConfigParams(
taskId, 2, "invalid_token_that_does_not_exist", "");
ListTaskPushNotificationConfigResult result = pushNotificationConfigStore.getInfo(params);

assertNotNull(result);
// When token is not found, implementation starts from beginning
assertEquals(2, result.configs().size(), "Should return first page when token is not found");
assertNotNull(result.nextPageToken(), "Should have nextPageToken since more items exist");
assertThrows(io.a2a.spec.InvalidParamsError.class, () -> pushNotificationConfigStore.getInfo(params),
"Should throw InvalidParamsError for invalid pageToken format (missing colon)");
}

@Test
Expand Down Expand Up @@ -428,12 +425,9 @@ public void testPageTokenWithMissingColon() {

ListTaskPushNotificationConfigParams params =
new ListTaskPushNotificationConfigParams(taskId, 2, "123456789cfg1", "");
ListTaskPushNotificationConfigResult result = pushNotificationConfigStore.getInfo(params);

assertNotNull(result);
assertEquals(2, result.configs().size(),
"Should return first page when pageToken format is invalid (missing colon)");
assertNotNull(result.nextPageToken(), "Should have nextPageToken since more items exist");
assertThrows(io.a2a.spec.InvalidParamsError.class, () -> pushNotificationConfigStore.getInfo(params),
"Should throw InvalidParamsError for invalid pageToken format (missing colon)");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.a2a.spec.Artifact;
import io.a2a.spec.ListTasksParams;
import io.a2a.spec.Message;
import io.a2a.util.PageToken;
import io.a2a.spec.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -227,23 +228,9 @@ public ListTasksResult list(ListTasksParams params) {
params.contextId(), params.status(), params.pageSize(), params.pageToken());

// Parse pageToken once at the beginning
Instant tokenTimestamp = null;
String tokenId = null;
if (params.pageToken() != null && !params.pageToken().isEmpty()) {
String[] tokenParts = params.pageToken().split(":", 2);
if (tokenParts.length == 2) {
try {
long timestampMillis = Long.parseLong(tokenParts[0]);
tokenId = tokenParts[1];
tokenTimestamp = Instant.ofEpochMilli(timestampMillis);
} catch (NumberFormatException e) {
throw new io.a2a.spec.InvalidParamsError(null,
"Invalid pageToken format: timestamp must be numeric milliseconds", null);
}
} else {
throw new io.a2a.spec.InvalidParamsError(null, "Invalid pageToken format: expected 'timestamp:id'", null);
}
}
PageToken pageToken = PageToken.fromString(params.pageToken());
Instant tokenTimestamp = pageToken != null ? pageToken.timestamp() : null;
String tokenId = pageToken != null ? pageToken.id() : null;

// Build dynamic JPQL query with WHERE clauses for filtering
StringBuilder queryBuilder = new StringBuilder("SELECT t FROM JpaTask t WHERE 1=1");
Expand Down Expand Up @@ -337,8 +324,8 @@ public ListTasksResult list(ListTasksParams params) {
if (hasMore && !tasks.isEmpty()) {
Task lastTask = tasks.get(tasks.size() - 1);
// All tasks have timestamps (TaskStatus canonical constructor ensures this)
long timestampMillis = lastTask.status().timestamp().toInstant().toEpochMilli();
nextPageToken = timestampMillis + ":" + lastTask.id();
Instant timestamp = lastTask.status().timestamp().toInstant();
nextPageToken = new PageToken(timestamp, lastTask.id()).toString();
}

// Apply post-processing transformations (history limiting, artifact removal)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.a2a.spec.Artifact;
import io.a2a.spec.ListTasksParams;
import io.a2a.spec.Message;
import io.a2a.util.PageToken;
import io.a2a.spec.Task;
import org.jspecify.annotations.Nullable;

Expand Down Expand Up @@ -73,54 +74,41 @@ public ListTasksResult list(ListTasksParams params) {

// Handle page token using keyset pagination (format: "timestamp_millis:taskId")
// Use binary search to efficiently find the first task after the pageToken position (O(log N))
if (params.pageToken() != null && !params.pageToken().isEmpty()) {
String[] tokenParts = params.pageToken().split(":", 2);
if (tokenParts.length == 2) {
try {
long tokenTimestampMillis = Long.parseLong(tokenParts[0]);
java.time.Instant tokenTimestamp = java.time.Instant.ofEpochMilli(tokenTimestampMillis);
String tokenId = tokenParts[1];

// Binary search for first task where: timestamp < tokenTimestamp OR (timestamp == tokenTimestamp AND id > tokenId)
// Since list is sorted (timestamp DESC, id ASC), we search for the insertion point
int left = 0;
int right = allFilteredTasks.size();

while (left < right) {
int mid = left + (right - left) / 2;
Task task = allFilteredTasks.get(mid);

java.time.Instant taskTimestamp = (task.status() != null && task.status().timestamp() != null)
? task.status().timestamp().toInstant().truncatedTo(java.time.temporal.ChronoUnit.MILLIS)
: null;

if (taskTimestamp == null) {
// Task with null timestamp is always "before" a token with a timestamp, as they are sorted last.
// So, we search in the right half.
left = mid + 1;
} else {
int timestampCompare = taskTimestamp.compareTo(tokenTimestamp);

if (timestampCompare < 0 || (timestampCompare == 0 && task.id().compareTo(tokenId) > 0)) {
// This task is after the token, search left half
right = mid;
} else {
// This task is before or equal to token, search right half
left = mid + 1;
}
}
PageToken pageToken = PageToken.fromString(params.pageToken());
if (pageToken != null) {
java.time.Instant tokenTimestamp = pageToken.timestamp();
String tokenId = pageToken.id();

// Binary search for first task where: timestamp < tokenTimestamp OR (timestamp == tokenTimestamp AND id > tokenId)
// Since list is sorted (timestamp DESC, id ASC), we search for the insertion point
int left = 0;
int right = allFilteredTasks.size();

while (left < right) {
int mid = left + (right - left) / 2;
Task task = allFilteredTasks.get(mid);

java.time.Instant taskTimestamp = (task.status() != null && task.status().timestamp() != null)
? task.status().timestamp().toInstant().truncatedTo(java.time.temporal.ChronoUnit.MILLIS)
: null;

if (taskTimestamp == null) {
// Task with null timestamp is always "before" a token with a timestamp, as they are sorted last.
// So, we search in the right half.
left = mid + 1;
} else {
int timestampCompare = taskTimestamp.compareTo(tokenTimestamp);

if (timestampCompare < 0 || (timestampCompare == 0 && task.id().compareTo(tokenId) > 0)) {
// This task is after the token, search left half
right = mid;
} else {
// This task is before or equal to token, search right half
left = mid + 1;
}
startIndex = left;
} catch (NumberFormatException e) {
// Malformed timestamp in pageToken
throw new io.a2a.spec.InvalidParamsError(null,
"Invalid pageToken format: timestamp must be numeric milliseconds", null);
}
} else {
// Legacy ID-only pageToken format is not supported with timestamp-based sorting
// Throw error to prevent incorrect pagination results
throw new io.a2a.spec.InvalidParamsError(null, "Invalid pageToken format: expected 'timestamp:id'", null);
}
startIndex = left;
}

// Get the page of tasks
Expand All @@ -132,8 +120,8 @@ public ListTasksResult list(ListTasksParams params) {
if (endIndex < allFilteredTasks.size()) {
Task lastTask = allFilteredTasks.get(endIndex - 1);
// All tasks have timestamps (TaskStatus canonical constructor ensures this)
long timestampMillis = lastTask.status().timestamp().toInstant().toEpochMilli();
nextPageToken = timestampMillis + ":" + lastTask.id();
java.time.Instant timestamp = lastTask.status().timestamp().toInstant();
nextPageToken = new PageToken(timestamp, lastTask.id()).toString();
}

// Transform tasks: limit history and optionally remove artifacts
Expand Down
68 changes: 68 additions & 0 deletions spec/src/main/java/io/a2a/util/PageToken.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.a2a.util;

import java.time.Instant;

import io.a2a.spec.InvalidParamsError;
import org.jspecify.annotations.Nullable;

/**
* Represents a pagination token for keyset-based pagination.
* <p>
* PageTokens use the format {@code "timestamp_millis:id"} where:
* <ul>
* <li>{@code timestamp_millis} - Unix timestamp in milliseconds (numeric)</li>
* <li>{@code id} - The entity identifier (String)</li>
* </ul>
* This format enables efficient keyset pagination by allowing queries to resume
* at a specific point in a timestamp-sorted, ID-secondary-sorted result set.
*
* @param timestamp The timestamp component of the page token
* @param id The identifier component of the page token
*/
public record PageToken(Instant timestamp, String id) {

/**
* Parses a pageToken string into a PageToken record.
* <p>
* Expected format: {@code "timestamp_millis:id"}
*
* @param tokenStr The pageToken string to parse, may be null or empty
* @return A PageToken instance, or null if tokenStr is null or empty
* @throws InvalidParamsError if the token format is invalid or timestamp is not numeric
*/
public static @Nullable PageToken fromString(@Nullable String tokenStr) {
if (tokenStr == null || tokenStr.isEmpty()) {
return null;
}

String[] tokenParts = tokenStr.split(":", 2);
if (tokenParts.length != 2) {
throw new InvalidParamsError(null,
"Invalid pageToken format: expected 'timestamp:id'", null);
}

try {
long timestampMillis = Long.parseLong(tokenParts[0]);
String id = tokenParts[1];
if (id.isEmpty()) {
throw new InvalidParamsError(null, "Invalid pageToken format: id part cannot be empty", null);
}
return new PageToken(Instant.ofEpochMilli(timestampMillis), id);
} catch (NumberFormatException e) {
throw new InvalidParamsError(null,
"Invalid pageToken format: timestamp must be numeric milliseconds", null);
}
}

/**
* Converts this PageToken to its string representation.
* <p>
* Format: {@code "timestamp_millis:id"}
*
* @return The pageToken string
*/
@Override
public String toString() {
return timestamp.toEpochMilli() + ":" + id;
}
}
Loading