From cec8f0c274c90586c4c17d824dee58a6d0f22b5a Mon Sep 17 00:00:00 2001 From: Jorge Calvar Date: Tue, 17 Feb 2026 16:20:51 +0100 Subject: [PATCH 1/5] feat(appkit): add Genie plugin for AI/BI space integration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a new Genie plugin that provides an opinionated chat API powered by Databricks AI/BI Genie spaces. Users configure named space aliases in plugin config, and the backend resolves aliases to actual space IDs. Key design: - Single SSE endpoint: POST /api/genie/:alias/messages - Always executes as user (OBO) via asUser(req) - SSE event flow: message_start → status (×N) → message_result → query_result (×N) - Space alias abstraction keeps space IDs out of URLs and client code - No cache/retry (chat is stateful and non-idempotent) - Configurable timeout (default 2min, 0 for indefinite) Also fixes pre-existing ajv type resolution issue in shared package where pnpm hoisting caused TypeScript to resolve ajv@6 types instead of the declared ajv@8 dependency. Signed-off-by: Jorge Calvar --- packages/appkit/src/index.ts | 2 +- packages/appkit/src/plugins/genie/defaults.ts | 16 + packages/appkit/src/plugins/genie/genie.ts | 341 ++++++++++++ packages/appkit/src/plugins/genie/index.ts | 3 + .../appkit/src/plugins/genie/manifest.json | 43 ++ packages/appkit/src/plugins/genie/manifest.ts | 10 + .../src/plugins/genie/tests/genie.test.ts | 486 ++++++++++++++++++ packages/appkit/src/plugins/genie/types.ts | 54 ++ packages/appkit/src/plugins/index.ts | 1 + .../shared/src/cli/commands/plugins-sync.ts | 18 +- 10 files changed, 965 insertions(+), 9 deletions(-) create mode 100644 packages/appkit/src/plugins/genie/defaults.ts create mode 100644 packages/appkit/src/plugins/genie/genie.ts create mode 100644 packages/appkit/src/plugins/genie/index.ts create mode 100644 packages/appkit/src/plugins/genie/manifest.json create mode 100644 packages/appkit/src/plugins/genie/manifest.ts create mode 100644 packages/appkit/src/plugins/genie/tests/genie.test.ts create mode 100644 packages/appkit/src/plugins/genie/types.ts diff --git a/packages/appkit/src/index.ts b/packages/appkit/src/index.ts index b0745592..58feb851 100644 --- a/packages/appkit/src/index.ts +++ b/packages/appkit/src/index.ts @@ -30,7 +30,7 @@ export { } from "./errors"; // Plugin authoring export { Plugin, toPlugin } from "./plugin"; -export { analytics, server } from "./plugins"; +export { analytics, genie, server } from "./plugins"; // Registry types and utilities for plugin manifests export type { ConfigSchema, diff --git a/packages/appkit/src/plugins/genie/defaults.ts b/packages/appkit/src/plugins/genie/defaults.ts new file mode 100644 index 00000000..0acf415c --- /dev/null +++ b/packages/appkit/src/plugins/genie/defaults.ts @@ -0,0 +1,16 @@ +import type { StreamExecutionSettings } from "shared"; + +export const genieStreamDefaults: StreamExecutionSettings = { + default: { + // Cache disabled: chat messages are conversational and stateful, not repeatable queries. + cache: { + enabled: false, + }, + // Retry disabled: Genie calls are not idempotent (retries could create duplicate + // conversations/messages), and the SDK Waiter already handles transient polling failures. + retry: { + enabled: false, + }, + timeout: 120_000, + }, +}; diff --git a/packages/appkit/src/plugins/genie/genie.ts b/packages/appkit/src/plugins/genie/genie.ts new file mode 100644 index 00000000..23fa79b5 --- /dev/null +++ b/packages/appkit/src/plugins/genie/genie.ts @@ -0,0 +1,341 @@ +import type { + GenieMessage, + GenieStartConversationResponse, +} from "@databricks/sdk-experimental/dist/apis/dashboards"; +import Time, { + TimeUnits, +} from "@databricks/sdk-experimental/dist/retries/Time"; +import type { Waiter } from "@databricks/sdk-experimental/dist/wait"; +import type express from "express"; +import type { IAppRouter, StreamExecutionSettings } from "shared"; +import { getWorkspaceClient } from "../../context"; +import { createLogger } from "../../logging/logger"; +import { Plugin, toPlugin } from "../../plugin"; +import { genieStreamDefaults } from "./defaults"; +import { genieManifest } from "./manifest"; +import type { + GenieAttachmentResponse, + GenieMessageResponse, + GenieSendMessageRequest, + GenieStreamEvent, + IGenieConfig, +} from "./types"; + +const logger = createLogger("genie"); + +type StartConversationWaiter = Waiter< + GenieStartConversationResponse, + GenieMessage +>; +type CreateMessageWaiter = Waiter; + +/** Extract our cleaned attachment response from a raw SDK GenieMessage */ +function mapAttachments(message: GenieMessage): GenieAttachmentResponse[] { + return ( + message.attachments?.map((att) => ({ + attachmentId: att.attachment_id, + query: att.query + ? { + title: att.query.title, + description: att.query.description, + query: att.query.query, + statementId: att.query.statement_id, + } + : undefined, + text: att.text ? { content: att.text.content } : undefined, + suggestedQuestions: att.suggested_questions?.questions, + })) ?? [] + ); +} + +/** Build a GenieMessageResponse from a raw SDK GenieMessage */ +function toMessageResponse(message: GenieMessage): GenieMessageResponse { + return { + messageId: message.message_id, + conversationId: message.conversation_id, + spaceId: message.space_id, + status: message.status ?? "COMPLETED", + content: message.content, + attachments: mapAttachments(message), + error: message.error?.error, + }; +} + +export class GeniePlugin extends Plugin { + name = "genie"; + + static manifest = genieManifest; + + protected static description = + "AI/BI Genie space integration for natural language data queries"; + protected declare config: IGenieConfig; + + constructor(config: IGenieConfig) { + super(config); + this.config = config; + } + + private resolveSpaceId(alias: string): string | null { + return this.config.spaces?.[alias] ?? null; + } + + injectRoutes(router: IAppRouter) { + this.route(router, { + name: "sendMessage", + method: "post", + path: "/:alias/messages", + handler: async (req: express.Request, res: express.Response) => { + await this.asUser(req)._handleSendMessage(req, res); + }, + }); + } + + async _handleSendMessage( + req: express.Request, + res: express.Response, + ): Promise { + const { alias } = req.params; + const spaceId = this.resolveSpaceId(alias); + + if (!spaceId) { + res.status(404).json({ error: `Unknown space alias: ${alias}` }); + return; + } + + const { content, conversationId } = req.body as GenieSendMessageRequest; + + if (!content) { + res.status(400).json({ error: "content is required" }); + return; + } + + logger.debug( + "Sending message to space %s (alias=%s, conversationId=%s)", + spaceId, + alias, + conversationId ?? "new", + ); + + const timeout = this.config.timeout ?? 120_000; + + const streamSettings: StreamExecutionSettings = { + ...genieStreamDefaults, + default: { + ...genieStreamDefaults.default, + // timeout: 0 means indefinite (no TimeoutInterceptor) + timeout, + }, + }; + + await this.executeStream( + res, + async function* () { + const workspaceClient = getWorkspaceClient(); + + try { + // Status events queue bridging onProgress → generator + const statusQueue: string[] = []; + let notifyGenerator: () => void = () => {}; + let waiterDone = false; + + const onProgress = async (message: GenieMessage): Promise => { + if (message.status) { + statusQueue.push(message.status); + notifyGenerator(); + } + }; + + let resultConversationId = ""; + let resultMessageId = ""; + let completedMessage: GenieMessage = + undefined as unknown as GenieMessage; + let waiterError: Error | null = null; + + // Launch Genie API call + const waiterPromise = (async () => { + let messageWaiter: CreateMessageWaiter; + + if (conversationId) { + messageWaiter = await workspaceClient.genie.createMessage({ + space_id: spaceId, + conversation_id: conversationId, + content, + }); + resultConversationId = conversationId; + } else { + const startWaiter: StartConversationWaiter = + await workspaceClient.genie.startConversation({ + space_id: spaceId, + content, + }); + resultConversationId = startWaiter.conversation_id; + resultMessageId = startWaiter.message_id; + messageWaiter = startWaiter as unknown as CreateMessageWaiter; + } + + const result = await messageWaiter.wait({ onProgress }); + completedMessage = result; + resultMessageId = result.message_id; + return result; + })() + .catch((err: Error) => { + waiterError = err; + }) + .finally(() => { + waiterDone = true; + notifyGenerator(); + }); + + // Wait for first status or waiter completion to get IDs + await new Promise((resolve) => { + notifyGenerator = resolve; + if (waiterDone) resolve(); + }); + + // If the API call failed before anything started, yield error and exit + if (waiterError) { + throw waiterError; + } + + // Yield message_start + yield { + type: "message_start" as const, + conversationId: resultConversationId, + messageId: resultMessageId, + spaceId, + }; + + // Drain status events + while (!waiterDone || statusQueue.length > 0) { + while (statusQueue.length > 0) { + const status = statusQueue.shift(); + if (status) { + yield { type: "status" as const, status }; + } + } + + if (!waiterDone) { + await new Promise((resolve) => { + notifyGenerator = resolve; + if (waiterDone) resolve(); + }); + } + } + + // Check if waiter failed during polling + await waiterPromise; + if (waiterError) { + throw waiterError; + } + + // Build cleaned message response + const messageResponse = toMessageResponse(completedMessage); + + yield { + type: "message_result" as const, + message: messageResponse, + }; + + // Fetch query results for each query attachment + const attachments = messageResponse.attachments ?? []; + for (const att of attachments) { + if (att.query?.statementId && att.attachmentId) { + try { + const queryResult = + await workspaceClient.genie.getMessageAttachmentQueryResult({ + space_id: spaceId, + conversation_id: resultConversationId, + message_id: resultMessageId, + attachment_id: att.attachmentId, + }); + + yield { + type: "query_result" as const, + attachmentId: att.attachmentId, + statementId: att.query.statementId, + data: queryResult.statement_response, + }; + } catch (error) { + logger.error( + "Failed to fetch query result for attachment %s: %O", + att.attachmentId, + error, + ); + yield { + type: "error" as const, + error: `Failed to fetch query result for attachment ${att.attachmentId}`, + }; + } + } + } + } catch (error) { + logger.error("Genie message error: %O", error); + yield { + type: "error" as const, + error: + error instanceof Error ? error.message : "Genie request failed", + }; + } + }, + streamSettings, + ); + } + + async sendMessage( + alias: string, + content: string, + conversationId?: string, + ): Promise { + const spaceId = this.resolveSpaceId(alias); + if (!spaceId) { + throw new Error(`Unknown space alias: ${alias}`); + } + + const workspaceClient = getWorkspaceClient(); + const timeout = this.config.timeout ?? 120_000; + + let messageWaiter: CreateMessageWaiter; + let resultConversationId: string; + + if (conversationId) { + messageWaiter = await workspaceClient.genie.createMessage({ + space_id: spaceId, + conversation_id: conversationId, + content, + }); + resultConversationId = conversationId; + } else { + const startWaiter: StartConversationWaiter = + await workspaceClient.genie.startConversation({ + space_id: spaceId, + content, + }); + resultConversationId = startWaiter.conversation_id; + messageWaiter = startWaiter as unknown as CreateMessageWaiter; + } + + const waitOptions = + timeout > 0 ? { timeout: new Time(timeout, TimeUnits.milliseconds) } : {}; + const completedMessage = await messageWaiter.wait(waitOptions); + + return { + ...toMessageResponse(completedMessage), + conversationId: resultConversationId, + }; + } + + async shutdown(): Promise { + this.streamManager.abortAll(); + } + + exports() { + return { + sendMessage: this.sendMessage, + }; + } +} + +export const genie = toPlugin( + GeniePlugin, + "genie", +); diff --git a/packages/appkit/src/plugins/genie/index.ts b/packages/appkit/src/plugins/genie/index.ts new file mode 100644 index 00000000..6726262f --- /dev/null +++ b/packages/appkit/src/plugins/genie/index.ts @@ -0,0 +1,3 @@ +export * from "./genie"; +export * from "./manifest"; +export * from "./types"; diff --git a/packages/appkit/src/plugins/genie/manifest.json b/packages/appkit/src/plugins/genie/manifest.json new file mode 100644 index 00000000..a269795d --- /dev/null +++ b/packages/appkit/src/plugins/genie/manifest.json @@ -0,0 +1,43 @@ +{ + "name": "genie", + "displayName": "Genie Plugin", + "description": "AI/BI Genie space integration for natural language data queries", + "resources": { + "required": [ + { + "type": "genie_space", + "alias": "Genie Space", + "resourceKey": "genie-space", + "description": "Genie Space for AI-powered data queries. Space IDs configured via plugin config.", + "permission": "CAN_RUN", + "fields": { + "id": { + "env": "DATABRICKS_GENIE_SPACE_ID", + "description": "Default Genie Space ID" + } + } + } + ], + "optional": [] + }, + "config": { + "schema": { + "type": "object", + "properties": { + "spaces": { + "type": "object", + "description": "Map of alias names to Genie Space IDs", + "additionalProperties": { + "type": "string" + } + }, + "timeout": { + "type": "number", + "default": 120000, + "description": "Genie polling timeout in ms. Set to 0 for indefinite." + } + }, + "required": ["spaces"] + } + } +} diff --git a/packages/appkit/src/plugins/genie/manifest.ts b/packages/appkit/src/plugins/genie/manifest.ts new file mode 100644 index 00000000..cf3d98fb --- /dev/null +++ b/packages/appkit/src/plugins/genie/manifest.ts @@ -0,0 +1,10 @@ +import { readFileSync } from "node:fs"; +import { dirname, join } from "node:path"; +import { fileURLToPath } from "node:url"; +import type { PluginManifest } from "../../registry"; + +const __dirname = dirname(fileURLToPath(import.meta.url)); + +export const genieManifest: PluginManifest = JSON.parse( + readFileSync(join(__dirname, "manifest.json"), "utf-8"), +) as PluginManifest; diff --git a/packages/appkit/src/plugins/genie/tests/genie.test.ts b/packages/appkit/src/plugins/genie/tests/genie.test.ts new file mode 100644 index 00000000..dc37b55d --- /dev/null +++ b/packages/appkit/src/plugins/genie/tests/genie.test.ts @@ -0,0 +1,486 @@ +import { + createMockRequest, + createMockResponse, + createMockRouter, + mockServiceContext, + setupDatabricksEnv, +} from "@tools/test-helpers"; +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; +import { ServiceContext } from "../../../context/service-context"; +import { GeniePlugin, genie } from "../genie"; +import type { IGenieConfig } from "../types"; + +// Mock CacheManager singleton +const { mockCacheInstance } = vi.hoisted(() => { + const instance = { + get: vi.fn(), + set: vi.fn(), + delete: vi.fn(), + getOrExecute: vi + .fn() + .mockImplementation( + async (_key: unknown[], fn: () => Promise) => { + return await fn(); + }, + ), + generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), + }; + + return { mockCacheInstance: instance }; +}); + +vi.mock("../../../cache", () => ({ + CacheManager: { + getInstanceSync: vi.fn(() => mockCacheInstance), + }, +})); + +function createMockGenieService() { + const getMessageAttachmentQueryResult = vi.fn(); + + const createWaiter = ( + conversationId: string, + messageId: string, + attachments: any[] = [], + status = "COMPLETED", + ) => ({ + wait: vi.fn().mockImplementation(async ({ onProgress }: any) => { + if (onProgress) { + await onProgress({ status: "ASKING_AI" }); + await onProgress({ status: "EXECUTING_QUERY" }); + } + return { + message_id: messageId, + conversation_id: conversationId, + space_id: "test-space-id", + content: "Here are your results", + status, + attachments, + error: undefined, + }; + }), + }); + + const startConversation = vi.fn().mockImplementation(async () => ({ + conversation_id: "new-conv-id", + message_id: "new-msg-id", + ...createWaiter("new-conv-id", "new-msg-id", [ + { + attachment_id: "att-1", + query: { + title: "Top Customers", + description: "Query for top customers", + query: "SELECT * FROM customers", + statement_id: "stmt-1", + }, + }, + ]), + })); + + const createMessage = vi.fn().mockImplementation(async () => + createWaiter("existing-conv-id", "followup-msg-id", [ + { + attachment_id: "att-2", + query: { + title: "Follow-up Query", + query: "SELECT * FROM orders", + statement_id: "stmt-2", + }, + }, + ]), + ); + + return { + startConversation, + createMessage, + getMessageAttachmentQueryResult, + createWaiter, + }; +} + +describe("Genie Plugin", () => { + let config: IGenieConfig; + let serviceContextMock: Awaited>; + let mockGenieService: ReturnType; + + beforeEach(async () => { + config = { + spaces: { + myspace: "test-space-id", + salesbot: "sales-space-id", + }, + timeout: 5000, + }; + setupDatabricksEnv(); + ServiceContext.reset(); + + mockGenieService = createMockGenieService(); + + mockGenieService.getMessageAttachmentQueryResult.mockResolvedValue({ + statement_response: { + status: { state: "SUCCEEDED" }, + result: { + data_array: [ + ["Acme Corp", "1000000"], + ["Globex", "500000"], + ], + }, + manifest: { + schema: { + columns: [ + { name: "customer", type_name: "STRING" }, + { name: "revenue", type_name: "DECIMAL" }, + ], + }, + }, + }, + }); + + serviceContextMock = await mockServiceContext({ + userDatabricksClient: { + genie: mockGenieService, + }, + }); + }); + + afterEach(() => { + serviceContextMock?.restore(); + }); + + test("genie factory should have correct name", () => { + const pluginData = genie({ spaces: { test: "id" } }); + expect(pluginData.name).toBe("genie"); + }); + + test("plugin instance should be created with correct name", () => { + const plugin = new GeniePlugin(config); + expect(plugin.name).toBe("genie"); + }); + + describe("injectRoutes", () => { + test("should register single POST route", () => { + const plugin = new GeniePlugin(config); + const { router } = createMockRouter(); + + plugin.injectRoutes(router); + + expect(router.post).toHaveBeenCalledTimes(1); + expect(router.post).toHaveBeenCalledWith( + "/:alias/messages", + expect.any(Function), + ); + }); + }); + + describe("space alias resolution", () => { + test("should return 404 for unknown alias", async () => { + const plugin = new GeniePlugin(config); + const { router, getHandler } = createMockRouter(); + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/:alias/messages"); + const mockReq = createMockRequest({ + params: { alias: "unknown" }, + body: { content: "test question" }, + headers: { + "x-forwarded-access-token": "user-token", + "x-forwarded-user": "user-1", + }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(mockRes.status).toHaveBeenCalledWith(404); + expect(mockRes.json).toHaveBeenCalledWith({ + error: "Unknown space alias: unknown", + }); + }); + + test("should resolve valid alias", async () => { + const plugin = new GeniePlugin(config); + const { router, getHandler } = createMockRouter(); + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/:alias/messages"); + const mockReq = createMockRequest({ + params: { alias: "myspace" }, + body: { content: "What are my top customers?" }, + headers: { + "x-forwarded-access-token": "user-token", + "x-forwarded-user": "user-1", + }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(mockRes.status).not.toHaveBeenCalledWith(404); + expect(mockGenieService.startConversation).toHaveBeenCalledWith( + expect.objectContaining({ + space_id: "test-space-id", + content: "What are my top customers?", + }), + ); + }); + }); + + describe("validation", () => { + test("should return 400 when content is missing", async () => { + const plugin = new GeniePlugin(config); + const { router, getHandler } = createMockRouter(); + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/:alias/messages"); + const mockReq = createMockRequest({ + params: { alias: "myspace" }, + body: {}, + headers: { + "x-forwarded-access-token": "user-token", + "x-forwarded-user": "user-1", + }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(mockRes.status).toHaveBeenCalledWith(400); + expect(mockRes.json).toHaveBeenCalledWith({ + error: "content is required", + }); + }); + }); + + describe("send message - new conversation", () => { + test("should call startConversation and stream SSE events", async () => { + const plugin = new GeniePlugin(config); + const { router, getHandler } = createMockRouter(); + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/:alias/messages"); + const mockReq = createMockRequest({ + params: { alias: "myspace" }, + body: { content: "What are my top customers?" }, + headers: { + "x-forwarded-access-token": "user-token", + "x-forwarded-user": "user-1", + }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(mockGenieService.startConversation).toHaveBeenCalledWith( + expect.objectContaining({ + space_id: "test-space-id", + content: "What are my top customers?", + }), + ); + + // Verify SSE headers + expect(mockRes.setHeader).toHaveBeenCalledWith( + "Content-Type", + "text/event-stream", + ); + expect(mockRes.setHeader).toHaveBeenCalledWith( + "Cache-Control", + "no-cache", + ); + + // Verify SSE events are written + const writeCalls = mockRes.write.mock.calls.map((call: any[]) => call[0]); + const allWritten = writeCalls.join(""); + + // Should have message_start event + expect(allWritten).toContain("message_start"); + expect(allWritten).toContain("new-conv-id"); + + // Should have status events + expect(allWritten).toContain("status"); + expect(allWritten).toContain("ASKING_AI"); + + // Should have message_result event + expect(allWritten).toContain("message_result"); + + // Should have query_result event + expect(allWritten).toContain("query_result"); + + expect(mockRes.end).toHaveBeenCalled(); + }); + }); + + describe("send message - follow-up", () => { + test("should call createMessage with conversationId", async () => { + const plugin = new GeniePlugin(config); + const { router, getHandler } = createMockRouter(); + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/:alias/messages"); + const mockReq = createMockRequest({ + params: { alias: "myspace" }, + body: { + content: "Show me more details", + conversationId: "existing-conv-id", + }, + headers: { + "x-forwarded-access-token": "user-token", + "x-forwarded-user": "user-1", + }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(mockGenieService.createMessage).toHaveBeenCalledWith( + expect.objectContaining({ + space_id: "test-space-id", + conversation_id: "existing-conv-id", + content: "Show me more details", + }), + ); + + expect(mockGenieService.startConversation).not.toHaveBeenCalled(); + + const writeCalls = mockRes.write.mock.calls.map((call: any[]) => call[0]); + const allWritten = writeCalls.join(""); + + expect(allWritten).toContain("message_start"); + expect(allWritten).toContain("existing-conv-id"); + expect(mockRes.end).toHaveBeenCalled(); + }); + }); + + describe("multiple attachments", () => { + test("should yield query_result for each query attachment", async () => { + // Override startConversation to return multiple query attachments + mockGenieService.startConversation.mockImplementation(async () => ({ + conversation_id: "multi-conv-id", + message_id: "multi-msg-id", + wait: vi.fn().mockImplementation(async ({ onProgress }: any) => { + if (onProgress) { + await onProgress({ status: "ASKING_AI" }); + } + return { + message_id: "multi-msg-id", + conversation_id: "multi-conv-id", + space_id: "test-space-id", + content: "Here are two queries", + status: "COMPLETED", + attachments: [ + { + attachment_id: "att-q1", + query: { + title: "Query 1", + query: "SELECT 1", + statement_id: "stmt-q1", + }, + }, + { + attachment_id: "att-q2", + query: { + title: "Query 2", + query: "SELECT 2", + statement_id: "stmt-q2", + }, + }, + { + attachment_id: "att-text", + text: { content: "Some explanation" }, + }, + ], + }; + }), + })); + + mockGenieService.getMessageAttachmentQueryResult + .mockResolvedValueOnce({ + statement_response: { result: { data: [["row1"]] } }, + }) + .mockResolvedValueOnce({ + statement_response: { result: { data: [["row2"]] } }, + }); + + const plugin = new GeniePlugin(config); + const { router, getHandler } = createMockRouter(); + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/:alias/messages"); + const mockReq = createMockRequest({ + params: { alias: "myspace" }, + body: { content: "Run two queries" }, + headers: { + "x-forwarded-access-token": "user-token", + "x-forwarded-user": "user-1", + }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + // getMessageAttachmentQueryResult should be called twice (once per query attachment) + expect( + mockGenieService.getMessageAttachmentQueryResult, + ).toHaveBeenCalledTimes(2); + + expect( + mockGenieService.getMessageAttachmentQueryResult, + ).toHaveBeenCalledWith( + expect.objectContaining({ attachment_id: "att-q1" }), + ); + expect( + mockGenieService.getMessageAttachmentQueryResult, + ).toHaveBeenCalledWith( + expect.objectContaining({ attachment_id: "att-q2" }), + ); + + const writeCalls = mockRes.write.mock.calls.map((call: any[]) => call[0]); + const allWritten = writeCalls.join(""); + + // Should have two query_result events + const queryResultCount = (allWritten.match(/query_result/g) || []).length; + expect(queryResultCount).toBeGreaterThanOrEqual(2); + + expect(mockRes.end).toHaveBeenCalled(); + }); + }); + + describe("error handling", () => { + test("should yield error event on SDK failure", async () => { + mockGenieService.startConversation.mockRejectedValue( + new Error("Genie service unavailable"), + ); + + const plugin = new GeniePlugin(config); + const { router, getHandler } = createMockRouter(); + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/:alias/messages"); + const mockReq = createMockRequest({ + params: { alias: "myspace" }, + body: { content: "test question" }, + headers: { + "x-forwarded-access-token": "user-token", + "x-forwarded-user": "user-1", + }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + const writeCalls = mockRes.write.mock.calls.map((call: any[]) => call[0]); + const allWritten = writeCalls.join(""); + + expect(allWritten).toContain("error"); + expect(allWritten).toContain("Genie service unavailable"); + + expect(mockRes.end).toHaveBeenCalled(); + }); + }); +}); diff --git a/packages/appkit/src/plugins/genie/types.ts b/packages/appkit/src/plugins/genie/types.ts new file mode 100644 index 00000000..af5b906f --- /dev/null +++ b/packages/appkit/src/plugins/genie/types.ts @@ -0,0 +1,54 @@ +import type { BasePluginConfig } from "shared"; + +export interface IGenieConfig extends BasePluginConfig { + /** Map of alias → Genie Space ID */ + spaces: Record; + /** Genie polling timeout in ms. Set to 0 for indefinite. Default: 120000 (2 min) */ + timeout?: number; +} + +export interface GenieSendMessageRequest { + content: string; + conversationId?: string; +} + +/** SSE event discriminated union */ +export type GenieStreamEvent = + | { + type: "message_start"; + conversationId: string; + messageId: string; + spaceId: string; + } + | { type: "status"; status: string } + | { type: "message_result"; message: GenieMessageResponse } + | { + type: "query_result"; + attachmentId: string; + statementId: string; + data: unknown; + } + | { type: "error"; error: string }; + +/** Cleaned response — subset of SDK's GenieMessage */ +export interface GenieMessageResponse { + messageId: string; + conversationId: string; + spaceId: string; + status: string; + content: string; + attachments?: GenieAttachmentResponse[]; + error?: string; +} + +export interface GenieAttachmentResponse { + attachmentId?: string; + query?: { + title?: string; + description?: string; + query?: string; + statementId?: string; + }; + text?: { content?: string }; + suggestedQuestions?: string[]; +} diff --git a/packages/appkit/src/plugins/index.ts b/packages/appkit/src/plugins/index.ts index aba6f26b..2a5b8dee 100644 --- a/packages/appkit/src/plugins/index.ts +++ b/packages/appkit/src/plugins/index.ts @@ -1,2 +1,3 @@ export * from "./analytics"; +export * from "./genie"; export * from "./server"; diff --git a/packages/shared/src/cli/commands/plugins-sync.ts b/packages/shared/src/cli/commands/plugins-sync.ts index ef4cbc2e..17365b5d 100644 --- a/packages/shared/src/cli/commands/plugins-sync.ts +++ b/packages/shared/src/cli/commands/plugins-sync.ts @@ -2,7 +2,7 @@ import fs from "node:fs"; import path from "node:path"; import { fileURLToPath } from "node:url"; import { Lang, parse, type SgNode } from "@ast-grep/napi"; -import Ajv, { type ErrorObject } from "ajv"; +import Ajv from "ajv"; import addFormats from "ajv-formats"; import { Command } from "commander"; @@ -88,19 +88,21 @@ function isWithinDirectory(filePath: string, boundary: string): boolean { ); } -let pluginManifestValidator: ReturnType | null = null; +type ValidateFunction = ReturnType["compile"]>; + +let pluginManifestValidator: ValidateFunction | null = null; /** * Loads and compiles the plugin-manifest JSON schema (cached). * Returns the compiled validate function or null if the schema cannot be loaded. */ -function getPluginManifestValidator(): ReturnType | null { +function getPluginManifestValidator(): ValidateFunction | null { if (pluginManifestValidator) return pluginManifestValidator; try { const schemaRaw = fs.readFileSync(PLUGIN_MANIFEST_SCHEMA_PATH, "utf-8"); const schema = JSON.parse(schemaRaw) as object; - const ajv = new Ajv({ allErrors: true, strict: false }); - addFormats(ajv); + const ajv = new Ajv({ allErrors: true }); + addFormats(ajv as any); pluginManifestValidator = ajv.compile(schema); return pluginManifestValidator; } catch (err) { @@ -153,11 +155,11 @@ function validateManifestWithSchema( const valid = validate(obj); if (valid) return obj as PluginManifest; - const errors: ErrorObject[] = validate.errors ?? []; + const errors = validate.errors ?? []; const message = errors .map( - (e: ErrorObject) => - ` ${e.instancePath || "/"} ${e.message}${e.params ? ` (${JSON.stringify(e.params)})` : ""}`, + (e) => + ` ${(e as any).instancePath || e.dataPath || "/"} ${e.message}${e.params ? ` (${JSON.stringify(e.params)})` : ""}`, ) .join("\n"); console.warn( From 914910f48826600e769d0af43d16ba7e0b045ba9 Mon Sep 17 00:00:00 2001 From: Jorge Calvar Date: Tue, 17 Feb 2026 16:24:49 +0100 Subject: [PATCH 2/5] fix(shared): handle ajv v6/v8 ErrorObject type differences Cast error entries to `any` when mapping validation errors so the code works regardless of which ajv version TypeScript resolves (v6 has `dataPath`, v8 has `instancePath`). Signed-off-by: Jorge Calvar --- packages/shared/src/cli/commands/plugins-sync.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/shared/src/cli/commands/plugins-sync.ts b/packages/shared/src/cli/commands/plugins-sync.ts index 17365b5d..032e029c 100644 --- a/packages/shared/src/cli/commands/plugins-sync.ts +++ b/packages/shared/src/cli/commands/plugins-sync.ts @@ -158,8 +158,8 @@ function validateManifestWithSchema( const errors = validate.errors ?? []; const message = errors .map( - (e) => - ` ${(e as any).instancePath || e.dataPath || "/"} ${e.message}${e.params ? ` (${JSON.stringify(e.params)})` : ""}`, + (e: any) => + ` ${e.instancePath || e.dataPath || "/"} ${e.message}${e.params ? ` (${JSON.stringify(e.params)})` : ""}`, ) .join("\n"); console.warn( From 62a84e4c1e77c9ef34ae727f8f598cef3e098364 Mon Sep 17 00:00:00 2001 From: Jorge Calvar Date: Tue, 17 Feb 2026 16:33:48 +0100 Subject: [PATCH 3/5] docs: regenerate API docs for genie plugin export Signed-off-by: Jorge Calvar --- docs/docs/api/appkit/Variable.genie.md | 5 +++++ docs/docs/api/appkit/index.md | 1 + docs/docs/api/appkit/typedoc-sidebar.ts | 5 +++++ 3 files changed, 11 insertions(+) create mode 100644 docs/docs/api/appkit/Variable.genie.md diff --git a/docs/docs/api/appkit/Variable.genie.md b/docs/docs/api/appkit/Variable.genie.md new file mode 100644 index 00000000..cc657d0a --- /dev/null +++ b/docs/docs/api/appkit/Variable.genie.md @@ -0,0 +1,5 @@ +# Variable: genie + +```ts +const genie: ToPlugin; +``` diff --git a/docs/docs/api/appkit/index.md b/docs/docs/api/appkit/index.md index f1a0e5f8..b62f81d5 100644 --- a/docs/docs/api/appkit/index.md +++ b/docs/docs/api/appkit/index.md @@ -52,6 +52,7 @@ plugin architecture, and React integration. | Variable | Description | | ------ | ------ | +| [genie](Variable.genie.md) | - | | [sql](Variable.sql.md) | SQL helper namespace | ## Functions diff --git a/docs/docs/api/appkit/typedoc-sidebar.ts b/docs/docs/api/appkit/typedoc-sidebar.ts index aa114b63..e6ad8bc6 100644 --- a/docs/docs/api/appkit/typedoc-sidebar.ts +++ b/docs/docs/api/appkit/typedoc-sidebar.ts @@ -154,6 +154,11 @@ const typedocSidebar: SidebarsConfig = { type: "category", label: "Variables", items: [ + { + type: "doc", + id: "api/appkit/Variable.genie", + label: "genie" + }, { type: "doc", id: "api/appkit/Variable.sql", From e18c4e5221c26166467d8cda9aa41ef9a75aff24 Mon Sep 17 00:00:00 2001 From: Jorge Calvar Date: Tue, 17 Feb 2026 16:40:22 +0100 Subject: [PATCH 4/5] fix(appkit): copy genie manifest.json to dist during build Add genie manifest.json to tsdown copy config so it's available at runtime when loading from the built dist/ output. Signed-off-by: Jorge Calvar --- packages/appkit/tsdown.config.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/appkit/tsdown.config.ts b/packages/appkit/tsdown.config.ts index 2472c084..59ee0754 100644 --- a/packages/appkit/tsdown.config.ts +++ b/packages/appkit/tsdown.config.ts @@ -42,6 +42,10 @@ export default defineConfig([ from: "src/plugins/analytics/manifest.json", to: "dist/plugins/analytics/manifest.json", }, + { + from: "src/plugins/genie/manifest.json", + to: "dist/plugins/genie/manifest.json", + }, { from: "src/plugins/server/manifest.json", to: "dist/plugins/server/manifest.json", From b7cf4b8a047a2d769b43542baf8f4d5451bcf312 Mon Sep 17 00:00:00 2001 From: Jorge Calvar Date: Wed, 18 Feb 2026 11:53:31 +0100 Subject: [PATCH 5/5] feat(appkit): add conversation history endpoint to Genie plugin SSE endpoint (GET /api/genie/:alias/conversations/:conversationId) that replays full conversation history reusing existing event types, enabling page refresh without losing chat state. Signed-off-by: Jorge Calvar --- packages/appkit/src/plugins/genie/genie.ts | 175 +++++++ .../src/plugins/genie/tests/genie.test.ts | 438 +++++++++++++++++- packages/appkit/src/plugins/genie/types.ts | 6 + 3 files changed, 618 insertions(+), 1 deletion(-) diff --git a/packages/appkit/src/plugins/genie/genie.ts b/packages/appkit/src/plugins/genie/genie.ts index 23fa79b5..0d958861 100644 --- a/packages/appkit/src/plugins/genie/genie.ts +++ b/packages/appkit/src/plugins/genie/genie.ts @@ -15,6 +15,7 @@ import { genieStreamDefaults } from "./defaults"; import { genieManifest } from "./manifest"; import type { GenieAttachmentResponse, + GenieConversationHistoryResponse, GenieMessageResponse, GenieSendMessageRequest, GenieStreamEvent, @@ -88,6 +89,15 @@ export class GeniePlugin extends Plugin { await this.asUser(req)._handleSendMessage(req, res); }, }); + + this.route(router, { + name: "getConversation", + method: "get", + path: "/:alias/conversations/:conversationId", + handler: async (req: express.Request, res: express.Response) => { + await this.asUser(req)._handleGetConversation(req, res); + }, + }); } async _handleSendMessage( @@ -281,6 +291,170 @@ export class GeniePlugin extends Plugin { ); } + private async _fetchAllMessages( + spaceId: string, + conversationId: string, + ): Promise { + const workspaceClient = getWorkspaceClient(); + const allMessages: GenieMessage[] = []; + let pageToken: string | undefined; + const maxMessages = 200; + + do { + const response = await workspaceClient.genie.listConversationMessages({ + space_id: spaceId, + conversation_id: conversationId, + page_size: 100, + ...(pageToken ? { page_token: pageToken } : {}), + }); + + if (response.messages) { + allMessages.push(...response.messages); + } + + pageToken = response.next_page_token; + } while (pageToken && allMessages.length < maxMessages); + + return allMessages.slice(0, maxMessages); + } + + async _handleGetConversation( + req: express.Request, + res: express.Response, + ): Promise { + const { alias, conversationId } = req.params; + const spaceId = this.resolveSpaceId(alias); + + if (!spaceId) { + res.status(404).json({ error: `Unknown space alias: ${alias}` }); + return; + } + + const includeQueryResults = req.query.includeQueryResults !== "false"; + + logger.debug( + "Fetching conversation %s from space %s (alias=%s, includeQueryResults=%s)", + conversationId, + spaceId, + alias, + includeQueryResults, + ); + + const self = this; + + await this.executeStream( + res, + async function* () { + try { + const messages = await self._fetchAllMessages( + spaceId, + conversationId, + ); + + const messageResponses: GenieMessageResponse[] = []; + + for (const message of messages) { + const messageResponse = toMessageResponse(message); + messageResponses.push(messageResponse); + + yield { + type: "message_result" as const, + message: messageResponse, + }; + } + + if (includeQueryResults) { + // Collect all query attachments across all messages + const queryAttachments: Array<{ + messageId: string; + attachmentId: string; + statementId: string; + }> = []; + + for (const msg of messageResponses) { + for (const att of msg.attachments ?? []) { + if (att.query?.statementId && att.attachmentId) { + queryAttachments.push({ + messageId: msg.messageId, + attachmentId: att.attachmentId, + statementId: att.query.statementId, + }); + } + } + } + + // Fetch all query results in parallel + const workspaceClient = getWorkspaceClient(); + const results = await Promise.allSettled( + queryAttachments.map(async (att) => { + const queryResult = + await workspaceClient.genie.getMessageAttachmentQueryResult({ + space_id: spaceId, + conversation_id: conversationId, + message_id: att.messageId, + attachment_id: att.attachmentId, + }); + return { + attachmentId: att.attachmentId, + statementId: att.statementId, + data: queryResult.statement_response, + }; + }), + ); + + for (const result of results) { + if (result.status === "fulfilled") { + yield { + type: "query_result" as const, + attachmentId: result.value.attachmentId, + statementId: result.value.statementId, + data: result.value.data, + }; + } else { + logger.error("Failed to fetch query result: %O", result.reason); + yield { + type: "error" as const, + error: + result.reason instanceof Error + ? result.reason.message + : "Failed to fetch query result", + }; + } + } + } + } catch (error) { + logger.error("Genie getConversation error: %O", error); + yield { + type: "error" as const, + error: + error instanceof Error + ? error.message + : "Failed to fetch conversation", + }; + } + }, + genieStreamDefaults, + ); + } + + async getConversation( + alias: string, + conversationId: string, + ): Promise { + const spaceId = this.resolveSpaceId(alias); + if (!spaceId) { + throw new Error(`Unknown space alias: ${alias}`); + } + + const messages = await this._fetchAllMessages(spaceId, conversationId); + + return { + conversationId, + spaceId, + messages: messages.map(toMessageResponse), + }; + } + async sendMessage( alias: string, content: string, @@ -331,6 +505,7 @@ export class GeniePlugin extends Plugin { exports() { return { sendMessage: this.sendMessage, + getConversation: this.getConversation, }; } } diff --git a/packages/appkit/src/plugins/genie/tests/genie.test.ts b/packages/appkit/src/plugins/genie/tests/genie.test.ts index dc37b55d..5018c0fb 100644 --- a/packages/appkit/src/plugins/genie/tests/genie.test.ts +++ b/packages/appkit/src/plugins/genie/tests/genie.test.ts @@ -90,10 +90,13 @@ function createMockGenieService() { ]), ); + const listConversationMessages = vi.fn(); + return { startConversation, createMessage, getMessageAttachmentQueryResult, + listConversationMessages, createWaiter, }; } @@ -158,7 +161,7 @@ describe("Genie Plugin", () => { }); describe("injectRoutes", () => { - test("should register single POST route", () => { + test("should register POST and GET routes", () => { const plugin = new GeniePlugin(config); const { router } = createMockRouter(); @@ -169,6 +172,12 @@ describe("Genie Plugin", () => { "/:alias/messages", expect.any(Function), ); + + expect(router.get).toHaveBeenCalledTimes(1); + expect(router.get).toHaveBeenCalledWith( + "/:alias/conversations/:conversationId", + expect.any(Function), + ); }); }); @@ -483,4 +492,431 @@ describe("Genie Plugin", () => { expect(mockRes.end).toHaveBeenCalled(); }); }); + + describe("getConversation", () => { + function createConversationRequest(overrides: Record = {}) { + return createMockRequest({ + params: { alias: "myspace", conversationId: "conv-123" }, + query: {}, + headers: { + "x-forwarded-access-token": "user-token", + "x-forwarded-user": "user-1", + }, + ...overrides, + }); + } + + function mockMessages(messages: any[]) { + mockGenieService.listConversationMessages.mockResolvedValue({ + messages, + next_page_token: undefined, + }); + } + + test("should return 404 for unknown alias", async () => { + const plugin = new GeniePlugin(config); + const { router, getHandler } = createMockRouter(); + + plugin.injectRoutes(router); + + const handler = getHandler( + "GET", + "/:alias/conversations/:conversationId", + ); + const mockReq = createConversationRequest({ + params: { alias: "unknown", conversationId: "conv-123" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(mockRes.status).toHaveBeenCalledWith(404); + expect(mockRes.json).toHaveBeenCalledWith({ + error: "Unknown space alias: unknown", + }); + }); + + test("should stream message_result events for each message", async () => { + mockMessages([ + { + message_id: "msg-1", + conversation_id: "conv-123", + space_id: "test-space-id", + content: "What are the top customers?", + status: "COMPLETED", + attachments: [], + }, + { + message_id: "msg-2", + conversation_id: "conv-123", + space_id: "test-space-id", + content: "Here are the results", + status: "COMPLETED", + attachments: [ + { + attachment_id: "att-1", + query: { + title: "Top Customers", + query: "SELECT * FROM customers", + statement_id: "stmt-1", + }, + }, + ], + }, + ]); + + const plugin = new GeniePlugin(config); + const { router, getHandler } = createMockRouter(); + + plugin.injectRoutes(router); + + const handler = getHandler( + "GET", + "/:alias/conversations/:conversationId", + ); + const mockReq = createConversationRequest(); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(mockGenieService.listConversationMessages).toHaveBeenCalledWith( + expect.objectContaining({ + space_id: "test-space-id", + conversation_id: "conv-123", + page_size: 100, + }), + ); + + const writeCalls = mockRes.write.mock.calls.map((call: any[]) => call[0]); + const allWritten = writeCalls.join(""); + + // Should have two message_result events + const messageResultCount = ( + allWritten.match(/"type":"message_result"/g) || [] + ).length; + expect(messageResultCount).toBe(2); + + // Should contain message content + expect(allWritten).toContain("What are the top customers?"); + expect(allWritten).toContain("Here are the results"); + + expect(mockRes.end).toHaveBeenCalled(); + }); + + test("should stream query_result events when includeQueryResults is true (default)", async () => { + mockMessages([ + { + message_id: "msg-1", + conversation_id: "conv-123", + space_id: "test-space-id", + content: "Results", + status: "COMPLETED", + attachments: [ + { + attachment_id: "att-1", + query: { + title: "Query 1", + query: "SELECT 1", + statement_id: "stmt-1", + }, + }, + ], + }, + ]); + + const plugin = new GeniePlugin(config); + const { router, getHandler } = createMockRouter(); + + plugin.injectRoutes(router); + + const handler = getHandler( + "GET", + "/:alias/conversations/:conversationId", + ); + const mockReq = createConversationRequest(); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect( + mockGenieService.getMessageAttachmentQueryResult, + ).toHaveBeenCalledWith( + expect.objectContaining({ + space_id: "test-space-id", + conversation_id: "conv-123", + message_id: "msg-1", + attachment_id: "att-1", + }), + ); + + const writeCalls = mockRes.write.mock.calls.map((call: any[]) => call[0]); + const allWritten = writeCalls.join(""); + + expect(allWritten).toContain("message_result"); + expect(allWritten).toContain("query_result"); + expect(mockRes.end).toHaveBeenCalled(); + }); + + test("should NOT stream query_result events when includeQueryResults is false", async () => { + mockMessages([ + { + message_id: "msg-1", + conversation_id: "conv-123", + space_id: "test-space-id", + content: "Results", + status: "COMPLETED", + attachments: [ + { + attachment_id: "att-1", + query: { + title: "Query 1", + query: "SELECT 1", + statement_id: "stmt-1", + }, + }, + ], + }, + ]); + + const plugin = new GeniePlugin(config); + const { router, getHandler } = createMockRouter(); + + plugin.injectRoutes(router); + + const handler = getHandler( + "GET", + "/:alias/conversations/:conversationId", + ); + const mockReq = createConversationRequest({ + query: { includeQueryResults: "false" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect( + mockGenieService.getMessageAttachmentQueryResult, + ).not.toHaveBeenCalled(); + + const writeCalls = mockRes.write.mock.calls.map((call: any[]) => call[0]); + const allWritten = writeCalls.join(""); + + expect(allWritten).toContain("message_result"); + expect(allWritten).not.toContain("query_result"); + expect(mockRes.end).toHaveBeenCalled(); + }); + + test("should paginate through all messages", async () => { + mockGenieService.listConversationMessages + .mockResolvedValueOnce({ + messages: [ + { + message_id: "msg-1", + conversation_id: "conv-123", + space_id: "test-space-id", + content: "Page 1 message", + status: "COMPLETED", + attachments: [], + }, + ], + next_page_token: "page-2-token", + }) + .mockResolvedValueOnce({ + messages: [ + { + message_id: "msg-2", + conversation_id: "conv-123", + space_id: "test-space-id", + content: "Page 2 message", + status: "COMPLETED", + attachments: [], + }, + ], + next_page_token: undefined, + }); + + const plugin = new GeniePlugin(config); + const { router, getHandler } = createMockRouter(); + + plugin.injectRoutes(router); + + const handler = getHandler( + "GET", + "/:alias/conversations/:conversationId", + ); + const mockReq = createConversationRequest({ + query: { includeQueryResults: "false" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(mockGenieService.listConversationMessages).toHaveBeenCalledTimes( + 2, + ); + + // First call without page_token + expect(mockGenieService.listConversationMessages).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + space_id: "test-space-id", + conversation_id: "conv-123", + page_size: 100, + }), + ); + + // Second call with page_token + expect(mockGenieService.listConversationMessages).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + space_id: "test-space-id", + conversation_id: "conv-123", + page_size: 100, + page_token: "page-2-token", + }), + ); + + const writeCalls = mockRes.write.mock.calls.map((call: any[]) => call[0]); + const allWritten = writeCalls.join(""); + + expect(allWritten).toContain("Page 1 message"); + expect(allWritten).toContain("Page 2 message"); + expect(mockRes.end).toHaveBeenCalled(); + }); + + test("should handle empty conversation", async () => { + mockMessages([]); + + const plugin = new GeniePlugin(config); + const { router, getHandler } = createMockRouter(); + + plugin.injectRoutes(router); + + const handler = getHandler( + "GET", + "/:alias/conversations/:conversationId", + ); + const mockReq = createConversationRequest(); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + const writeCalls = mockRes.write.mock.calls.map((call: any[]) => call[0]); + const allWritten = writeCalls.join(""); + + expect(allWritten).not.toContain("message_result"); + expect(allWritten).not.toContain("query_result"); + expect(mockRes.end).toHaveBeenCalled(); + }); + + test("should yield error event on SDK failure", async () => { + mockGenieService.listConversationMessages.mockRejectedValue( + new Error("Conversation not found"), + ); + + const plugin = new GeniePlugin(config); + const { router, getHandler } = createMockRouter(); + + plugin.injectRoutes(router); + + const handler = getHandler( + "GET", + "/:alias/conversations/:conversationId", + ); + const mockReq = createConversationRequest(); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + const writeCalls = mockRes.write.mock.calls.map((call: any[]) => call[0]); + const allWritten = writeCalls.join(""); + + expect(allWritten).toContain("error"); + expect(allWritten).toContain("Conversation not found"); + expect(mockRes.end).toHaveBeenCalled(); + }); + + test("should fetch query results in parallel for multiple attachments across messages", async () => { + mockMessages([ + { + message_id: "msg-1", + conversation_id: "conv-123", + space_id: "test-space-id", + content: "First query", + status: "COMPLETED", + attachments: [ + { + attachment_id: "att-1", + query: { + title: "Query 1", + query: "SELECT 1", + statement_id: "stmt-1", + }, + }, + ], + }, + { + message_id: "msg-2", + conversation_id: "conv-123", + space_id: "test-space-id", + content: "Second query", + status: "COMPLETED", + attachments: [ + { + attachment_id: "att-2", + query: { + title: "Query 2", + query: "SELECT 2", + statement_id: "stmt-2", + }, + }, + ], + }, + ]); + + const plugin = new GeniePlugin(config); + const { router, getHandler } = createMockRouter(); + + plugin.injectRoutes(router); + + const handler = getHandler( + "GET", + "/:alias/conversations/:conversationId", + ); + const mockReq = createConversationRequest(); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect( + mockGenieService.getMessageAttachmentQueryResult, + ).toHaveBeenCalledTimes(2); + + expect( + mockGenieService.getMessageAttachmentQueryResult, + ).toHaveBeenCalledWith( + expect.objectContaining({ + message_id: "msg-1", + attachment_id: "att-1", + }), + ); + expect( + mockGenieService.getMessageAttachmentQueryResult, + ).toHaveBeenCalledWith( + expect.objectContaining({ + message_id: "msg-2", + attachment_id: "att-2", + }), + ); + + const writeCalls = mockRes.write.mock.calls.map((call: any[]) => call[0]); + const allWritten = writeCalls.join(""); + + const queryResultCount = ( + allWritten.match(/"type":"query_result"/g) || [] + ).length; + expect(queryResultCount).toBe(2); + expect(mockRes.end).toHaveBeenCalled(); + }); + }); }); diff --git a/packages/appkit/src/plugins/genie/types.ts b/packages/appkit/src/plugins/genie/types.ts index af5b906f..727759bc 100644 --- a/packages/appkit/src/plugins/genie/types.ts +++ b/packages/appkit/src/plugins/genie/types.ts @@ -41,6 +41,12 @@ export interface GenieMessageResponse { error?: string; } +export interface GenieConversationHistoryResponse { + conversationId: string; + spaceId: string; + messages: GenieMessageResponse[]; +} + export interface GenieAttachmentResponse { attachmentId?: string; query?: {