diff --git a/packages/core/src/tracing/openai/index.ts b/packages/core/src/tracing/openai/index.ts index b0d26f92c36c..ba1472c34980 100644 --- a/packages/core/src/tracing/openai/index.ts +++ b/packages/core/src/tracing/openai/index.ts @@ -162,8 +162,103 @@ function addRequestAttributes(span: Span, params: Record, opera } /** - * Instrument a method with Sentry spans - * Following Sentry AI Agents Manual Instrumentation conventions + * Wrap the original return value so span logic runs on settle while preserving + * API surface (e.g. .withResponse()). Callers can await the wrapper or call .withResponse(). + */ +function wrapReturnValue( + result: R & { then?: (onFulfilled?: (value: unknown) => unknown, onRejected?: (reason?: unknown) => unknown) => unknown; withResponse?: () => unknown }, + span: Span, + options: { recordOutputs?: boolean; recordInputs?: boolean }, + methodPath: InstrumentedMethod, + params: Record | undefined, + operationName: string, + isStreamRequested: boolean, +): R { + const thenable = + result !== null && + typeof result === 'object' && + typeof (result as { then?: unknown }).then === 'function' + ? (result as unknown as Promise) + : Promise.resolve(result); + + const chained = isStreamRequested + ? thenable.then( + (stream: unknown) => + instrumentStream( + stream as OpenAIStream, + span, + options.recordOutputs ?? false, + ), + (error: unknown) => { + span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' }); + captureException(error, { + mechanism: { handled: false, type: 'auto.ai.openai.stream', data: { function: methodPath } }, + }); + span.end(); + throw error; + }, + ) + : thenable.then( + (data: unknown) => { + addResponseAttributes(span, data, options.recordOutputs); + span.end(); + return data; + }, + (error: unknown) => { + captureException(error, { + mechanism: { handled: false, type: 'auto.ai.openai', data: { function: methodPath } }, + }); + span.end(); + throw error; + }, + ); + + const wrapper = { + then(onFulfilled?: (value: unknown) => unknown, onRejected?: (reason?: unknown) => unknown) { + return chained.then(onFulfilled, onRejected); + }, + catch(onRejected?: (reason?: unknown) => unknown) { + return chained.catch(onRejected); + }, + finally(onFinally?: () => void) { + return chained.finally(onFinally); + }, + } as unknown as R & { withResponse?: () => unknown }; + + if (typeof result === 'object' && result !== null && typeof (result as { withResponse?: () => unknown }).withResponse === 'function') { + const withResponseOriginal = (result as { withResponse: () => unknown }).withResponse; + wrapper.withResponse = function withResponse() { + const withResponseResult = withResponseOriginal.call(result); + const withResponseThenable = + withResponseResult !== null && + typeof withResponseResult === 'object' && + typeof (withResponseResult as { then?: unknown }).then === 'function' + ? (withResponseResult as Promise<{ data: AsyncIterable; response: unknown }>) + : Promise.resolve(withResponseResult); + + if (isStreamRequested) { + return withResponseThenable.then((payload: { data: AsyncIterable; response: unknown }) => ({ + data: instrumentStream( + payload.data as OpenAIStream, + span, + options.recordOutputs ?? false, + ), + response: payload.response, + })); + } + return withResponseThenable.then((payload: { data: unknown; response: unknown }) => { + addResponseAttributes(span, payload.data, options.recordOutputs); + return payload; + }); + }; + } + + return wrapper as R; +} + +/** + * Instrument a method with Sentry spans. Returns the same shape as the original + * (including .withResponse() when present) and runs span logic when the promise settles. * @see https://docs.sentry.io/platforms/javascript/guides/node/tracing/instrumentation/ai-agents-module/#manual-instrumentation */ function instrumentMethod( @@ -171,86 +266,48 @@ function instrumentMethod( methodPath: InstrumentedMethod, context: unknown, options: OpenAiOptions, -): (...args: T) => Promise { - return async function instrumentedMethod(...args: T): Promise { +): (...args: T) => R { + return function instrumentedMethod(...args: T): R { const requestAttributes = extractRequestAttributes(args, methodPath); const model = (requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] as string) || 'unknown'; const operationName = getOperationName(methodPath); - const params = args[0] as Record | undefined; - const isStreamRequested = params && typeof params === 'object' && params.stream === true; - - if (isStreamRequested) { - // For streaming responses, use manual span management to properly handle the async generator lifecycle - return startSpanManual( - { - name: `${operationName} ${model} stream-response`, - op: getSpanOperation(methodPath), - attributes: requestAttributes as Record, - }, - async (span: Span) => { - try { - if (options.recordInputs && params) { - addRequestAttributes(span, params, operationName); - } - - const result = await originalMethod.apply(context, args); - - return instrumentStream( - result as OpenAIStream, - span, - options.recordOutputs ?? false, - ) as unknown as R; - } catch (error) { - // For streaming requests that fail before stream creation, we still want to record - // them as streaming requests but end the span gracefully - span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' }); - captureException(error, { - mechanism: { - handled: false, - type: 'auto.ai.openai.stream', - data: { - function: methodPath, - }, - }, - }); - span.end(); - throw error; - } - }, - ); - } else { - // Non-streaming responses - return startSpan( - { - name: `${operationName} ${model}`, - op: getSpanOperation(methodPath), - attributes: requestAttributes as Record, - }, - async (span: Span) => { - try { - if (options.recordInputs && params) { - addRequestAttributes(span, params, operationName); - } - - const result = await originalMethod.apply(context, args); - addResponseAttributes(span, result, options.recordOutputs); - return result; - } catch (error) { - captureException(error, { - mechanism: { - handled: false, - type: 'auto.ai.openai', - data: { - function: methodPath, - }, - }, - }); - throw error; - } - }, - ); - } + const isStreamRequested = !!(params && typeof params === 'object' && params.stream === true); + + const spanOptions = { + name: isStreamRequested ? `${operationName} ${model} stream-response` : `${operationName} ${model}`, + op: getSpanOperation(methodPath), + attributes: requestAttributes as Record, + }; + + return startSpanManual(spanOptions, (span: Span) => { + if (options.recordInputs && params) { + addRequestAttributes(span, params, operationName); + } + let result: R & { + then?: (onFulfilled?: (value: unknown) => unknown, onRejected?: (reason?: unknown) => unknown) => unknown; + withResponse?: () => unknown; + }; + try { + result = originalMethod.apply(context, args) as typeof result; + } catch (error) { + span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' }); + captureException(error, { + mechanism: { handled: false, type: 'auto.ai.openai', data: { function: methodPath } }, + }); + span.end(); + throw error; + } + return wrapReturnValue( + result, + span, + options, + methodPath, + params, + operationName, + isStreamRequested, + ) as R; + }); }; }