From 3d869a1ef21d7c15bb206996ae404656bf7bfbdb Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Tue, 10 Feb 2026 11:33:36 +0800 Subject: [PATCH 1/6] feat: add create_session and run_sse on reverse mcp app --- .../reverse_mcp/server_with_reverse_mcp.py | 162 +++++++++++++++++- 1 file changed, 159 insertions(+), 3 deletions(-) diff --git a/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py b/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py index 442ce6bf..376540c5 100644 --- a/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py +++ b/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py @@ -15,9 +15,14 @@ import asyncio import json import uuid -from typing import TYPE_CHECKING - -from fastapi import FastAPI, Request, Response, WebSocket +from typing import TYPE_CHECKING, Any, Optional + +from fastapi import FastAPI, HTTPException, Request, Response, WebSocket +from fastapi.responses import StreamingResponse +from google.adk.artifacts import InMemoryArtifactService +from google.adk.cli.adk_web_server import RunAgentRequest +from google.adk.runners import Runner as GoogleRunner +from google.adk.sessions import InMemorySessionService, Session from google.adk.tools.mcp_tool.mcp_session_manager import ( StreamableHTTPConnectionParams, ) @@ -86,6 +91,7 @@ def __init__( agent: "Agent", host: str = "0.0.0.0", port: int = 8000, + short_term_memory: Optional[Any] = None, ): self.agent = agent @@ -93,6 +99,29 @@ def __init__( self.port = port self.app = FastAPI() + + # Session and artifact services for new endpoints + # Priority: 1. provided short_term_memory, 2. agent's short_term_memory, 3. create new + if short_term_memory is not None: + from google.adk.sessions.base_session_service import BaseSessionService + + if isinstance(short_term_memory, BaseSessionService): + self.session_service = short_term_memory + else: + self.session_service = short_term_memory.session_service + elif ( + hasattr(agent, "short_term_memory") and agent.short_term_memory is not None + ): + from google.adk.sessions.base_session_service import BaseSessionService + + if isinstance(agent.short_term_memory, BaseSessionService): + self.session_service = agent.short_term_memory + else: + self.session_service = agent.short_term_memory.session_service + else: + self.session_service = InMemorySessionService() + self.artifact_service = InMemoryArtifactService() + # build routes for self.app self.build() @@ -172,8 +201,135 @@ async def ws_endpoint(ws: WebSocket): raw = await ws.receive_text() await self.ws_session_mgr.handle_ws_message(client_id, raw) + # ========== New endpoints: create_session, create_session_with_id, run_sse ========== + # NOTE: These must be defined BEFORE the catch-all /{path:path} route + + class CreateSessionRequest(BaseModel): + state: Optional[dict[str, Any]] = None + session_id: Optional[str] = None + + @self.app.post( + "/apps/{app_name}/users/{user_id}/sessions", + response_model_exclude_none=True, + ) + async def create_session( + app_name: str, + user_id: str, + req: Optional[CreateSessionRequest] = None, + ) -> Session: + """Create a new session.""" + session_id = req.session_id if req and req.session_id else str(uuid.uuid4()) + session = Session( + app_name=app_name, + user_id=user_id, + id=session_id, + state=req.state if req and req.state else {}, + ) + await self.session_service.create_session( + app_name=app_name, + user_id=user_id, + session_id=session_id, + state=req.state if req and req.state else {}, + ) + logger.info( + f"Created session: {session_id} for user {user_id} in app {app_name}" + ) + return session + + @self.app.post( + "/apps/{app_name}/users/{user_id}/sessions/{session_id}", + response_model_exclude_none=True, + ) + async def create_session_with_id( + app_name: str, + user_id: str, + session_id: str, + state: Optional[dict[str, Any]] = None, + ) -> Session: + """Create a session with specific ID.""" + await self.session_service.create_session( + app_name=app_name, + user_id=user_id, + session_id=session_id, + state=state if state else {}, + ) + session = Session( + app_name=app_name, + user_id=user_id, + id=session_id, + state=state if state else {}, + ) + logger.info(f"Created session with ID: {session_id} for user {user_id}") + return session + + @self.app.post("/run_sse") + async def run_agent_sse(req: RunAgentRequest) -> StreamingResponse: + """Run agent with SSE streaming.""" + # Get session + session = await self.session_service.get_session( + app_name=req.app_name, + user_id=req.user_id, + session_id=req.session_id, + ) + if not session: + raise HTTPException(status_code=404, detail="Session not found") + + # Use the first connected websocket client, or create a new agent clone + websocket_id = None + if self.ws_agent_mgr: + websocket_id = list(self.ws_agent_mgr.keys())[0] + agent = self.ws_agent_mgr[websocket_id] + logger.debug(f"Using agent from websocket {websocket_id}") + else: + # No websocket connected, use original agent + agent = self.agent + logger.debug("No websocket connected, using original agent") + + # Mount MCPToolset if needed + if not agent.tools: + logger.debug("Mount fake MCPToolset to agent for SSE") + agent.tools.append( + MCPToolset( + connection_params=StreamableHTTPConnectionParams( + url=f"http://127.0.0.1:{self.port}/mcp", + headers={REVERSE_MCP_HEADER_KEY: websocket_id or "default"}, + ), + ) + ) + + # Create runner + runner = GoogleRunner( + agent=agent, + app_name=req.app_name, + session_service=self.session_service, + artifact_service=self.artifact_service, + ) + + async def event_generator(): + try: + async for event in runner.run_async( + user_id=req.user_id, + session_id=req.session_id, + new_message=req.new_message, + state_delta=req.state_delta, + ): + event_json = event.model_dump_json( + exclude_none=True, by_alias=True + ) + logger.debug(f"SSE event: {event_json}") + yield f"data: {event_json}\n\n" + except Exception as e: + logger.exception(f"Error in event_generator: {e}") + yield f"data: {json.dumps({'error': str(e)})}\n\n" + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + ) + # build the fake MPC server, # and intercept all requests to the client websocket client. + # NOTE: This catch-all route must be defined LAST @self.app.api_route("/{path:path}", methods=["GET", "POST"]) async def mcp_proxy(path: str, request: Request): client_id = request.headers.get(REVERSE_MCP_HEADER_KEY) From 4b9f2cf2290f9b70f57a2a4c3b2f3c68327ad020 Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Tue, 10 Feb 2026 14:31:14 +0800 Subject: [PATCH 2/6] feat: reverse mcp with session_service_mgr --- .../reverse_mcp/server_with_reverse_mcp.py | 138 +++++++++++------- 1 file changed, 85 insertions(+), 53 deletions(-) diff --git a/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py b/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py index 376540c5..af884ba2 100644 --- a/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py +++ b/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py @@ -91,7 +91,6 @@ def __init__( agent: "Agent", host: str = "0.0.0.0", port: int = 8000, - short_term_memory: Optional[Any] = None, ): self.agent = agent @@ -100,26 +99,6 @@ def __init__( self.app = FastAPI() - # Session and artifact services for new endpoints - # Priority: 1. provided short_term_memory, 2. agent's short_term_memory, 3. create new - if short_term_memory is not None: - from google.adk.sessions.base_session_service import BaseSessionService - - if isinstance(short_term_memory, BaseSessionService): - self.session_service = short_term_memory - else: - self.session_service = short_term_memory.session_service - elif ( - hasattr(agent, "short_term_memory") and agent.short_term_memory is not None - ): - from google.adk.sessions.base_session_service import BaseSessionService - - if isinstance(agent.short_term_memory, BaseSessionService): - self.session_service = agent.short_term_memory - else: - self.session_service = agent.short_term_memory.session_service - else: - self.session_service = InMemorySessionService() self.artifact_service = InMemoryArtifactService() # build routes for self.app @@ -127,6 +106,7 @@ def __init__( self.ws_session_mgr = WebsocketSessionManager() self.ws_agent_mgr: dict[str, "Agent"] = {} + self.ws_session_service_mgr: dict[str, "InMemorySessionService"] = {} def build(self): logger.info("Build routes for server with reverse mcp") @@ -141,6 +121,8 @@ class InvokeRequest(BaseModel): websocket_id: str + mcp_tool_filter: Optional[list[str]] = None + class InvokeResponse(BaseModel): """Response model for /invoke endpoint""" @@ -155,16 +137,32 @@ async def invoke(payload: InvokeRequest) -> InvokeResponse: agent = self.ws_agent_mgr[payload.websocket_id] - if not agent.tools: + mcp_toolset_url = f"http://127.0.0.1:{self.port}/mcp" + mcp_toolset_headers = {REVERSE_MCP_HEADER_KEY: payload.websocket_id} + + has_mcp_toolset = False + for tool in agent.tools: + if isinstance(tool, MCPToolset): + if hasattr(tool, "_connection_params"): + conn_params = tool._connection_params + if ( + hasattr(conn_params, "url") + and conn_params.url == mcp_toolset_url + and hasattr(conn_params, "headers") + and conn_params.headers == mcp_toolset_headers + ): + has_mcp_toolset = True + break + + if not has_mcp_toolset: logger.debug("Mount fake MCPToolset to agent") - - # we hard code the mcp url with `/mcp` to obey the mcp protocol agent.tools.append( MCPToolset( connection_params=StreamableHTTPConnectionParams( - url=f"http://127.0.0.1:{self.port}/mcp", - headers={REVERSE_MCP_HEADER_KEY: payload.websocket_id}, + url=mcp_toolset_url, + headers=mcp_toolset_headers, ), + tool_filter=payload.mcp_tool_filter, ) ) @@ -194,6 +192,9 @@ async def ws_endpoint(ws: WebSocket): logger.info(f"Fork agent for websocket {client_id}") self.ws_agent_mgr[client_id] = self.agent.clone() + logger.info(f"Create session service for websocket {client_id}") + self.ws_session_service_mgr[client_id] = InMemorySessionService() + await ws.accept() logger.info(f"Websocket {client_id} connected") @@ -201,12 +202,22 @@ async def ws_endpoint(ws: WebSocket): raw = await ws.receive_text() await self.ws_session_mgr.handle_ws_message(client_id, raw) - # ========== New endpoints: create_session, create_session_with_id, run_sse ========== - # NOTE: These must be defined BEFORE the catch-all /{path:path} route - class CreateSessionRequest(BaseModel): state: Optional[dict[str, Any]] = None session_id: Optional[str] = None + websocket_id: str + + class RunAgentRequestWithWsId(RunAgentRequest): + websocket_id: str + mcp_tool_filter: Optional[list[str]] = None + + def _get_session_service(websocket_id: str) -> InMemorySessionService: + """Get session service for the websocket client.""" + if websocket_id not in self.ws_session_service_mgr: + raise HTTPException( + status_code=404, detail=f"WebSocket client {websocket_id} not found" + ) + return self.ws_session_service_mgr[websocket_id] @self.app.post( "/apps/{app_name}/users/{user_id}/sessions", @@ -215,21 +226,22 @@ class CreateSessionRequest(BaseModel): async def create_session( app_name: str, user_id: str, - req: Optional[CreateSessionRequest] = None, + req: CreateSessionRequest, ) -> Session: """Create a new session.""" - session_id = req.session_id if req and req.session_id else str(uuid.uuid4()) + session_id = req.session_id if req.session_id else str(uuid.uuid4()) session = Session( app_name=app_name, user_id=user_id, id=session_id, - state=req.state if req and req.state else {}, + state=req.state if req.state else {}, ) - await self.session_service.create_session( + session_service = _get_session_service(req.websocket_id) + await session_service.create_session( app_name=app_name, user_id=user_id, session_id=session_id, - state=req.state if req and req.state else {}, + state=req.state if req.state else {}, ) logger.info( f"Created session: {session_id} for user {user_id} in app {app_name}" @@ -244,29 +256,32 @@ async def create_session_with_id( app_name: str, user_id: str, session_id: str, - state: Optional[dict[str, Any]] = None, + req: CreateSessionRequest, ) -> Session: """Create a session with specific ID.""" - await self.session_service.create_session( + session_service = _get_session_service(req.websocket_id) + await session_service.create_session( app_name=app_name, user_id=user_id, session_id=session_id, - state=state if state else {}, + state=req.state if req.state else {}, ) session = Session( app_name=app_name, user_id=user_id, id=session_id, - state=state if state else {}, + state=req.state if req.state else {}, ) logger.info(f"Created session with ID: {session_id} for user {user_id}") return session @self.app.post("/run_sse") - async def run_agent_sse(req: RunAgentRequest) -> StreamingResponse: + async def run_agent_sse(req: RunAgentRequestWithWsId) -> StreamingResponse: """Run agent with SSE streaming.""" + session_service = _get_session_service(req.websocket_id) + # Get session - session = await self.session_service.get_session( + session = await session_service.get_session( app_name=req.app_name, user_id=req.user_id, session_id=req.session_id, @@ -274,26 +289,43 @@ async def run_agent_sse(req: RunAgentRequest) -> StreamingResponse: if not session: raise HTTPException(status_code=404, detail="Session not found") - # Use the first connected websocket client, or create a new agent clone - websocket_id = None - if self.ws_agent_mgr: - websocket_id = list(self.ws_agent_mgr.keys())[0] - agent = self.ws_agent_mgr[websocket_id] - logger.debug(f"Using agent from websocket {websocket_id}") + # Get agent for this websocket + if req.websocket_id in self.ws_agent_mgr: + agent = self.ws_agent_mgr[req.websocket_id] + logger.debug(f"Using agent from websocket {req.websocket_id}") else: - # No websocket connected, use original agent - agent = self.agent - logger.debug("No websocket connected, using original agent") + raise HTTPException( + status_code=404, + detail=f"WebSocket client {req.websocket_id} not found", + ) # Mount MCPToolset if needed - if not agent.tools: + mcp_toolset_url = f"http://127.0.0.1:{self.port}/mcp" + mcp_toolset_headers = {REVERSE_MCP_HEADER_KEY: req.websocket_id} + + has_mcp_toolset = False + for tool in agent.tools: + if isinstance(tool, MCPToolset): + if hasattr(tool, "_connection_params"): + conn_params = tool._connection_params + if ( + hasattr(conn_params, "url") + and conn_params.url == mcp_toolset_url + and hasattr(conn_params, "headers") + and conn_params.headers == mcp_toolset_headers + ): + has_mcp_toolset = True + break + + if not has_mcp_toolset: logger.debug("Mount fake MCPToolset to agent for SSE") agent.tools.append( MCPToolset( connection_params=StreamableHTTPConnectionParams( - url=f"http://127.0.0.1:{self.port}/mcp", - headers={REVERSE_MCP_HEADER_KEY: websocket_id or "default"}, + url=mcp_toolset_url, + headers=mcp_toolset_headers, ), + tool_filter=req.mcp_tool_filter, ) ) @@ -301,7 +333,7 @@ async def run_agent_sse(req: RunAgentRequest) -> StreamingResponse: runner = GoogleRunner( agent=agent, app_name=req.app_name, - session_service=self.session_service, + session_service=session_service, artifact_service=self.artifact_service, ) From 3ab9b301cce185c10c8ae7ad442a25d5238bfece Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Tue, 10 Feb 2026 15:28:50 +0800 Subject: [PATCH 3/6] fix: filter_mcp_tools --- .../reverse_mcp/client_with_reverse_mcp.py | 12 ++- .../reverse_mcp/server_with_reverse_mcp.py | 88 ++++++------------- 2 files changed, 36 insertions(+), 64 deletions(-) diff --git a/veadk/toolkits/apps/reverse_mcp/client_with_reverse_mcp.py b/veadk/toolkits/apps/reverse_mcp/client_with_reverse_mcp.py index b6e97ccc..bce7e119 100644 --- a/veadk/toolkits/apps/reverse_mcp/client_with_reverse_mcp.py +++ b/veadk/toolkits/apps/reverse_mcp/client_with_reverse_mcp.py @@ -23,14 +23,24 @@ class ClientWithReverseMCP: - def __init__(self, ws_url: str, mcp_server_url: str, client_id: str): + def __init__( + self, + ws_url: str, + mcp_server_url: str, + client_id: str, + mcp_tool_filter: list[str] | None = None, + ): """Start a client with reverse mcp, Args: ws_url: The url of the websocket server (cloud). Like example.com:8000 mcp_server_url: The url of the mcp server (local). + client_id: The client id for the websocket connection. + mcp_tool_filter: Optional list of tool names to filter. If None, all tools are available. """ self.ws_url = f"ws://{ws_url}/ws?id={client_id}" + if mcp_tool_filter: + self.ws_url += f"&mcp_tool_filter={','.join(mcp_tool_filter)}" self.mcp_server_url = mcp_server_url # set timeout for httpx client diff --git a/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py b/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py index af884ba2..8dcfb913 100644 --- a/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py +++ b/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py @@ -121,8 +121,6 @@ class InvokeRequest(BaseModel): websocket_id: str - mcp_tool_filter: Optional[list[str]] = None - class InvokeResponse(BaseModel): """Response model for /invoke endpoint""" @@ -137,35 +135,6 @@ async def invoke(payload: InvokeRequest) -> InvokeResponse: agent = self.ws_agent_mgr[payload.websocket_id] - mcp_toolset_url = f"http://127.0.0.1:{self.port}/mcp" - mcp_toolset_headers = {REVERSE_MCP_HEADER_KEY: payload.websocket_id} - - has_mcp_toolset = False - for tool in agent.tools: - if isinstance(tool, MCPToolset): - if hasattr(tool, "_connection_params"): - conn_params = tool._connection_params - if ( - hasattr(conn_params, "url") - and conn_params.url == mcp_toolset_url - and hasattr(conn_params, "headers") - and conn_params.headers == mcp_toolset_headers - ): - has_mcp_toolset = True - break - - if not has_mcp_toolset: - logger.debug("Mount fake MCPToolset to agent") - agent.tools.append( - MCPToolset( - connection_params=StreamableHTTPConnectionParams( - url=mcp_toolset_url, - headers=mcp_toolset_headers, - ), - tool_filter=payload.mcp_tool_filter, - ) - ) - runner = Runner(app_name=payload.app_name, agent=agent) response = await runner.run( messages=[prompt], @@ -179,6 +148,7 @@ async def invoke(payload: InvokeRequest) -> InvokeResponse: @self.app.websocket("/ws") async def ws_endpoint(ws: WebSocket): client_id = ws.query_params.get("id") + if not client_id: await ws.close( code=400, @@ -186,11 +156,34 @@ async def ws_endpoint(ws: WebSocket): ) return + # Parse mcp_tool_filter from query params, comma-separated string + mcp_tool_filter_str = ws.query_params.get("mcp_tool_filter") + mcp_tool_filter = None + if mcp_tool_filter_str: + mcp_tool_filter = [ + t.strip() for t in mcp_tool_filter_str.split(",") if t.strip() + ] + logger.info(f"Register websocket {client_id} to session manager.") self.ws_session_mgr.connections[client_id] = ws logger.info(f"Fork agent for websocket {client_id}") - self.ws_agent_mgr[client_id] = self.agent.clone() + agent = self.agent.clone() + + # Mount MCPToolset when creating agent + mcp_toolset_url = f"http://127.0.0.1:{self.port}/mcp" + mcp_toolset_headers = {REVERSE_MCP_HEADER_KEY: client_id} + logger.debug(f"Mount MCPToolset to agent for websocket {client_id}") + agent.tools.append( + MCPToolset( + connection_params=StreamableHTTPConnectionParams( + url=mcp_toolset_url, + headers=mcp_toolset_headers, + ), + tool_filter=mcp_tool_filter, + ) + ) + self.ws_agent_mgr[client_id] = agent logger.info(f"Create session service for websocket {client_id}") self.ws_session_service_mgr[client_id] = InMemorySessionService() @@ -209,7 +202,6 @@ class CreateSessionRequest(BaseModel): class RunAgentRequestWithWsId(RunAgentRequest): websocket_id: str - mcp_tool_filter: Optional[list[str]] = None def _get_session_service(websocket_id: str) -> InMemorySessionService: """Get session service for the websocket client.""" @@ -299,36 +291,6 @@ async def run_agent_sse(req: RunAgentRequestWithWsId) -> StreamingResponse: detail=f"WebSocket client {req.websocket_id} not found", ) - # Mount MCPToolset if needed - mcp_toolset_url = f"http://127.0.0.1:{self.port}/mcp" - mcp_toolset_headers = {REVERSE_MCP_HEADER_KEY: req.websocket_id} - - has_mcp_toolset = False - for tool in agent.tools: - if isinstance(tool, MCPToolset): - if hasattr(tool, "_connection_params"): - conn_params = tool._connection_params - if ( - hasattr(conn_params, "url") - and conn_params.url == mcp_toolset_url - and hasattr(conn_params, "headers") - and conn_params.headers == mcp_toolset_headers - ): - has_mcp_toolset = True - break - - if not has_mcp_toolset: - logger.debug("Mount fake MCPToolset to agent for SSE") - agent.tools.append( - MCPToolset( - connection_params=StreamableHTTPConnectionParams( - url=mcp_toolset_url, - headers=mcp_toolset_headers, - ), - tool_filter=req.mcp_tool_filter, - ) - ) - # Create runner runner = GoogleRunner( agent=agent, From 640b4a31b26849dd5e16c88797f68de1ee8c6195 Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Tue, 10 Feb 2026 15:34:23 +0800 Subject: [PATCH 4/6] chore: rename filter --- .../apps/reverse_mcp/client_with_reverse_mcp.py | 8 ++++---- .../apps/reverse_mcp/server_with_reverse_mcp.py | 14 ++++++-------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/veadk/toolkits/apps/reverse_mcp/client_with_reverse_mcp.py b/veadk/toolkits/apps/reverse_mcp/client_with_reverse_mcp.py index bce7e119..5d9a8b7c 100644 --- a/veadk/toolkits/apps/reverse_mcp/client_with_reverse_mcp.py +++ b/veadk/toolkits/apps/reverse_mcp/client_with_reverse_mcp.py @@ -28,7 +28,7 @@ def __init__( ws_url: str, mcp_server_url: str, client_id: str, - mcp_tool_filter: list[str] | None = None, + filters: list[str] | None = None, ): """Start a client with reverse mcp, @@ -36,11 +36,11 @@ def __init__( ws_url: The url of the websocket server (cloud). Like example.com:8000 mcp_server_url: The url of the mcp server (local). client_id: The client id for the websocket connection. - mcp_tool_filter: Optional list of tool names to filter. If None, all tools are available. + filters: Optional list of tool names to filter (whitelist). If None, all tools are available. """ self.ws_url = f"ws://{ws_url}/ws?id={client_id}" - if mcp_tool_filter: - self.ws_url += f"&mcp_tool_filter={','.join(mcp_tool_filter)}" + if filters: + self.ws_url += f"&filters={','.join(filters)}" self.mcp_server_url = mcp_server_url # set timeout for httpx client diff --git a/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py b/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py index 8dcfb913..b680c1d0 100644 --- a/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py +++ b/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py @@ -156,13 +156,11 @@ async def ws_endpoint(ws: WebSocket): ) return - # Parse mcp_tool_filter from query params, comma-separated string - mcp_tool_filter_str = ws.query_params.get("mcp_tool_filter") - mcp_tool_filter = None - if mcp_tool_filter_str: - mcp_tool_filter = [ - t.strip() for t in mcp_tool_filter_str.split(",") if t.strip() - ] + # Parse filters from query params, comma-separated string + filters_str = ws.query_params.get("filters") + filters = None + if filters_str: + filters = [t.strip() for t in filters_str.split(",") if t.strip()] logger.info(f"Register websocket {client_id} to session manager.") self.ws_session_mgr.connections[client_id] = ws @@ -180,7 +178,7 @@ async def ws_endpoint(ws: WebSocket): url=mcp_toolset_url, headers=mcp_toolset_headers, ), - tool_filter=mcp_tool_filter, + tool_filter=filters, ) ) self.ws_agent_mgr[client_id] = agent From e37b2363cbda8d03af4ddf61b60699c753fd9634 Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Wed, 11 Feb 2026 12:12:55 +0800 Subject: [PATCH 5/6] chore: support streaming mode --- .../reverse_mcp/server_with_reverse_mcp.py | 49 ++++++++++++++----- 1 file changed, 38 insertions(+), 11 deletions(-) diff --git a/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py b/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py index b680c1d0..04e6de82 100644 --- a/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py +++ b/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py @@ -19,14 +19,16 @@ from fastapi import FastAPI, HTTPException, Request, Response, WebSocket from fastapi.responses import StreamingResponse +from google.adk.agents.run_config import StreamingMode from google.adk.artifacts import InMemoryArtifactService from google.adk.cli.adk_web_server import RunAgentRequest -from google.adk.runners import Runner as GoogleRunner +from google.adk.runners import Runner as GoogleRunner, RunConfig from google.adk.sessions import InMemorySessionService, Session from google.adk.tools.mcp_tool.mcp_session_manager import ( StreamableHTTPConnectionParams, ) from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset +from google.adk.utils.context_utils import Aclosing from pydantic import BaseModel from veadk import Runner @@ -297,19 +299,44 @@ async def run_agent_sse(req: RunAgentRequestWithWsId) -> StreamingResponse: artifact_service=self.artifact_service, ) + # Determine streaming mode from request + stream_mode = StreamingMode.SSE if req.streaming else StreamingMode.NONE + async def event_generator(): try: - async for event in runner.run_async( - user_id=req.user_id, - session_id=req.session_id, - new_message=req.new_message, - state_delta=req.state_delta, - ): - event_json = event.model_dump_json( - exclude_none=True, by_alias=True + async with Aclosing( + runner.run_async( + user_id=req.user_id, + session_id=req.session_id, + new_message=req.new_message, + state_delta=req.state_delta, + run_config=RunConfig(streaming_mode=stream_mode), + invocation_id=req.invocation_id, ) - logger.debug(f"SSE event: {event_json}") - yield f"data: {event_json}\n\n" + ) as agen: + async for event in agen: + # ADK Web renders artifacts from `actions.artifactDelta` + # during part processing *and* during action processing + # 1) the original event with `artifactDelta` cleared (content) + # 2) a content-less "action-only" event carrying `artifactDelta` + events_to_stream = [event] + if ( + event.actions.artifact_delta + and event.content + and event.content.parts + ): + content_event = event.model_copy(deep=True) + content_event.actions.artifact_delta = {} + artifact_event = event.model_copy(deep=True) + artifact_event.content = None + events_to_stream = [content_event, artifact_event] + + for event_to_stream in events_to_stream: + sse_event = event_to_stream.model_dump_json( + exclude_none=True, by_alias=True + ) + logger.debug(f"SSE event: {sse_event}") + yield f"data: {sse_event}\n\n" except Exception as e: logger.exception(f"Error in event_generator: {e}") yield f"data: {json.dumps({'error': str(e)})}\n\n" From 3864354f4e8bc659f0c57b448dcbfb6cb3c7d4de Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Wed, 11 Feb 2026 22:20:48 +0800 Subject: [PATCH 6/6] fix: filtered_headers of content-length --- .../reverse_mcp/server_with_reverse_mcp.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py b/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py index 04e6de82..defdb4ba 100644 --- a/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py +++ b/veadk/toolkits/apps/reverse_mcp/server_with_reverse_mcp.py @@ -377,10 +377,27 @@ async def mcp_proxy(path: str, request: Request): logger.debug(f"[Reverse mcp proxy] Response from local: {resp}") + # Filter hop-by-hop headers to avoid Content-Length mismatch + headers = resp["payload"]["headers"] + hop_by_hop_headers = { + "content-length", + "transfer-encoding", + "connection", + "keep-alive", + "proxy-authenticate", + "proxy-authorization", + "te", + "trailers", + "upgrade", + } + filtered_headers = { + k: v for k, v in headers.items() if k.lower() not in hop_by_hop_headers + } + return Response( content=resp["payload"]["body"], # type: ignore status_code=resp["payload"]["status"], # type: ignore - headers=resp["payload"]["headers"], # type: ignore + headers=filtered_headers, # type: ignore ) def run(self):