Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

156 changes: 149 additions & 7 deletions mcp-core/src/main/java/io/modelcontextprotocol/client/McpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.modelcontextprotocol.client;

import io.modelcontextprotocol.common.McpTransportContext;
import io.modelcontextprotocol.experimental.tasks.TaskStore;
import io.modelcontextprotocol.json.schema.JsonSchemaValidator;
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
Expand Down Expand Up @@ -184,6 +185,8 @@ class SyncSpec {

private final List<Consumer<McpSchema.ProgressNotification>> progressConsumers = new ArrayList<>();

private final List<Consumer<McpSchema.TaskStatusNotification>> taskStatusConsumers = new ArrayList<>();

private Function<CreateMessageRequest, CreateMessageResult> samplingHandler;

private Function<ElicitRequest, ElicitResult> elicitationHandler;
Expand All @@ -194,6 +197,10 @@ class SyncSpec {

private boolean enableCallToolSchemaCaching = false; // Default to false

private TaskStore<McpSchema.ClientTaskPayloadResult> taskStore;

private Duration taskPollTimeout; // null = use default (5 minutes)

private SyncSpec(McpClientTransport transport) {
Assert.notNull(transport, "Transport must not be null");
this.transport = transport;
Expand Down Expand Up @@ -317,6 +324,44 @@ public SyncSpec elicitation(Function<ElicitRequest, ElicitResult> elicitationHan
return this;
}

/**
* Sets the task store for client-side task hosting. When set, the client can host
* tasks for task-augmented sampling and elicitation requests from the server.
*
* <p>
* This is an experimental feature that may change in future releases.
* @param taskStore The task store implementation. Must not be null.
* @return This builder instance for method chaining
* @throws IllegalArgumentException if taskStore is null
*/
public SyncSpec taskStore(TaskStore<McpSchema.ClientTaskPayloadResult> taskStore) {
Assert.notNull(taskStore, "Task store must not be null");
this.taskStore = taskStore;
return this;
}

/**
* Sets the maximum time to wait for a task to reach a terminal state during task
* result polling.
*
* <p>
* When using task-augmented requests (e.g., long-running tool calls), the client
* polls the server for task status updates. This timeout limits how long the
* client will wait for the task to complete, fail, or be cancelled.
*
* <p>
* If not set, defaults to 5 minutes to prevent infinite polling loops.
*
* <p>
* This is an experimental feature that may change in future releases.
* @param timeout maximum poll duration, or null to use the default (5 minutes)
* @return This builder instance for method chaining
*/
public SyncSpec taskPollTimeout(Duration timeout) {
this.taskPollTimeout = timeout;
return this;
}

/**
* Adds a consumer to be notified when the available tools change. This allows the
* client to react to changes in the server's tool capabilities, such as tools
Expand Down Expand Up @@ -428,14 +473,42 @@ public SyncSpec progressConsumer(Consumer<McpSchema.ProgressNotification> progre
* @param progressConsumers A list of consumers that receives progress
* notifications. Must not be null.
* @return This builder instance for method chaining
* @throws IllegalArgumentException if progressConsumer is null
* @throws IllegalArgumentException if progressConsumers is null
*/
public SyncSpec progressConsumers(List<Consumer<McpSchema.ProgressNotification>> progressConsumers) {
Assert.notNull(progressConsumers, "Progress consumers must not be null");
this.progressConsumers.addAll(progressConsumers);
return this;
}

/**
* Adds a consumer to be notified of task status notifications from the server.
* This enables clients to receive updates about task progress and status changes.
* @param taskStatusConsumer A consumer that receives task status notifications.
* Must not be null.
* @return This builder instance for method chaining
* @throws IllegalArgumentException if taskStatusConsumer is null
*/
public SyncSpec taskStatusConsumer(Consumer<McpSchema.TaskStatusNotification> taskStatusConsumer) {
Assert.notNull(taskStatusConsumer, "Task status consumer must not be null");
this.taskStatusConsumers.add(taskStatusConsumer);
return this;
}

/**
* Adds multiple consumers to be notified of task status notifications from the
* server.
* @param taskStatusConsumers A list of consumers that receive task status
* notifications. Must not be null.
* @return This builder instance for method chaining
* @throws IllegalArgumentException if taskStatusConsumers is null
*/
public SyncSpec taskStatusConsumers(List<Consumer<McpSchema.TaskStatusNotification>> taskStatusConsumers) {
Assert.notNull(taskStatusConsumers, "Task status consumers must not be null");
this.taskStatusConsumers.addAll(taskStatusConsumers);
return this;
}

/**
* Add a provider of {@link McpTransportContext}, providing a context before
* calling any client operation. This allows to extract thread-locals and hand
Expand Down Expand Up @@ -486,14 +559,15 @@ public SyncSpec enableCallToolSchemaCaching(boolean enableCallToolSchemaCaching)
public McpSyncClient build() {
McpClientFeatures.Sync syncFeatures = new McpClientFeatures.Sync(this.clientInfo, this.capabilities,
this.roots, this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers,
this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers, this.samplingHandler,
this.elicitationHandler, this.enableCallToolSchemaCaching);
this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers,
this.taskStatusConsumers, this.samplingHandler, this.elicitationHandler,
this.enableCallToolSchemaCaching, this.taskPollTimeout, this.taskStore != null);

McpClientFeatures.Async asyncFeatures = McpClientFeatures.Async.fromSync(syncFeatures);

return new McpSyncClient(new McpAsyncClient(transport, this.requestTimeout, this.initializationTimeout,
jsonSchemaValidator != null ? jsonSchemaValidator : JsonSchemaValidator.getDefault(),
asyncFeatures), this.contextProvider);
jsonSchemaValidator != null ? jsonSchemaValidator : JsonSchemaValidator.getDefault(), asyncFeatures,
this.taskStore), this.contextProvider);
}

}
Expand Down Expand Up @@ -540,6 +614,8 @@ class AsyncSpec {

private final List<Function<McpSchema.ProgressNotification, Mono<Void>>> progressConsumers = new ArrayList<>();

private final List<Function<McpSchema.TaskStatusNotification, Mono<Void>>> taskStatusConsumers = new ArrayList<>();

private Function<CreateMessageRequest, Mono<CreateMessageResult>> samplingHandler;

private Function<ElicitRequest, Mono<ElicitResult>> elicitationHandler;
Expand All @@ -548,6 +624,10 @@ class AsyncSpec {

private boolean enableCallToolSchemaCaching = false; // Default to false

private TaskStore<McpSchema.ClientTaskPayloadResult> taskStore;

private Duration taskPollTimeout; // null = use default (5 minutes)

private AsyncSpec(McpClientTransport transport) {
Assert.notNull(transport, "Transport must not be null");
this.transport = transport;
Expand Down Expand Up @@ -671,6 +751,22 @@ public AsyncSpec elicitation(Function<ElicitRequest, Mono<ElicitResult>> elicita
return this;
}

/**
* Sets the task store for client-side task hosting. When set, the client can host
* tasks for task-augmented sampling and elicitation requests from the server.
*
* <p>
* This is an experimental feature that may change in future releases.
* @param taskStore The task store implementation. Must not be null.
* @return This builder instance for method chaining
* @throws IllegalArgumentException if taskStore is null
*/
public AsyncSpec taskStore(TaskStore<McpSchema.ClientTaskPayloadResult> taskStore) {
Assert.notNull(taskStore, "Task store must not be null");
this.taskStore = taskStore;
return this;
}

/**
* Adds a consumer to be notified when the available tools change. This allows the
* client to react to changes in the server's tool capabilities, such as tools
Expand Down Expand Up @@ -785,7 +881,7 @@ public AsyncSpec progressConsumer(Function<McpSchema.ProgressNotification, Mono<
* @param progressConsumers A list of consumers that receives progress
* notifications. Must not be null.
* @return This builder instance for method chaining
* @throws IllegalArgumentException if progressConsumer is null
* @throws IllegalArgumentException if progressConsumers is null
*/
public AsyncSpec progressConsumers(
List<Function<McpSchema.ProgressNotification, Mono<Void>>> progressConsumers) {
Expand All @@ -794,6 +890,35 @@ public AsyncSpec progressConsumers(
return this;
}

/**
* Adds a consumer to be notified of task status notifications from the server.
* This enables clients to receive updates about task progress and status changes.
* @param taskStatusConsumer A consumer that receives task status notifications.
* Must not be null.
* @return This builder instance for method chaining
* @throws IllegalArgumentException if taskStatusConsumer is null
*/
public AsyncSpec taskStatusConsumer(Function<McpSchema.TaskStatusNotification, Mono<Void>> taskStatusConsumer) {
Assert.notNull(taskStatusConsumer, "Task status consumer must not be null");
this.taskStatusConsumers.add(taskStatusConsumer);
return this;
}

/**
* Adds multiple consumers to be notified of task status notifications from the
* server.
* @param taskStatusConsumers A list of consumers that receive task status
* notifications. Must not be null.
* @return This builder instance for method chaining
* @throws IllegalArgumentException if taskStatusConsumers is null
*/
public AsyncSpec taskStatusConsumers(
List<Function<McpSchema.TaskStatusNotification, Mono<Void>>> taskStatusConsumers) {
Assert.notNull(taskStatusConsumers, "Task status consumers must not be null");
this.taskStatusConsumers.addAll(taskStatusConsumers);
return this;
}

/**
* Sets the JSON schema validator to use for validating tool responses against
* output schemas.
Expand All @@ -819,6 +944,21 @@ public AsyncSpec enableCallToolSchemaCaching(boolean enableCallToolSchemaCaching
return this;
}

/**
* Sets the maximum duration to poll for task completion in
* {@code callToolStream()}. If not set, defaults to 5 minutes to prevent infinite
* polling loops.
*
* <p>
* This is an experimental feature that may change in future releases.
* @param timeout maximum poll duration, or null to use the default (5 minutes)
* @return This builder instance for method chaining
*/
public AsyncSpec taskPollTimeout(Duration timeout) {
this.taskPollTimeout = timeout;
return this;
}

/**
* Create an instance of {@link McpAsyncClient} with the provided configurations
* or sensible defaults.
Expand All @@ -832,7 +972,9 @@ public McpAsyncClient build() {
new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots,
this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers,
this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers,
this.samplingHandler, this.elicitationHandler, this.enableCallToolSchemaCaching));
this.taskStatusConsumers, this.samplingHandler, this.elicitationHandler,
this.enableCallToolSchemaCaching, this.taskPollTimeout, this.taskStore != null),
this.taskStore);
}

}
Expand Down
Loading