Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 35 additions & 13 deletions go-opencode/internal/server/handlers_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,13 @@ func (s *Server) sendMessage(w http.ResponseWriter, r *http.Request) {
return
}

// Create user message parts
// Create user message parts (SDK compatible: include sessionID and messageID)
textPart := &types.TextPart{
ID: generateID(),
Type: "text",
Text: content,
ID: generateID(),
SessionID: sessionID,
MessageID: userMsg.ID,
Type: "text",
Text: content,
}
userParts := []types.Part{textPart}

Expand All @@ -118,24 +120,44 @@ func (s *Server) sendMessage(w http.ResponseWriter, r *http.Request) {
return
}

// Add file parts if provided
for _, file := range req.Files {
file.ID = generateID()
file.Type = "file"
userParts = append(userParts, &file)
// Add file parts if provided (SDK compatible: include sessionID and messageID)
for i := range req.Files {
req.Files[i].ID = generateID()
req.Files[i].SessionID = sessionID
req.Files[i].MessageID = userMsg.ID
req.Files[i].Type = "file"
userParts = append(userParts, &req.Files[i])
// Save file part to storage
if err := s.sessionService.SavePart(r.Context(), userMsg.ID, &file); err != nil {
if err := s.sessionService.SavePart(r.Context(), userMsg.ID, &req.Files[i]); err != nil {
writeError(w, http.StatusInternalServerError, ErrCodeInternalError, err.Error())
return
}
}

// Publish user message via SSE (not in HTTP response)
// Publish user message via SSE (SDK compatible: uses message.updated)
event.Publish(event.Event{
Type: event.MessageUpdated,
Data: event.MessageUpdatedData{Info: userMsg},
})

// Publish user message parts (SDK compatible: uses message.part.updated)
event.Publish(event.Event{
Type: "message.created",
Data: event.MessageCreatedData{Info: userMsg},
Type: event.MessagePartUpdated,
Data: event.MessagePartUpdatedData{
Part: textPart,
},
})

// Publish file parts if any
for i := range req.Files {
event.Publish(event.Event{
Type: event.MessagePartUpdated,
Data: event.MessagePartUpdatedData{
Part: &req.Files[i],
},
})
}

// Process message and generate response
// This is where the LLM provider is called
// Updates are published via SSE, not streamed in HTTP response
Expand Down
29 changes: 29 additions & 0 deletions go-opencode/internal/session/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,13 @@ func (p *Processor) runLoop(
messages, _ = p.loadMessages(ctx, sessionID)
}

// Reload messages to include the current assistant message and tool results
messages, err = p.loadMessages(ctx, sessionID)
if err != nil {
return fmt.Errorf("failed to reload messages: %w", err)
}
fmt.Printf("[loop] Reloaded %d messages for step %d\n", len(messages), step)

// Build completion request
req, err := p.buildCompletionRequest(ctx, sessionID, messages, assistantMsg, agent, model)
if err != nil {
Expand Down Expand Up @@ -310,10 +317,13 @@ func (p *Processor) runLoop(

case "tool_use", "tool_calls":
// Execute tools and continue loop
fmt.Printf("[loop] Got tool_use/tool_calls, calling executeToolCalls with %d parts\n", len(state.parts))
if err := p.executeToolCalls(ctx, state, agent, callback); err != nil {
fmt.Printf("[loop] executeToolCalls returned error: %v\n", err)
// Tool execution errors don't stop the loop
// The error is captured in the tool part
}
fmt.Printf("[loop] executeToolCalls completed, step=%d\n", step)
step++
continue

Expand Down Expand Up @@ -430,16 +440,29 @@ func (p *Processor) buildCompletionRequest(
Content: systemPrompt.Build(),
})

fmt.Printf("[build] Processing %d messages for completion request\n", len(messages))

// Add conversation history
for _, msg := range messages {
fmt.Printf("[build] Message: role=%s, id=%s\n", msg.Role, msg.ID)

// Skip errored messages without content
if msg.Error != nil && !p.hasUsableContent(ctx, msg) {
fmt.Printf("[build] Skipping errored message without content\n")
continue
}

// Load parts for this message
parts, err := p.loadParts(ctx, msg.ID)
if err != nil {
fmt.Printf("[build] Failed to load parts for message %s: %v\n", msg.ID, err)
continue
}
fmt.Printf("[build] Loaded %d parts for message %s\n", len(parts), msg.ID)

// Skip messages with no parts (e.g., newly created empty assistant messages)
if len(parts) == 0 {
fmt.Printf("[build] Skipping message %s with no parts\n", msg.ID)
continue
}

Expand All @@ -450,6 +473,9 @@ func (p *Processor) buildCompletionRequest(
if msg.Role == "assistant" {
for _, part := range parts {
if toolPart, ok := part.(*types.ToolPart); ok {
fmt.Printf("[build] Found ToolPart: tool=%s, status=%s, callID=%s, output=%q\n",
toolPart.Tool, toolPart.State.Status, toolPart.CallID, truncateStr(toolPart.State.Output, 100))

// Only completed or errored tool parts should be added as results
if toolPart.State.Status == "completed" || toolPart.State.Status == "error" {
var toolContent string
Expand All @@ -459,6 +485,9 @@ func (p *Processor) buildCompletionRequest(
toolContent = "Error: " + toolPart.State.Error
}

fmt.Printf("[build] Adding tool result message for callID=%s, content length=%d\n",
toolPart.CallID, len(toolContent))

toolMsg := &schema.Message{
Role: schema.Tool,
Content: toolContent,
Expand Down
75 changes: 61 additions & 14 deletions go-opencode/internal/session/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io"
"strings"
"time"

"github.com/cloudwego/eino/schema"
Expand Down Expand Up @@ -86,14 +87,18 @@ func (p *Processor) processStream(
}

// Finalize tool parts
fmt.Printf("[stream] Finalizing %d tool parts\n", len(currentToolParts))
for id, toolPart := range currentToolParts {
fmt.Printf("[stream] Finalizing toolPart: id=%s, tool=%s, callID=%s, currentStatus=%s\n",
id, toolPart.Tool, toolPart.CallID, toolPart.State.Status)
if accInput, ok := accumulatedToolInputs[id]; ok && toolPart.State.Input == nil {
var input map[string]any
if err := json.Unmarshal([]byte(accInput), &input); err == nil {
toolPart.State.Input = input
}
}
toolPart.State.Status = "running"
fmt.Printf("[stream] Set toolPart status to 'running': tool=%s, ptr=%p\n", toolPart.Tool, toolPart)
p.savePart(ctx, state.message.ID, toolPart)
}

Expand Down Expand Up @@ -150,17 +155,28 @@ func (p *Processor) processMessageChunk(
}
state.parts = append(state.parts, *currentTextPart)
*accumulatedContent = msg.Content

// Publish delta event for FIRST chunk (SDK compatible)
// This ensures the TUI receives and displays the first text chunk
event.Publish(event.Event{
Type: event.MessagePartUpdated,
Data: event.MessagePartUpdatedData{
Part: *currentTextPart,
Delta: msg.Content, // First chunk IS the delta
},
})

callback(state.message, state.parts)
} else {
// Check if this is accumulated content (longer than previous) or delta content (shorter)
// Check if this is accumulated content (starts with previous) or delta content (new chunk only)
var delta string
if len(msg.Content) > len(*accumulatedContent) {
// Accumulated mode: extract delta from difference
if strings.HasPrefix(msg.Content, *accumulatedContent) {
// Accumulated mode: new content STARTS WITH all previous content
delta = msg.Content[len(*accumulatedContent):]
(*currentTextPart).Text = msg.Content
*accumulatedContent = msg.Content
} else {
// Delta mode: append delta directly
// Delta mode: new content is just the new part
delta = msg.Content
*accumulatedContent += msg.Content
(*currentTextPart).Text = *accumulatedContent
Expand Down Expand Up @@ -200,10 +216,34 @@ func (p *Processor) processMessageChunk(
}

// Handle tool calls
// The eino streaming model uses Index to track tool calls:
// - Start event: Index=N, ID="toolu_xxx", Name="Read"
// - Delta events: Index=N, ID="", Name="", Arguments='{"partial...'
for _, tc := range msg.ToolCalls {
toolPart, exists := currentToolParts[tc.ID]
if !exists {
// New tool call
// Use Index to track tool calls (eino streaming model)
var toolIndex int
if tc.Index != nil {
toolIndex = *tc.Index
} else if tc.ID != "" {
// Fallback: use ID-based tracking if Index not available
toolIndex = -1 // Will use ID map
} else {
fmt.Printf("[stream] Skipping tool call with no Index and no ID\n")
continue
}

// Determine lookup key - use index string or ID
var lookupKey string
if toolIndex >= 0 {
lookupKey = fmt.Sprintf("idx:%d", toolIndex)
} else {
lookupKey = tc.ID
}

toolPart, exists := currentToolParts[lookupKey]

// New tool call (has ID and Name)
if !exists && tc.ID != "" && tc.Function.Name != "" {
now := time.Now().UnixMilli()
toolPart = &types.ToolPart{
ID: generatePartID(),
Expand All @@ -219,19 +259,26 @@ func (p *Processor) processMessageChunk(
Time: &types.ToolTime{Start: now},
},
}
currentToolParts[tc.ID] = toolPart
accumulatedToolInputs[tc.ID] = ""
fmt.Printf("[stream] Created new ToolPart: tool=%s, callID=%s, index=%d\n", toolPart.Tool, toolPart.CallID, toolIndex)
currentToolParts[lookupKey] = toolPart
accumulatedToolInputs[lookupKey] = ""
state.parts = append(state.parts, toolPart)
fmt.Printf("[stream] Added toolPart to state.parts, total parts=%d\n", len(state.parts))
callback(state.message, state.parts)
}

// Accumulate arguments
if tc.Function.Arguments != "" {
accumulatedToolInputs[tc.ID] = tc.Function.Arguments
toolPart.State.Raw = tc.Function.Arguments
// Accumulate arguments (delta chunks have arguments but no ID/Name)
if tc.Function.Arguments != "" && toolPart != nil {
// Append arguments (eino sends deltas, not accumulated)
accumulatedToolInputs[lookupKey] += tc.Function.Arguments
toolPart.State.Raw = accumulatedToolInputs[lookupKey]
fmt.Printf("[stream] Tool %s accumulated args: %s\n", toolPart.Tool, truncate(accumulatedToolInputs[lookupKey], 100))

// Try to parse accumulated JSON
var input map[string]any
if err := json.Unmarshal([]byte(tc.Function.Arguments), &input); err == nil {
if err := json.Unmarshal([]byte(accumulatedToolInputs[lookupKey]), &input); err == nil {
toolPart.State.Input = input
fmt.Printf("[stream] Tool %s parsed input: %v\n", toolPart.Tool, input)
}

// Publish tool part update (SDK compatible: uses MessagePartUpdated)
Expand Down
30 changes: 30 additions & 0 deletions go-opencode/internal/session/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,23 @@ func (p *Processor) executeToolCalls(
agent *Agent,
callback ProcessCallback,
) error {
fmt.Printf("[tools] executeToolCalls called, state.parts=%d\n", len(state.parts))

// Log each part's type and status for debugging
for i, part := range state.parts {
switch tp := part.(type) {
case *types.ToolPart:
fmt.Printf("[tools] Part %d: ToolPart tool=%s status=%s callID=%s\n",
i, tp.Tool, tp.State.Status, tp.CallID)
case *types.TextPart:
fmt.Printf("[tools] Part %d: TextPart len=%d\n", i, len(tp.Text))
case *types.ReasoningPart:
fmt.Printf("[tools] Part %d: ReasoningPart len=%d\n", i, len(tp.Text))
default:
fmt.Printf("[tools] Part %d: %T\n", i, part)
}
}

// Find all running tool parts
var pendingTools []*types.ToolPart
for _, part := range state.parts {
Expand All @@ -29,13 +46,18 @@ func (p *Processor) executeToolCalls(
}
}

fmt.Printf("[tools] Found %d pending tools with status='running'\n", len(pendingTools))

// Execute each tool
for _, toolPart := range pendingTools {
fmt.Printf("[tools] About to execute tool: %s (callID=%s)\n", toolPart.Tool, toolPart.CallID)
err := p.executeSingleTool(ctx, state, agent, toolPart, callback)
if err != nil {
fmt.Printf("[tools] Tool execution error: %v\n", err)
// Error is captured in tool part, don't stop processing
continue
}
fmt.Printf("[tools] Tool execution completed: %s\n", toolPart.Tool)
}

return nil
Expand All @@ -49,12 +71,20 @@ func (p *Processor) executeSingleTool(
toolPart *types.ToolPart,
callback ProcessCallback,
) error {
fmt.Printf("[tools] executeSingleTool: tool=%s, callID=%s\n", toolPart.Tool, toolPart.CallID)

// Get the tool from registry
t, ok := p.toolRegistry.Get(toolPart.Tool)
if !ok {
fmt.Printf("[tools] Tool NOT FOUND in registry: %s\n", toolPart.Tool)
// List available tools for debugging
if p.toolRegistry != nil {
fmt.Printf("[tools] Available tools: %v\n", p.toolRegistry.List())
}
return p.failTool(ctx, state, toolPart, callback,
fmt.Sprintf("Tool not found: %s", toolPart.Tool))
}
fmt.Printf("[tools] Tool found in registry: %s\n", t.ID())

// Check permissions
if err := p.checkToolPermission(ctx, state, agent, toolPart); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions go-opencode/internal/tool/registry.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tool

import (
"fmt"
"sync"

einotool "github.com/cloudwego/eino/components/tool"
Expand All @@ -26,6 +27,7 @@ func NewRegistry(workDir string) *Registry {
func (r *Registry) Register(tool Tool) {
r.mu.Lock()
defer r.mu.Unlock()
fmt.Printf("[registry] Registering tool: %s\n", tool.ID())
r.tools[tool.ID()] = tool
}

Expand Down Expand Up @@ -92,6 +94,7 @@ func (r *Registry) ToolInfos() ([]*schema.ToolInfo, error) {

// DefaultRegistry creates a registry with all built-in tools.
func DefaultRegistry(workDir string) *Registry {
fmt.Printf("[registry] Creating DefaultRegistry with workDir=%s\n", workDir)
r := NewRegistry(workDir)

// Register core tools
Expand All @@ -103,5 +106,6 @@ func DefaultRegistry(workDir string) *Registry {
r.Register(NewGrepTool(workDir))
r.Register(NewListTool(workDir))

fmt.Printf("[registry] DefaultRegistry created with %d tools: %v\n", len(r.tools), r.IDs())
return r
}
4 changes: 4 additions & 0 deletions packages/opencode/src/cli/cmd/tui/attach.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { cmd } from "../cmd"
import { tui } from "./app"
import { Log } from "@/util/log"

export const AttachCommand = cmd({
command: "attach <url>",
Expand All @@ -16,6 +17,9 @@ export const AttachCommand = cmd({
description: "directory to run in",
}),
handler: async (args) => {
// Capture console.log/error/warn/debug to log file in TUI mode
Log.captureConsole()

if (args.dir) process.chdir(args.dir)
await tui({
url: args.url,
Expand Down
Loading
Loading