diff --git a/extras/push-notification-config-store-database-jpa/src/main/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStore.java b/extras/push-notification-config-store-database-jpa/src/main/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStore.java index 1df8b57f3..36245e277 100644 --- a/extras/push-notification-config-store-database-jpa/src/main/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStore.java +++ b/extras/push-notification-config-store-database-jpa/src/main/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStore.java @@ -15,6 +15,7 @@ import io.a2a.server.tasks.PushNotificationConfigStore; import io.a2a.spec.ListTaskPushNotificationConfigParams; import io.a2a.spec.ListTaskPushNotificationConfigResult; +import io.a2a.util.PageToken; import io.a2a.spec.PushNotificationConfig; import io.a2a.spec.TaskPushNotificationConfig; import java.util.stream.Collectors; @@ -79,18 +80,17 @@ public ListTaskPushNotificationConfigResult getInfo(ListTaskPushNotificationConf String taskId = params.id(); LOGGER.debug("Retrieving PushNotificationConfigs for Task '{}' with params: pageSize={}, pageToken={}", taskId, params.pageSize(), params.pageToken()); + + // Parse pageToken once at the beginning + PageToken pageToken = PageToken.fromString(params.pageToken()); + try { StringBuilder queryBuilder = new StringBuilder("SELECT c FROM JpaPushNotificationConfig c WHERE c.id.taskId = :taskId"); - if (params.pageToken() != null && !params.pageToken().isEmpty()) { - String[] tokenParts = params.pageToken().split(":", 2); - if (tokenParts.length == 2) { + if (pageToken != null) { // Keyset pagination: get tasks where timestamp < tokenTimestamp OR (timestamp = tokenTimestamp AND id > tokenId) // All tasks have timestamps (TaskStatus canonical constructor ensures this) queryBuilder.append(" AND (COALESCE(c.createdAt, :nullSentinel) < :tokenTimestamp OR (COALESCE(c.createdAt, :nullSentinel) = :tokenTimestamp AND c.id.configId > :tokenId))"); - } else { - // Based on the comments in the test case, if the pageToken is invalid start from the beginning. - } } queryBuilder.append(" ORDER BY COALESCE(c.createdAt, :nullSentinel) DESC, c.id.configId ASC"); @@ -99,22 +99,9 @@ public ListTaskPushNotificationConfigResult getInfo(ListTaskPushNotificationConf query.setParameter("taskId", taskId); query.setParameter("nullSentinel", NULL_TIMESTAMP_SENTINEL); - if (params.pageToken() != null && !params.pageToken().isEmpty()) { - String[] tokenParts = params.pageToken().split(":", 2); - if (tokenParts.length == 2) { - try { - long timestampMillis = Long.parseLong(tokenParts[0]); - String tokenId = tokenParts[1]; - - Instant tokenTimestamp = Instant.ofEpochMilli(timestampMillis); - query.setParameter("tokenTimestamp", tokenTimestamp); - query.setParameter("tokenId", tokenId); - } catch (NumberFormatException e) { - // Malformed timestamp in pageToken - throw new io.a2a.spec.InvalidParamsError(null, - "Invalid pageToken format: timestamp must be numeric milliseconds", null); - } - } + if (pageToken != null) { + query.setParameter("tokenTimestamp", pageToken.timestamp()); + query.setParameter("tokenId", pageToken.id()); } int pageSize = params.getEffectivePageSize(); @@ -128,7 +115,7 @@ public ListTaskPushNotificationConfigResult getInfo(ListTaskPushNotificationConf jpaConfigsPage = jpaConfigsPage.subList(0, pageSize); JpaPushNotificationConfig lastConfig = jpaConfigsPage.get(jpaConfigsPage.size() - 1); Instant timestamp = lastConfig.getCreatedAt() != null ? lastConfig.getCreatedAt() : NULL_TIMESTAMP_SENTINEL; - nextPageToken = timestamp.toEpochMilli() + ":" + lastConfig.getId().getConfigId(); + nextPageToken = new PageToken(timestamp, lastConfig.getId().getConfigId()).toString(); } List configs = jpaConfigsPage.stream() diff --git a/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreIntegrationTest.java b/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreIntegrationTest.java index a31beb631..cca31e536 100644 --- a/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreIntegrationTest.java +++ b/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreIntegrationTest.java @@ -352,15 +352,12 @@ public void testPaginationWithInvalidToken() { // Create 5 configs createSamples(taskId, 5); - // Request with invalid pageToken - JPA implementation behavior is to start from beginning + // Request with invalid pageToken - should throw InvalidParamsError for invalid format ListTaskPushNotificationConfigParams params = new ListTaskPushNotificationConfigParams( taskId, 2, "invalid_token_that_does_not_exist", ""); - ListTaskPushNotificationConfigResult result = pushNotificationConfigStore.getInfo(params); - assertNotNull(result); - // When token is not found, implementation starts from beginning - assertEquals(2, result.configs().size(), "Should return first page when token is not found"); - assertNotNull(result.nextPageToken(), "Should have nextPageToken since more items exist"); + assertThrows(io.a2a.spec.InvalidParamsError.class, () -> pushNotificationConfigStore.getInfo(params), + "Should throw InvalidParamsError for invalid pageToken format (missing colon)"); } @Test @@ -428,12 +425,9 @@ public void testPageTokenWithMissingColon() { ListTaskPushNotificationConfigParams params = new ListTaskPushNotificationConfigParams(taskId, 2, "123456789cfg1", ""); - ListTaskPushNotificationConfigResult result = pushNotificationConfigStore.getInfo(params); - assertNotNull(result); - assertEquals(2, result.configs().size(), - "Should return first page when pageToken format is invalid (missing colon)"); - assertNotNull(result.nextPageToken(), "Should have nextPageToken since more items exist"); + assertThrows(io.a2a.spec.InvalidParamsError.class, () -> pushNotificationConfigStore.getInfo(params), + "Should throw InvalidParamsError for invalid pageToken format (missing colon)"); } @Test diff --git a/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java b/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java index 840773ea6..e17c9b3c9 100644 --- a/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java +++ b/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java @@ -25,6 +25,7 @@ import io.a2a.spec.Artifact; import io.a2a.spec.ListTasksParams; import io.a2a.spec.Message; +import io.a2a.util.PageToken; import io.a2a.spec.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -227,23 +228,9 @@ public ListTasksResult list(ListTasksParams params) { params.contextId(), params.status(), params.pageSize(), params.pageToken()); // Parse pageToken once at the beginning - Instant tokenTimestamp = null; - String tokenId = null; - if (params.pageToken() != null && !params.pageToken().isEmpty()) { - String[] tokenParts = params.pageToken().split(":", 2); - if (tokenParts.length == 2) { - try { - long timestampMillis = Long.parseLong(tokenParts[0]); - tokenId = tokenParts[1]; - tokenTimestamp = Instant.ofEpochMilli(timestampMillis); - } catch (NumberFormatException e) { - throw new io.a2a.spec.InvalidParamsError(null, - "Invalid pageToken format: timestamp must be numeric milliseconds", null); - } - } else { - throw new io.a2a.spec.InvalidParamsError(null, "Invalid pageToken format: expected 'timestamp:id'", null); - } - } + PageToken pageToken = PageToken.fromString(params.pageToken()); + Instant tokenTimestamp = pageToken != null ? pageToken.timestamp() : null; + String tokenId = pageToken != null ? pageToken.id() : null; // Build dynamic JPQL query with WHERE clauses for filtering StringBuilder queryBuilder = new StringBuilder("SELECT t FROM JpaTask t WHERE 1=1"); @@ -337,8 +324,8 @@ public ListTasksResult list(ListTasksParams params) { if (hasMore && !tasks.isEmpty()) { Task lastTask = tasks.get(tasks.size() - 1); // All tasks have timestamps (TaskStatus canonical constructor ensures this) - long timestampMillis = lastTask.status().timestamp().toInstant().toEpochMilli(); - nextPageToken = timestampMillis + ":" + lastTask.id(); + Instant timestamp = lastTask.status().timestamp().toInstant(); + nextPageToken = new PageToken(timestamp, lastTask.id()).toString(); } // Apply post-processing transformations (history limiting, artifact removal) diff --git a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java index f1243e272..48034383f 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java +++ b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java @@ -11,6 +11,7 @@ import io.a2a.spec.Artifact; import io.a2a.spec.ListTasksParams; import io.a2a.spec.Message; +import io.a2a.util.PageToken; import io.a2a.spec.Task; import org.jspecify.annotations.Nullable; @@ -73,54 +74,41 @@ public ListTasksResult list(ListTasksParams params) { // Handle page token using keyset pagination (format: "timestamp_millis:taskId") // Use binary search to efficiently find the first task after the pageToken position (O(log N)) - if (params.pageToken() != null && !params.pageToken().isEmpty()) { - String[] tokenParts = params.pageToken().split(":", 2); - if (tokenParts.length == 2) { - try { - long tokenTimestampMillis = Long.parseLong(tokenParts[0]); - java.time.Instant tokenTimestamp = java.time.Instant.ofEpochMilli(tokenTimestampMillis); - String tokenId = tokenParts[1]; - - // Binary search for first task where: timestamp < tokenTimestamp OR (timestamp == tokenTimestamp AND id > tokenId) - // Since list is sorted (timestamp DESC, id ASC), we search for the insertion point - int left = 0; - int right = allFilteredTasks.size(); - - while (left < right) { - int mid = left + (right - left) / 2; - Task task = allFilteredTasks.get(mid); - - java.time.Instant taskTimestamp = (task.status() != null && task.status().timestamp() != null) - ? task.status().timestamp().toInstant().truncatedTo(java.time.temporal.ChronoUnit.MILLIS) - : null; - - if (taskTimestamp == null) { - // Task with null timestamp is always "before" a token with a timestamp, as they are sorted last. - // So, we search in the right half. - left = mid + 1; - } else { - int timestampCompare = taskTimestamp.compareTo(tokenTimestamp); - - if (timestampCompare < 0 || (timestampCompare == 0 && task.id().compareTo(tokenId) > 0)) { - // This task is after the token, search left half - right = mid; - } else { - // This task is before or equal to token, search right half - left = mid + 1; - } - } + PageToken pageToken = PageToken.fromString(params.pageToken()); + if (pageToken != null) { + java.time.Instant tokenTimestamp = pageToken.timestamp(); + String tokenId = pageToken.id(); + + // Binary search for first task where: timestamp < tokenTimestamp OR (timestamp == tokenTimestamp AND id > tokenId) + // Since list is sorted (timestamp DESC, id ASC), we search for the insertion point + int left = 0; + int right = allFilteredTasks.size(); + + while (left < right) { + int mid = left + (right - left) / 2; + Task task = allFilteredTasks.get(mid); + + java.time.Instant taskTimestamp = (task.status() != null && task.status().timestamp() != null) + ? task.status().timestamp().toInstant().truncatedTo(java.time.temporal.ChronoUnit.MILLIS) + : null; + + if (taskTimestamp == null) { + // Task with null timestamp is always "before" a token with a timestamp, as they are sorted last. + // So, we search in the right half. + left = mid + 1; + } else { + int timestampCompare = taskTimestamp.compareTo(tokenTimestamp); + + if (timestampCompare < 0 || (timestampCompare == 0 && task.id().compareTo(tokenId) > 0)) { + // This task is after the token, search left half + right = mid; + } else { + // This task is before or equal to token, search right half + left = mid + 1; } - startIndex = left; - } catch (NumberFormatException e) { - // Malformed timestamp in pageToken - throw new io.a2a.spec.InvalidParamsError(null, - "Invalid pageToken format: timestamp must be numeric milliseconds", null); } - } else { - // Legacy ID-only pageToken format is not supported with timestamp-based sorting - // Throw error to prevent incorrect pagination results - throw new io.a2a.spec.InvalidParamsError(null, "Invalid pageToken format: expected 'timestamp:id'", null); } + startIndex = left; } // Get the page of tasks @@ -132,8 +120,8 @@ public ListTasksResult list(ListTasksParams params) { if (endIndex < allFilteredTasks.size()) { Task lastTask = allFilteredTasks.get(endIndex - 1); // All tasks have timestamps (TaskStatus canonical constructor ensures this) - long timestampMillis = lastTask.status().timestamp().toInstant().toEpochMilli(); - nextPageToken = timestampMillis + ":" + lastTask.id(); + java.time.Instant timestamp = lastTask.status().timestamp().toInstant(); + nextPageToken = new PageToken(timestamp, lastTask.id()).toString(); } // Transform tasks: limit history and optionally remove artifacts diff --git a/spec/src/main/java/io/a2a/util/PageToken.java b/spec/src/main/java/io/a2a/util/PageToken.java new file mode 100644 index 000000000..c703a903a --- /dev/null +++ b/spec/src/main/java/io/a2a/util/PageToken.java @@ -0,0 +1,68 @@ +package io.a2a.util; + +import java.time.Instant; + +import io.a2a.spec.InvalidParamsError; +import org.jspecify.annotations.Nullable; + +/** + * Represents a pagination token for keyset-based pagination. + *

+ * PageTokens use the format {@code "timestamp_millis:id"} where: + *

+ * This format enables efficient keyset pagination by allowing queries to resume + * at a specific point in a timestamp-sorted, ID-secondary-sorted result set. + * + * @param timestamp The timestamp component of the page token + * @param id The identifier component of the page token + */ +public record PageToken(Instant timestamp, String id) { + + /** + * Parses a pageToken string into a PageToken record. + *

+ * Expected format: {@code "timestamp_millis:id"} + * + * @param tokenStr The pageToken string to parse, may be null or empty + * @return A PageToken instance, or null if tokenStr is null or empty + * @throws InvalidParamsError if the token format is invalid or timestamp is not numeric + */ + public static @Nullable PageToken fromString(@Nullable String tokenStr) { + if (tokenStr == null || tokenStr.isEmpty()) { + return null; + } + + String[] tokenParts = tokenStr.split(":", 2); + if (tokenParts.length != 2) { + throw new InvalidParamsError(null, + "Invalid pageToken format: expected 'timestamp:id'", null); + } + + try { + long timestampMillis = Long.parseLong(tokenParts[0]); + String id = tokenParts[1]; + if (id.isEmpty()) { + throw new InvalidParamsError(null, "Invalid pageToken format: id part cannot be empty", null); + } + return new PageToken(Instant.ofEpochMilli(timestampMillis), id); + } catch (NumberFormatException e) { + throw new InvalidParamsError(null, + "Invalid pageToken format: timestamp must be numeric milliseconds", null); + } + } + + /** + * Converts this PageToken to its string representation. + *

+ * Format: {@code "timestamp_millis:id"} + * + * @return The pageToken string + */ + @Override + public String toString() { + return timestamp.toEpochMilli() + ":" + id; + } +} diff --git a/spec/src/test/java/io/a2a/util/PageTokenTest.java b/spec/src/test/java/io/a2a/util/PageTokenTest.java new file mode 100644 index 000000000..aef9c8192 --- /dev/null +++ b/spec/src/test/java/io/a2a/util/PageTokenTest.java @@ -0,0 +1,161 @@ +package io.a2a.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.time.Instant; +import java.util.stream.Stream; + +import io.a2a.spec.InvalidParamsError; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.NullAndEmptySource; + +/** + * Unit tests for {@link PageToken}. + */ +class PageTokenTest { + + // ========== Valid Token Parsing Tests ========== + + /** + * Provides test data for valid token parsing tests. + * Format: tokenString, expectedTimestampMillis, expectedId + */ + static Stream validTokens() { + return Stream.of( + Arguments.of("1640000000000:user123", 1640000000000L, "user123"), + Arguments.of("1640000000000:user:123:extra", 1640000000000L, "user:123:extra"), + Arguments.of("0:user123", 0L, "user123"), + Arguments.of("-1000:user123", -1000L, "user123"), + Arguments.of(Long.MAX_VALUE + ":user123", Long.MAX_VALUE, "user123"), + Arguments.of(Long.MIN_VALUE + ":user123", Long.MIN_VALUE, "user123"), + Arguments.of("1640000000000:" + "a".repeat(1000), 1640000000000L, "a".repeat(1000)), + Arguments.of("1640000000000:user-123_test@example.com", 1640000000000L, "user-123_test@example.com") + ); + } + + /** + * Verifies that various valid pageToken strings are correctly parsed into PageToken objects. + */ + @ParameterizedTest + @MethodSource("validTokens") + void testFromString_validTokens_parseCorrectly(String tokenStr, long expectedTimestampMillis, String expectedId) { + PageToken token = PageToken.fromString(tokenStr); + + assertNotNull(token); + assertEquals(Instant.ofEpochMilli(expectedTimestampMillis), token.timestamp()); + assertEquals(expectedId, token.id()); + } + + // ========== Null and Empty Input Tests ========== + + /** + * Verifies that null or empty strings return null instead of throwing an exception. + */ + @ParameterizedTest + @NullAndEmptySource + void testFromString_nullOrEmpty_returnsNull(String tokenStr) { + PageToken token = PageToken.fromString(tokenStr); + assertNull(token); + } + + // ========== Invalid Format Tests ========== + + /** + * Provides test data for invalid token format tests. + * Format: tokenString, expectedErrorMessage + */ + static Stream invalidTokenFormats() { + return Stream.of( + Arguments.of("1640000000000user123", "Invalid pageToken format: expected 'timestamp:id'"), + Arguments.of("1640000000000", "Invalid pageToken format: expected 'timestamp:id'"), + Arguments.of("1640000000000:", "Invalid pageToken format: id part cannot be empty"), + Arguments.of(":", "Invalid pageToken format: timestamp must be numeric milliseconds"), + Arguments.of("notanumber:user123", "Invalid pageToken format: timestamp must be numeric milliseconds"), + Arguments.of("1640000000.123:user123", "Invalid pageToken format: timestamp must be numeric milliseconds"), + Arguments.of("1640 000 000:user123", "Invalid pageToken format: timestamp must be numeric milliseconds") + ); + } + + /** + * Verifies that various invalid token formats throw InvalidParamsError with appropriate messages. + */ + @ParameterizedTest + @MethodSource("invalidTokenFormats") + void testFromString_invalidFormats_throwsInvalidParamsError(String tokenStr, String expectedErrorMessage) { + InvalidParamsError ex = assertThrows(InvalidParamsError.class, () -> { + PageToken.fromString(tokenStr); + }); + assertNotNull(ex.getMessage()); + assertEquals(expectedErrorMessage, ex.getMessage()); + } + + // ========== toString Tests ========== + + /** + * Provides test data for toString tests. + * Format: timestampMillis, id, expectedString + */ + static Stream toStringTestData() { + return Stream.of( + Arguments.of(1640000000000L, "user123", "1640000000000:user123"), + Arguments.of(1640000000000L, "user:123", "1640000000000:user:123"), + Arguments.of(0L, "user123", "0:user123"), + Arguments.of(-1000L, "user123", "-1000:user123") + ); + } + + /** + * Verifies that toString formats PageTokens correctly as "timestamp_millis:id". + */ + @ParameterizedTest + @MethodSource("toStringTestData") + void testToString_formatsCorrectly(long timestampMillis, String id, String expectedString) { + PageToken token = new PageToken(Instant.ofEpochMilli(timestampMillis), id); + String result = token.toString(); + assertEquals(expectedString, result); + } + + // ========== Round-Trip Conversion Tests ========== + + /** + * Provides test data for round-trip conversion tests. + */ + static Stream roundTripTokens() { + return Stream.of( + "1640000000000:user123", + "1640000000000:user:123:extra", + "0:user123", + "-1000:user123" + ); + } + + /** + * Verifies that converting from string to PageToken and back to string preserves the original value. + */ + @ParameterizedTest + @MethodSource("roundTripTokens") + void testRoundTrip_fromStringToString_preservesValue(String original) { + PageToken token = PageToken.fromString(original); + String result = token.toString(); + assertEquals(original, result); + } + + /** + * Verifies that converting from PageToken to string and back to PageToken preserves the original value. + */ + @Test + void testRoundTrip_toStringFromString_preservesValue() { + PageToken original = new PageToken(Instant.ofEpochMilli(1640000000000L), "user123"); + String tokenStr = original.toString(); + PageToken result = PageToken.fromString(tokenStr); + + assertEquals(original.timestamp(), result.timestamp()); + assertEquals(original.id(), result.id()); + } +}