diff --git a/apps/webapp/app/components/code/QueryResultsChart.tsx b/apps/webapp/app/components/code/QueryResultsChart.tsx index 4f7899a740..82cd7234da 100644 --- a/apps/webapp/app/components/code/QueryResultsChart.tsx +++ b/apps/webapp/app/components/code/QueryResultsChart.tsx @@ -1,5 +1,8 @@ -import type { OutputColumnMetadata } from "@internal/clickhouse"; +import type { ColumnFormatType, OutputColumnMetadata } from "@internal/clickhouse"; +import { formatDurationMilliseconds } from "@trigger.dev/core/v3"; import { memo, useMemo } from "react"; +import { createValueFormatter } from "~/utils/columnFormat"; +import { formatCurrencyAccurate } from "~/utils/numberFormatter"; import type { ChartConfig } from "~/components/primitives/charts/Chart"; import { Chart } from "~/components/primitives/charts/ChartCompound"; import { Paragraph } from "../primitives/Paragraph"; @@ -797,8 +800,24 @@ export const QueryResultsChart = memo(function QueryResultsChart({ }; }, [isDateBased, timeGranularity]); - // Create dynamic Y-axis formatter based on data range - const yAxisFormatter = useMemo(() => createYAxisFormatter(data, series), [data, series]); + // Resolve the Y-axis column format for formatting + const yAxisFormat = useMemo(() => { + if (yAxisColumns.length === 0) return undefined; + const col = columns.find((c) => c.name === yAxisColumns[0]); + return (col?.format ?? col?.customRenderType) as ColumnFormatType | undefined; + }, [yAxisColumns, columns]); + + // Create dynamic Y-axis formatter based on data range and format + const yAxisFormatter = useMemo( + () => createYAxisFormatter(data, series, yAxisFormat), + [data, series, yAxisFormat] + ); + + // Create value formatter for tooltips and legend based on column format + const tooltipValueFormatter = useMemo( + () => createValueFormatter(yAxisFormat), + [yAxisFormat] + ); // Check if the group-by column has a runStatus customRenderType const groupByIsRunStatus = useMemo(() => { @@ -1016,6 +1035,7 @@ export const QueryResultsChart = memo(function QueryResultsChart({ showLegend={showLegend} maxLegendItems={fullLegend ? Infinity : 5} legendAggregation={config.aggregation} + legendValueFormatter={tooltipValueFormatter} minHeight="300px" fillContainer onViewAllLegendItems={onViewAllLegendItems} @@ -1027,6 +1047,7 @@ export const QueryResultsChart = memo(function QueryResultsChart({ yAxisProps={yAxisProps} stackId={stacked ? "stack" : undefined} tooltipLabelFormatter={tooltipLabelFormatter} + tooltipValueFormatter={tooltipValueFormatter} /> ); @@ -1043,6 +1064,7 @@ export const QueryResultsChart = memo(function QueryResultsChart({ showLegend={showLegend} maxLegendItems={fullLegend ? 
Infinity : 5} legendAggregation={config.aggregation} + legendValueFormatter={tooltipValueFormatter} minHeight="300px" fillContainer onViewAllLegendItems={onViewAllLegendItems} @@ -1054,6 +1076,7 @@ export const QueryResultsChart = memo(function QueryResultsChart({ yAxisProps={yAxisProps} stacked={stacked && sortedSeries.length > 1} tooltipLabelFormatter={tooltipLabelFormatter} + tooltipValueFormatter={tooltipValueFormatter} lineType="linear" /> @@ -1061,9 +1084,13 @@ export const QueryResultsChart = memo(function QueryResultsChart({ }); /** - * Creates a Y-axis value formatter based on the data range + * Creates a Y-axis value formatter based on the data range and optional format hint */ -function createYAxisFormatter(data: Record[], series: string[]) { +function createYAxisFormatter( + data: Record[], + series: string[], + format?: ColumnFormatType +) { // Find min and max values across all series let minVal = Infinity; let maxVal = -Infinity; @@ -1080,6 +1107,46 @@ function createYAxisFormatter(data: Record[], series: string[]) const range = maxVal - minVal; + // Format-aware formatters + if (format === "bytes" || format === "decimalBytes") { + const divisor = format === "bytes" ? 1024 : 1000; + const units = + format === "bytes" + ? ["B", "KiB", "MiB", "GiB", "TiB"] + : ["B", "KB", "MB", "GB", "TB"]; + return (value: number): string => { + if (value === 0) return "0 B"; + // Use consistent unit for all ticks based on max value + const i = Math.min( + Math.floor(Math.log(Math.abs(maxVal || 1)) / Math.log(divisor)), + units.length - 1 + ); + const scaled = value / Math.pow(divisor, i); + return `${scaled.toFixed(scaled < 10 ? 1 : 0)} ${units[i]}`; + }; + } + + if (format === "percent") { + return (value: number): string => `${value.toFixed(range < 1 ? 2 : 1)}%`; + } + + if (format === "duration") { + return (value: number): string => formatDurationMilliseconds(value, { style: "short" }); + } + + if (format === "durationSeconds") { + return (value: number): string => + formatDurationMilliseconds(value * 1000, { style: "short" }); + } + + if (format === "costInDollars" || format === "cost") { + return (value: number): string => { + const dollars = format === "cost" ? 
value / 100 : value; + return formatCurrencyAccurate(dollars); + }; + } + + // Default formatter return (value: number): string => { // Use abbreviations for large numbers if (Math.abs(value) >= 1_000_000) { diff --git a/apps/webapp/app/components/code/TSQLResultsTable.tsx b/apps/webapp/app/components/code/TSQLResultsTable.tsx index 951e848235..5c6af16559 100644 --- a/apps/webapp/app/components/code/TSQLResultsTable.tsx +++ b/apps/webapp/app/components/code/TSQLResultsTable.tsx @@ -35,6 +35,7 @@ import { useCopy } from "~/hooks/useCopy"; import { useOrganization } from "~/hooks/useOrganizations"; import { useProject } from "~/hooks/useProject"; import { cn } from "~/utils/cn"; +import { formatBytes, formatDecimalBytes, formatQuantity } from "~/utils/columnFormat"; import { formatCurrencyAccurate, formatNumber } from "~/utils/numberFormatter"; import { v3ProjectPath, v3RunPathFromFriendlyId } from "~/utils/pathBuilder"; import { Paragraph } from "../primitives/Paragraph"; @@ -64,9 +65,10 @@ function getFormattedValue(value: unknown, column: OutputColumnMetadata): string if (value === null) return "NULL"; if (value === undefined) return ""; - // Handle custom render types - if (column.customRenderType) { - switch (column.customRenderType) { + // Handle format hints (from prettyFormat() or auto-populated from customRenderType) + const formatType = column.format ?? column.customRenderType; + if (formatType) { + switch (formatType) { case "duration": if (typeof value === "number") { return formatDurationMilliseconds(value, { style: "short" }); @@ -93,6 +95,26 @@ function getFormattedValue(value: unknown, column: OutputColumnMetadata): string return value; } break; + case "bytes": + if (typeof value === "number") { + return formatBytes(value); + } + break; + case "decimalBytes": + if (typeof value === "number") { + return formatDecimalBytes(value); + } + break; + case "percent": + if (typeof value === "number") { + return `${value.toFixed(2)}%`; + } + break; + case "quantity": + if (typeof value === "number") { + return formatQuantity(value); + } + break; } } @@ -220,6 +242,21 @@ function getDisplayLength(value: unknown, column: OutputColumnMetadata): number if (value === null) return 4; // "NULL" if (value === undefined) return 9; // "UNDEFINED" + // Handle format hint types - estimate their rendered width + const fmt = column.format; + if (fmt === "bytes" || fmt === "decimalBytes") { + // e.g., "1.50 GiB" or "256.00 MB" + return 12; + } + if (fmt === "percent") { + // e.g., "45.23%" + return 8; + } + if (fmt === "quantity") { + // e.g., "1.50M" + return 8; + } + // Handle custom render types - estimate their rendered width if (column.customRenderType) { switch (column.customRenderType) { @@ -392,6 +429,10 @@ function isRightAlignedColumn(column: OutputColumnMetadata): boolean { ) { return true; } + const fmt = column.format; + if (fmt === "bytes" || fmt === "decimalBytes" || fmt === "percent" || fmt === "quantity") { + return true; + } return isNumericType(column.type); } @@ -474,6 +515,32 @@ function CellValue({ return
UNDEFINED
; } + // Check format hint for new format types (from prettyFormat()) + if (column.format && !column.customRenderType) { + switch (column.format) { + case "bytes": + if (typeof value === "number") { + return {formatBytes(value)}; + } + break; + case "decimalBytes": + if (typeof value === "number") { + return {formatDecimalBytes(value)}; + } + break; + case "percent": + if (typeof value === "number") { + return {value.toFixed(2)}%; + } + break; + case "quantity": + if (typeof value === "number") { + return {formatQuantity(value)}; + } + break; + } + } + // First check customRenderType for special rendering if (column.customRenderType) { switch (column.customRenderType) { diff --git a/apps/webapp/app/components/primitives/charts/BigNumberCard.tsx b/apps/webapp/app/components/primitives/charts/BigNumberCard.tsx index f9d4280412..65bda5a9ca 100644 --- a/apps/webapp/app/components/primitives/charts/BigNumberCard.tsx +++ b/apps/webapp/app/components/primitives/charts/BigNumberCard.tsx @@ -1,9 +1,10 @@ -import type { OutputColumnMetadata } from "@internal/tsql"; +import type { ColumnFormatType, OutputColumnMetadata } from "@internal/tsql"; import { useMemo } from "react"; import type { BigNumberAggregationType, BigNumberConfiguration, } from "~/components/metrics/QueryWidget"; +import { createValueFormatter } from "~/utils/columnFormat"; import { AnimatedNumber } from "../AnimatedNumber"; import { Spinner } from "../Spinner"; import { Paragraph } from "../Paragraph"; @@ -129,6 +130,15 @@ export function BigNumberCard({ rows, columns, config, isLoading = false }: BigN return aggregateValues(values, aggregation); }, [rows, column, aggregation, sortDirection]); + // Look up column format for format-aware display + const columnValueFormatter = useMemo(() => { + const columnMeta = columns.find((c) => c.name === column); + const formatType = (columnMeta?.format ?? columnMeta?.customRenderType) as + | ColumnFormatType + | undefined; + return createValueFormatter(formatType); + }, [columns, column]); + if (isLoading) { return (
@@ -147,6 +157,21 @@ export function BigNumberCard({ rows, columns, config, isLoading = false }: BigN ); } + // Use format-aware formatter when available + if (columnValueFormatter) { + return ( +
+
+
+ {prefix && {prefix}} + {columnValueFormatter(result)} + {suffix && {suffix}} +
+
+
+ ); + } + const { displayValue, unitSuffix, decimalPlaces } = abbreviate ? abbreviateValue(result) : { displayValue: result, unitSuffix: undefined, decimalPlaces: getDecimalPlaces(result) }; diff --git a/apps/webapp/app/components/primitives/charts/Chart.tsx b/apps/webapp/app/components/primitives/charts/Chart.tsx index 0d7cd741a6..ffed1a2bc1 100644 --- a/apps/webapp/app/components/primitives/charts/Chart.tsx +++ b/apps/webapp/app/components/primitives/charts/Chart.tsx @@ -104,6 +104,8 @@ const ChartTooltipContent = React.forwardRef< indicator?: "line" | "dot" | "dashed"; nameKey?: string; labelKey?: string; + /** Optional formatter for numeric values (e.g. bytes, duration) */ + valueFormatter?: (value: number) => string; } >( ( @@ -121,6 +123,7 @@ const ChartTooltipContent = React.forwardRef< color, nameKey, labelKey, + valueFormatter, }, ref ) => { @@ -221,9 +224,11 @@ const ChartTooltipContent = React.forwardRef< {itemConfig?.label || item.name}
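+                {/* Check against null rather than truthiness so zero values still render, and use the provided valueFormatter for numbers when one is set */}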
- {item.value && ( + {item.value != null && ( - {item.value.toLocaleString()} + {valueFormatter && typeof item.value === "number" + ? valueFormatter(item.value) + : item.value.toLocaleString()} )} diff --git a/apps/webapp/app/components/primitives/charts/ChartBar.tsx b/apps/webapp/app/components/primitives/charts/ChartBar.tsx index a34ce66759..814cc5777a 100644 --- a/apps/webapp/app/components/primitives/charts/ChartBar.tsx +++ b/apps/webapp/app/components/primitives/charts/ChartBar.tsx @@ -48,6 +48,8 @@ export type ChartBarRendererProps = { referenceLine?: ReferenceLineProps; /** Custom tooltip label formatter */ tooltipLabelFormatter?: (label: string, payload: any[]) => string; + /** Optional formatter for numeric tooltip values (e.g. bytes, duration) */ + tooltipValueFormatter?: (value: number) => string; /** Width injected by ResponsiveContainer */ width?: number; /** Height injected by ResponsiveContainer */ @@ -72,6 +74,7 @@ export function ChartBarRenderer({ yAxisProps: yAxisPropsProp, referenceLine, tooltipLabelFormatter, + tooltipValueFormatter, width, height, }: ChartBarRendererProps) { @@ -170,7 +173,7 @@ export function ChartBarRenderer({ showLegend ? ( () => null ) : tooltipLabelFormatter ? ( - + ) : ( string; /** Callback when "View all" button is clicked */ onViewAllLegendItems?: () => void; /** When true, constrains legend to max 50% height with scrolling */ @@ -50,6 +52,7 @@ export function ChartLegendCompound({ className, totalLabel, aggregation, + valueFormatter, onViewAllLegendItems, scrollable = false, }: ChartLegendCompoundProps) { @@ -179,7 +182,11 @@ export function ChartLegendCompound({ {currentTotalLabel} {currentTotal != null ? ( - + valueFormatter ? ( + valueFormatter(currentTotal) + ) : ( + + ) ) : ( "\u2013" )} @@ -251,7 +258,11 @@ export function ChartLegendCompound({ )} > {total != null ? ( - + valueFormatter ? ( + valueFormatter(total) + ) : ( + + ) ) : ( "\u2013" )} @@ -268,6 +279,7 @@ export function ChartLegendCompound({ item={legendItems.hoveredHiddenItem} value={currentData[legendItems.hoveredHiddenItem.dataKey] ?? null} remainingCount={legendItems.remaining - 1} + valueFormatter={valueFormatter} /> ) : ( string; }; -function HoveredHiddenItemRow({ item, value, remainingCount }: HoveredHiddenItemRowProps) { +function HoveredHiddenItemRow({ item, value, remainingCount, valueFormatter }: HoveredHiddenItemRowProps) { return (
{/* Active highlight background */} @@ -338,7 +351,15 @@ function HoveredHiddenItemRow({ item, value, remainingCount }: HoveredHiddenItem {remainingCount > 0 && +{remainingCount} more}
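+          {/* Render the formatted string when a valueFormatter is provided; otherwise fall back to the default numeric rendering */}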
- {value != null ? : "\u2013"} + {value != null ? ( + valueFormatter ? ( + valueFormatter(value) + ) : ( + + ) + ) : ( + "\u2013" + )} diff --git a/apps/webapp/app/components/primitives/charts/ChartLine.tsx b/apps/webapp/app/components/primitives/charts/ChartLine.tsx index 7bfd9090ca..62f1bd5abc 100644 --- a/apps/webapp/app/components/primitives/charts/ChartLine.tsx +++ b/apps/webapp/app/components/primitives/charts/ChartLine.tsx @@ -51,6 +51,8 @@ export type ChartLineRendererProps = { stacked?: boolean; /** Custom tooltip label formatter */ tooltipLabelFormatter?: (label: string, payload: any[]) => string; + /** Optional formatter for numeric tooltip values (e.g. bytes, duration) */ + tooltipValueFormatter?: (value: number) => string; /** Width injected by ResponsiveContainer */ width?: number; /** Height injected by ResponsiveContainer */ @@ -75,6 +77,7 @@ export function ChartLineRenderer({ yAxisProps: yAxisPropsProp, stacked = false, tooltipLabelFormatter, + tooltipValueFormatter, width, height, }: ChartLineRendererProps) { @@ -158,7 +161,13 @@ export function ChartLineRenderer({ {/* When legend is shown below, render tooltip with cursor only (no content popup) */} null : } + content={ + showLegend ? ( + () => null + ) : ( + + ) + } labelFormatter={tooltipLabelFormatter} /> {/* Note: Legend is now rendered by ChartRoot outside the chart container */} @@ -207,7 +216,13 @@ export function ChartLineRenderer({ {/* When legend is shown below, render tooltip with cursor only (no content popup) */} null : } + content={ + showLegend ? ( + () => null + ) : ( + + ) + } labelFormatter={tooltipLabelFormatter} /> {/* Note: Legend is now rendered by ChartRoot outside the chart container */} diff --git a/apps/webapp/app/components/primitives/charts/ChartRoot.tsx b/apps/webapp/app/components/primitives/charts/ChartRoot.tsx index 9b5eb3ccb8..dd508e6b16 100644 --- a/apps/webapp/app/components/primitives/charts/ChartRoot.tsx +++ b/apps/webapp/app/components/primitives/charts/ChartRoot.tsx @@ -32,6 +32,8 @@ export type ChartRootProps = { legendTotalLabel?: string; /** Aggregation method used by the legend to compute totals (defaults to sum behavior) */ legendAggregation?: AggregationType; + /** Optional formatter for numeric legend values (e.g. 
bytes, duration) */ + legendValueFormatter?: (value: number) => string; /** Callback when "View all" legend button is clicked */ onViewAllLegendItems?: () => void; /** When true, constrains legend to max 50% height with scrolling */ @@ -77,6 +79,7 @@ export function ChartRoot({ maxLegendItems = 5, legendTotalLabel, legendAggregation, + legendValueFormatter, onViewAllLegendItems, legendScrollable = false, fillContainer = false, @@ -101,6 +104,7 @@ export function ChartRoot({ maxLegendItems={maxLegendItems} legendTotalLabel={legendTotalLabel} legendAggregation={legendAggregation} + legendValueFormatter={legendValueFormatter} onViewAllLegendItems={onViewAllLegendItems} legendScrollable={legendScrollable} fillContainer={fillContainer} @@ -118,6 +122,7 @@ type ChartRootInnerProps = { maxLegendItems?: number; legendTotalLabel?: string; legendAggregation?: AggregationType; + legendValueFormatter?: (value: number) => string; onViewAllLegendItems?: () => void; legendScrollable?: boolean; fillContainer?: boolean; @@ -131,6 +136,7 @@ function ChartRootInner({ maxLegendItems = 5, legendTotalLabel, legendAggregation, + legendValueFormatter, onViewAllLegendItems, legendScrollable = false, fillContainer = false, @@ -173,6 +179,7 @@ function ChartRootInner({ maxItems={maxLegendItems} totalLabel={legendTotalLabel} aggregation={legendAggregation} + valueFormatter={legendValueFormatter} onViewAllLegendItems={onViewAllLegendItems} scrollable={legendScrollable} /> diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 5a321c58b6..efe68b81a9 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -372,6 +372,7 @@ const EnvironmentSchema = z // Development OTEL environment variables DEV_OTEL_EXPORTER_OTLP_ENDPOINT: z.string().optional(), + DEV_OTEL_METRICS_ENDPOINT: z.string().optional(), // If this is set to 1, then the below variables are used to configure the batch processor for spans and logs DEV_OTEL_BATCH_PROCESSING_ENABLED: z.string().default("0"), DEV_OTEL_SPAN_MAX_EXPORT_BATCH_SIZE: z.string().default("64"), @@ -382,6 +383,8 @@ const EnvironmentSchema = z DEV_OTEL_LOG_SCHEDULED_DELAY_MILLIS: z.string().default("200"), DEV_OTEL_LOG_EXPORT_TIMEOUT_MILLIS: z.string().default("30000"), DEV_OTEL_LOG_MAX_QUEUE_SIZE: z.string().default("512"), + DEV_OTEL_METRICS_EXPORT_INTERVAL_MILLIS: z.string().optional(), + DEV_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS: z.string().optional(), PROD_OTEL_BATCH_PROCESSING_ENABLED: z.string().default("0"), PROD_OTEL_SPAN_MAX_EXPORT_BATCH_SIZE: z.string().default("64"), @@ -392,6 +395,8 @@ const EnvironmentSchema = z PROD_OTEL_LOG_SCHEDULED_DELAY_MILLIS: z.string().default("200"), PROD_OTEL_LOG_EXPORT_TIMEOUT_MILLIS: z.string().default("30000"), PROD_OTEL_LOG_MAX_QUEUE_SIZE: z.string().default("512"), + PROD_OTEL_METRICS_EXPORT_INTERVAL_MILLIS: z.string().optional(), + PROD_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS: z.string().optional(), TRIGGER_OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT: z.string().default("1024"), TRIGGER_OTEL_LOG_ATTRIBUTE_COUNT_LIMIT: z.string().default("1024"), diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.query/AITabContent.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.query/AITabContent.tsx index 0f3d4042bf..269747ae86 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.query/AITabContent.tsx +++ 
b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.query/AITabContent.tsx @@ -30,6 +30,8 @@ export function AITabContent({ "Top 50 most expensive runs this week", "Average execution duration by task this week", "Run counts by tag in the past 7 days", + "CPU utilization over time by task", + "Peak memory usage per run", ]; return ( diff --git a/apps/webapp/app/routes/otel.v1.metrics.ts b/apps/webapp/app/routes/otel.v1.metrics.ts new file mode 100644 index 0000000000..5529f9310e --- /dev/null +++ b/apps/webapp/app/routes/otel.v1.metrics.ts @@ -0,0 +1,41 @@ +import { ActionFunctionArgs, json } from "@remix-run/server-runtime"; +import { + ExportMetricsServiceRequest, + ExportMetricsServiceResponse, +} from "@trigger.dev/otlp-importer"; +import { otlpExporter } from "~/v3/otlpExporter.server"; + +export async function action({ request }: ActionFunctionArgs) { + try { + const contentType = request.headers.get("content-type")?.toLowerCase() ?? ""; + + if (contentType.startsWith("application/json")) { + const body = await request.json(); + + const exportResponse = await otlpExporter.exportMetrics( + body as ExportMetricsServiceRequest + ); + + return json(exportResponse, { status: 200 }); + } else if (contentType.startsWith("application/x-protobuf")) { + const buffer = await request.arrayBuffer(); + + const exportRequest = ExportMetricsServiceRequest.decode(new Uint8Array(buffer)); + + const exportResponse = await otlpExporter.exportMetrics(exportRequest); + + return new Response(ExportMetricsServiceResponse.encode(exportResponse).finish(), { + status: 200, + }); + } else { + return new Response( + "Unsupported content type. Must be either application/x-protobuf or application/json", + { status: 400 } + ); + } + } catch (error) { + console.error(error); + + return new Response("Internal Server Error", { status: 500 }); + } +} diff --git a/apps/webapp/app/services/queryService.server.ts b/apps/webapp/app/services/queryService.server.ts index 676be9f2f0..6cd2af03b1 100644 --- a/apps/webapp/app/services/queryService.server.ts +++ b/apps/webapp/app/services/queryService.server.ts @@ -152,7 +152,14 @@ export async function executeQuery( return { success: false, error: new QueryError(errorMessage, { query: options.query }) }; } - // Build time filter fallback for triggered_at column + // Detect which table the query targets to determine the time column + // Each table schema declares its primary time column via timeConstraint + const matchedSchema = querySchemas.find((s) => + new RegExp(`\\bFROM\\s+${s.name}\\b`, "i").test(options.query) + ); + const timeColumn = matchedSchema?.timeConstraint ?? "triggered_at"; + + // Build time filter fallback for the table's time column const defaultPeriod = await getDefaultPeriod(organizationId); const timeFilter = timeFilters({ period: period ?? 
undefined, @@ -173,15 +180,15 @@ export async function executeQuery( } // Build the fallback WHERE condition based on what the user specified - let triggeredAtFallback: WhereClauseCondition; + let timeFallback: WhereClauseCondition; if (timeFilter.from && timeFilter.to) { - triggeredAtFallback = { op: "between", low: timeFilter.from, high: timeFilter.to }; + timeFallback = { op: "between", low: timeFilter.from, high: timeFilter.to }; } else if (timeFilter.from) { - triggeredAtFallback = { op: "gte", value: timeFilter.from }; + timeFallback = { op: "gte", value: timeFilter.from }; } else if (timeFilter.to) { - triggeredAtFallback = { op: "lte", value: timeFilter.to }; + timeFallback = { op: "lte", value: timeFilter.to }; } else { - triggeredAtFallback = { op: "gte", value: requestedFromDate! }; + timeFallback = { op: "gte", value: requestedFromDate! }; } const maxQueryPeriod = await getLimit(organizationId, "queryPeriodDays", 30); @@ -196,7 +203,7 @@ export async function executeQuery( project_id: scope === "project" || scope === "environment" ? { op: "eq", value: projectId } : undefined, environment_id: scope === "environment" ? { op: "eq", value: environmentId } : undefined, - triggered_at: { op: "gte", value: maxQueryPeriodDate }, + [timeColumn]: { op: "gte", value: maxQueryPeriodDate }, // Optional filters for tasks and queues task_identifier: taskIdentifiers && taskIdentifiers.length > 0 @@ -238,7 +245,7 @@ export async function executeQuery( enforcedWhereClause, fieldMappings, whereClauseFallback: { - triggered_at: triggeredAtFallback, + [timeColumn]: timeFallback, }, timeRange, clickhouseSettings: { diff --git a/apps/webapp/app/utils/columnFormat.ts b/apps/webapp/app/utils/columnFormat.ts new file mode 100644 index 0000000000..ae629c703a --- /dev/null +++ b/apps/webapp/app/utils/columnFormat.ts @@ -0,0 +1,70 @@ +import type { ColumnFormatType } from "@internal/clickhouse"; +import { formatDurationMilliseconds } from "@trigger.dev/core/v3"; +import { formatCurrencyAccurate } from "~/utils/numberFormatter"; + +/** + * Format a number as binary bytes (KiB, MiB, GiB, TiB) + */ +export function formatBytes(bytes: number): string { + if (bytes === 0) return "0 B"; + const units = ["B", "KiB", "MiB", "GiB", "TiB"]; + const i = Math.min( + Math.floor(Math.log(Math.abs(bytes)) / Math.log(1024)), + units.length - 1 + ); + return `${(bytes / Math.pow(1024, i)).toFixed(i === 0 ? 0 : 2)} ${units[i]}`; +} + +/** + * Format a number as decimal bytes (KB, MB, GB, TB) + */ +export function formatDecimalBytes(bytes: number): string { + if (bytes === 0) return "0 B"; + const units = ["B", "KB", "MB", "GB", "TB"]; + const i = Math.min( + Math.floor(Math.log(Math.abs(bytes)) / Math.log(1000)), + units.length - 1 + ); + return `${(bytes / Math.pow(1000, i)).toFixed(i === 0 ? 0 : 2)} ${units[i]}`; +} + +/** + * Format a large number with human-readable suffix (K, M, B) + */ +export function formatQuantity(value: number): string { + const abs = Math.abs(value); + if (abs >= 1_000_000_000) return `${(value / 1_000_000_000).toFixed(2)}B`; + if (abs >= 1_000_000) return `${(value / 1_000_000).toFixed(2)}M`; + if (abs >= 1_000) return `${(value / 1_000).toFixed(2)}K`; + return value.toLocaleString(); +} + +/** + * Creates a value formatter function for a given column format type. + * Used by chart tooltips, legend values, and big number cards. 
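+ *
+ * Illustrative examples (outputs per the formatters defined above):
+ *   createValueFormatter("bytes")?.(1_572_864);    // "1.50 MiB"
+ *   createValueFormatter("percent")?.(42.5);       // "42.50%"
+ *   createValueFormatter("quantity")?.(2_500_000); // "2.50M"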
+ */ +export function createValueFormatter( + format?: ColumnFormatType +): ((value: number) => string) | undefined { + if (!format) return undefined; + switch (format) { + case "bytes": + return (v) => formatBytes(v); + case "decimalBytes": + return (v) => formatDecimalBytes(v); + case "percent": + return (v) => `${v.toFixed(2)}%`; + case "quantity": + return (v) => formatQuantity(v); + case "duration": + return (v) => formatDurationMilliseconds(v, { style: "short" }); + case "durationSeconds": + return (v) => formatDurationMilliseconds(v * 1000, { style: "short" }); + case "costInDollars": + return (v) => formatCurrencyAccurate(v); + case "cost": + return (v) => formatCurrencyAccurate(v / 100); + default: + return undefined; + } +} diff --git a/apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts b/apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts index 39d0c863cb..7f1571fe09 100644 --- a/apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts +++ b/apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts @@ -956,6 +956,33 @@ async function resolveBuiltInDevVariables(runtimeEnvironment: RuntimeEnvironment }, ]; + if (env.DEV_OTEL_METRICS_ENDPOINT) { + result.push({ + key: "TRIGGER_OTEL_METRICS_ENDPOINT", + value: env.DEV_OTEL_METRICS_ENDPOINT, + }); + } + + if (env.DEV_OTEL_METRICS_EXPORT_INTERVAL_MILLIS) { + result.push( + { + key: "TRIGGER_OTEL_METRICS_EXPORT_INTERVAL_MILLIS", + value: env.DEV_OTEL_METRICS_EXPORT_INTERVAL_MILLIS, + }, + { + key: "TRIGGER_OTEL_METRICS_EXPORT_TIMEOUT_MILLIS", + value: env.DEV_OTEL_METRICS_EXPORT_INTERVAL_MILLIS, + } + ); + } + + if (env.DEV_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS) { + result.push({ + key: "TRIGGER_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS", + value: env.DEV_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS, + }); + } + if (env.DEV_OTEL_BATCH_PROCESSING_ENABLED === "1") { result = result.concat([ { @@ -1087,6 +1114,26 @@ async function resolveBuiltInProdVariables( ]); } + if (env.PROD_OTEL_METRICS_EXPORT_INTERVAL_MILLIS) { + result.push( + { + key: "TRIGGER_OTEL_METRICS_EXPORT_INTERVAL_MILLIS", + value: env.PROD_OTEL_METRICS_EXPORT_INTERVAL_MILLIS, + }, + { + key: "TRIGGER_OTEL_METRICS_EXPORT_TIMEOUT_MILLIS", + value: env.PROD_OTEL_METRICS_EXPORT_INTERVAL_MILLIS, + } + ); + } + + if (env.PROD_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS) { + result.push({ + key: "TRIGGER_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS", + value: env.PROD_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS, + }); + } + if (env.PROD_OTEL_BATCH_PROCESSING_ENABLED === "1") { result = result.concat([ { diff --git a/apps/webapp/app/v3/otlpExporter.server.ts b/apps/webapp/app/v3/otlpExporter.server.ts index f7337b3b16..cd56eaabce 100644 --- a/apps/webapp/app/v3/otlpExporter.server.ts +++ b/apps/webapp/app/v3/otlpExporter.server.ts @@ -4,10 +4,13 @@ import { AnyValue, ExportLogsServiceRequest, ExportLogsServiceResponse, + ExportMetricsServiceRequest, + ExportMetricsServiceResponse, ExportTraceServiceRequest, ExportTraceServiceResponse, KeyValue, ResourceLogs, + ResourceMetrics, ResourceSpans, SeverityNumber, Span, @@ -15,7 +18,9 @@ import { Span_SpanKind, Status_StatusCode, } from "@trigger.dev/otlp-importer"; +import type { MetricsV1Input } from "@internal/clickhouse"; import { logger } from "~/services/logger.server"; +import { clickhouseClient } from "~/services/clickhouseInstance.server"; import { ClickhouseEventRepository } from "./eventRepository/clickhouseEventRepository.server"; import 
{ clickhouseEventRepository, @@ -66,6 +71,29 @@ class OTLPExporter { }); } + async exportMetrics( + request: ExportMetricsServiceRequest + ): Promise { + return await startSpan(this._tracer, "exportMetrics", async (span) => { + const rows = this.#filterResourceMetrics(request.resourceMetrics).flatMap( + (resourceMetrics) => { + return convertMetricsToClickhouseRows( + resourceMetrics, + this._spanAttributeValueLengthLimit + ); + } + ); + + span.setAttribute("metric_row_count", rows.length); + + if (rows.length > 0) { + await clickhouseClient.metrics.insert(rows); + } + + return ExportMetricsServiceResponse.create(); + }); + } + async exportLogs(request: ExportLogsServiceRequest): Promise { return await startSpan(this._tracer, "exportLogs", async (span) => { this.#logExportLogsVerbose(request); @@ -202,6 +230,18 @@ class OTLPExporter { return isBoolValue(attribute.value) ? attribute.value.boolValue : false; }); } + + #filterResourceMetrics(resourceMetrics: ResourceMetrics[]): ResourceMetrics[] { + return resourceMetrics.filter((rm) => { + const triggerAttribute = rm.resource?.attributes.find( + (attribute) => attribute.key === SemanticInternalAttributes.TRIGGER + ); + + if (!triggerAttribute) return false; + + return isBoolValue(triggerAttribute.value) ? triggerAttribute.value.boolValue : false; + }); + } } function convertLogsToCreateableEvents( @@ -410,6 +450,208 @@ function convertSpansToCreateableEvents( return { events, taskEventStore }; } +function floorToTenSecondBucket(timeUnixNano: bigint | number): string { + const epochMs = Number(BigInt(timeUnixNano) / BigInt(1_000_000)); + const flooredMs = Math.floor(epochMs / 10_000) * 10_000; + const date = new Date(flooredMs); + // Format as ClickHouse DateTime: YYYY-MM-DD HH:MM:SS + return date.toISOString().replace("T", " ").replace(/\.\d{3}Z$/, ""); +} + +function convertMetricsToClickhouseRows( + resourceMetrics: ResourceMetrics, + spanAttributeValueLengthLimit: number +): MetricsV1Input[] { + const resourceAttributes = resourceMetrics.resource?.attributes ?? []; + const resourceProperties = extractEventProperties(resourceAttributes); + + const organizationId = resourceProperties.organizationId ?? "unknown"; + const projectId = resourceProperties.projectId ?? "unknown"; + const environmentId = resourceProperties.environmentId ?? "unknown"; + const resourceCtx = { + taskSlug: resourceProperties.taskSlug, + runId: resourceProperties.runId, + attemptNumber: resourceProperties.attemptNumber, + machineId: extractStringAttribute(resourceAttributes, SemanticInternalAttributes.MACHINE_ID), + workerId: extractStringAttribute(resourceAttributes, SemanticInternalAttributes.WORKER_ID), + workerVersion: extractStringAttribute( + resourceAttributes, + SemanticInternalAttributes.WORKER_VERSION + ), + }; + + const rows: MetricsV1Input[] = []; + + for (const scopeMetrics of resourceMetrics.scopeMetrics) { + for (const metric of scopeMetrics.metrics) { + const metricName = metric.name; + + // Process gauge data points + if (metric.gauge) { + for (const dp of metric.gauge.dataPoints) { + const value: number = + (dp.asDouble ?? 0) !== 0 ? dp.asDouble! : dp.asInt !== BigInt(0) ? Number(dp.asInt) : 0; + const resolved = resolveDataPointContext(dp.attributes ?? [], resourceCtx); + + rows.push({ + organization_id: organizationId, + project_id: projectId, + environment_id: environmentId, + metric_name: metricName, + metric_type: "gauge", + metric_subject: resolved.machineId ?? 
"unknown", + bucket_start: floorToTenSecondBucket(dp.timeUnixNano), + count: 0, + sum_value: 0, + max_value: value, + min_value: value, + last_value: value, + attributes: resolved.attributes, + }); + } + } + + // Process sum data points + if (metric.sum) { + for (const dp of metric.sum.dataPoints) { + const value: number = + (dp.asDouble ?? 0) !== 0 ? dp.asDouble! : dp.asInt !== BigInt(0) ? Number(dp.asInt) : 0; + const resolved = resolveDataPointContext(dp.attributes ?? [], resourceCtx); + + rows.push({ + organization_id: organizationId, + project_id: projectId, + environment_id: environmentId, + metric_name: metricName, + metric_type: "sum", + metric_subject: resolved.machineId ?? "unknown", + bucket_start: floorToTenSecondBucket(dp.timeUnixNano), + count: 1, + sum_value: value, + max_value: value, + min_value: value, + last_value: value, + attributes: resolved.attributes, + }); + } + } + + // Process histogram data points + if (metric.histogram) { + for (const dp of metric.histogram.dataPoints) { + const resolved = resolveDataPointContext(dp.attributes ?? [], resourceCtx); + const count = Number(dp.count); + const sum = dp.sum ?? 0; + const max = dp.max ?? 0; + const min = dp.min ?? 0; + + rows.push({ + organization_id: organizationId, + project_id: projectId, + environment_id: environmentId, + metric_name: metricName, + metric_type: "histogram", + metric_subject: resolved.machineId ?? "unknown", + bucket_start: floorToTenSecondBucket(dp.timeUnixNano), + count, + sum_value: sum, + max_value: max, + min_value: min, + last_value: count > 0 ? sum / count : 0, + attributes: resolved.attributes, + }); + } + } + } + } + + return rows; +} + +// Prefixes injected by TaskContextMetricExporter — these are extracted into +// the nested `trigger` key and should not appear as top-level user attributes. +const INTERNAL_METRIC_ATTRIBUTE_PREFIXES = ["ctx.", "worker."]; + +interface ResourceContext { + taskSlug: string | undefined; + runId: string | undefined; + attemptNumber: number | undefined; + machineId: string | undefined; + workerId: string | undefined; + workerVersion: string | undefined; +} + +function resolveDataPointContext( + dpAttributes: KeyValue[], + resourceCtx: ResourceContext +): { + machineId: string | undefined; + attributes: Record; +} { + const runId = + resourceCtx.runId ?? + extractStringAttribute(dpAttributes, SemanticInternalAttributes.RUN_ID); + const taskSlug = + resourceCtx.taskSlug ?? + extractStringAttribute(dpAttributes, SemanticInternalAttributes.TASK_SLUG); + const attemptNumber = + resourceCtx.attemptNumber ?? + extractNumberAttribute(dpAttributes, SemanticInternalAttributes.ATTEMPT_NUMBER); + const machineId = + resourceCtx.machineId ?? + extractStringAttribute(dpAttributes, SemanticInternalAttributes.MACHINE_ID); + const workerId = + resourceCtx.workerId ?? + extractStringAttribute(dpAttributes, SemanticInternalAttributes.WORKER_ID); + const workerVersion = + resourceCtx.workerVersion ?? 
+ extractStringAttribute(dpAttributes, SemanticInternalAttributes.WORKER_VERSION); + const machineName = extractStringAttribute( + dpAttributes, + SemanticInternalAttributes.MACHINE_PRESET_NAME + ); + const environmentType = extractStringAttribute( + dpAttributes, + SemanticInternalAttributes.ENVIRONMENT_TYPE + ); + + // Build the trigger context object with only defined values + const trigger: Record = {}; + if (runId) trigger.run_id = runId; + if (taskSlug) trigger.task_slug = taskSlug; + if (attemptNumber !== undefined) trigger.attempt_number = attemptNumber; + if (machineId) trigger.machine_id = machineId; + if (machineName) trigger.machine_name = machineName; + if (workerId) trigger.worker_id = workerId; + if (workerVersion) trigger.worker_version = workerVersion; + if (environmentType) trigger.environment_type = environmentType; + + // Build user attributes, filtering out internal ctx/worker keys + const result: Record = {}; + + if (Object.keys(trigger).length > 0) { + result.trigger = trigger; + } + + for (const attr of dpAttributes) { + if (INTERNAL_METRIC_ATTRIBUTE_PREFIXES.some((prefix) => attr.key.startsWith(prefix))) { + continue; + } + + if (isStringValue(attr.value)) { + result[attr.key] = attr.value.stringValue; + } else if (isIntValue(attr.value)) { + result[attr.key] = Number(attr.value.intValue); + } else if (isDoubleValue(attr.value)) { + result[attr.key] = attr.value.doubleValue; + } else if (isBoolValue(attr.value)) { + result[attr.key] = attr.value.boolValue; + } + } + + return { machineId, attributes: result }; +} + function extractEventProperties(attributes: KeyValue[], prefix?: string) { return { metadata: convertSelectedKeyValueItemsToMap(attributes, [SemanticInternalAttributes.METADATA]), diff --git a/apps/webapp/app/v3/querySchemas.ts b/apps/webapp/app/v3/querySchemas.ts index 33a75a4fe1..4a8f2fcf6d 100644 --- a/apps/webapp/app/v3/querySchemas.ts +++ b/apps/webapp/app/v3/querySchemas.ts @@ -1,4 +1,4 @@ -import { column, type TableSchema } from "@internal/tsql"; +import { column, type BucketThreshold, type TableSchema } from "@internal/tsql"; import { z } from "zod"; import { autoFormatSQL } from "~/components/code/TSQLEditor"; import { runFriendlyStatus, runStatusTitleFromStatus } from "~/components/runs/v3/TaskRunStatus"; @@ -434,10 +434,188 @@ export const runsSchema: TableSchema = { }, }; +/** + * Schema definition for the metrics table (trigger_dev.metrics_v1) + */ +export const metricsSchema: TableSchema = { + name: "metrics", + clickhouseName: "trigger_dev.metrics_v1", + description: "Host and runtime metrics collected during task execution", + timeConstraint: "bucket_start", + tenantColumns: { + organizationId: "organization_id", + projectId: "project_id", + environmentId: "environment_id", + }, + columns: { + environment: { + name: "environment", + clickhouseName: "environment_id", + ...column("String", { description: "The environment slug", example: "prod" }), + fieldMapping: "environment", + customRenderType: "environment", + }, + project: { + name: "project", + clickhouseName: "project_id", + ...column("String", { + description: "The project reference, they always start with `proj_`.", + example: "proj_howcnaxbfxdmwmxazktx", + }), + fieldMapping: "project", + customRenderType: "project", + }, + metric_name: { + name: "metric_name", + ...column("LowCardinality(String)", { + description: "The name of the metric (e.g. 
process.cpu.utilization, system.memory.usage)", + example: "process.cpu.utilization", + coreColumn: true, + }), + }, + metric_type: { + name: "metric_type", + ...column("LowCardinality(String)", { + description: "The type of metric", + allowedValues: ["gauge", "sum", "histogram"], + example: "gauge", + }), + }, + machine_id: { + name: "machine_id", + clickhouseName: "metric_subject", + ...column("String", { + description: "The machine ID that produced this metric", + example: "machine-abc123", + }), + }, + bucket_start: { + name: "bucket_start", + ...column("DateTime", { + description: "The start of the 10-second aggregation bucket", + example: "2024-01-15 09:30:00", + coreColumn: true, + }), + }, + count: { + name: "count", + ...column("UInt64", { + description: "Number of data points in this bucket", + example: "6", + }), + }, + sum_value: { + name: "sum_value", + ...column("Float64", { + description: "Sum of values in this bucket", + example: "0.45", + }), + }, + max_value: { + name: "max_value", + ...column("Float64", { + description: "Maximum value in this bucket", + example: "0.85", + coreColumn: true, + }), + }, + min_value: { + name: "min_value", + ...column("Float64", { + description: "Minimum value in this bucket", + example: "0.12", + }), + }, + last_value: { + name: "last_value", + ...column("Float64", { + description: "Last recorded value in this bucket", + example: "0.42", + coreColumn: true, + }), + }, + + // Trigger context columns (from attributes.trigger.* JSON subpaths) + run_id: { + name: "run_id", + ...column("String", { + description: "The run ID associated with this metric", + customRenderType: "runId", + example: "run_cm1a2b3c4d5e6f7g8h9i", + }), + expression: "attributes.trigger.run_id", + }, + task_identifier: { + name: "task_identifier", + ...column("String", { + description: "Task identifier/slug", + example: "my-background-task", + coreColumn: true, + }), + expression: "attributes.trigger.task_slug", + }, + attempt_number: { + name: "attempt_number", + ...column("String", { + description: "The attempt number for this metric", + example: "1", + }), + expression: "attributes.trigger.attempt_number", + }, + machine_name: { + name: "machine_name", + ...column("String", { + description: "The machine preset used for execution", + allowedValues: [...MACHINE_PRESETS], + example: "small-1x", + }), + expression: "attributes.trigger.machine_name", + }, + environment_type: { + name: "environment_type", + ...column("String", { + description: "Environment type", + allowedValues: [...ENVIRONMENT_TYPES], + customRenderType: "environmentType", + example: "PRODUCTION", + }), + expression: "attributes.trigger.environment_type", + }, + worker_id: { + name: "worker_id", + ...column("String", { + description: "The worker ID that produced this metric", + example: "worker-abc123", + }), + expression: "attributes.trigger.worker_id", + }, + worker_version: { + name: "worker_version", + ...column("String", { + description: "The worker version that produced this metric", + example: "20240115.1", + }), + expression: "attributes.trigger.worker_version", + }, + }, + timeBucketThresholds: [ + // Metrics are pre-aggregated into 10-second buckets, so 10s is the most granular interval. + // All thresholds are shifted coarser compared to the runs table defaults. 
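+ // For example, a 6-hour range exceeds the 3-hour threshold but fits the 12-hour one,
+ // so timeBucket() uses INTERVAL 1 MINUTE (~360 buckets).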
+ { maxRangeSeconds: 3 * 60 * 60, interval: { value: 10, unit: "SECOND" } }, + { maxRangeSeconds: 12 * 60 * 60, interval: { value: 1, unit: "MINUTE" } }, + { maxRangeSeconds: 2 * 24 * 60 * 60, interval: { value: 5, unit: "MINUTE" } }, + { maxRangeSeconds: 7 * 24 * 60 * 60, interval: { value: 15, unit: "MINUTE" } }, + { maxRangeSeconds: 30 * 24 * 60 * 60, interval: { value: 1, unit: "HOUR" } }, + { maxRangeSeconds: 90 * 24 * 60 * 60, interval: { value: 6, unit: "HOUR" } }, + { maxRangeSeconds: 180 * 24 * 60 * 60, interval: { value: 1, unit: "DAY" } }, + { maxRangeSeconds: 365 * 24 * 60 * 60, interval: { value: 1, unit: "WEEK" } }, + ] satisfies BucketThreshold[], +}; + /** * All available schemas for the query editor */ -export const querySchemas: TableSchema[] = [runsSchema]; +export const querySchemas: TableSchema[] = [runsSchema, metricsSchema]; /** * Default query for the query editor diff --git a/apps/webapp/app/v3/services/aiQueryService.server.ts b/apps/webapp/app/v3/services/aiQueryService.server.ts index d397215c8d..2f0174fbe6 100644 --- a/apps/webapp/app/v3/services/aiQueryService.server.ts +++ b/apps/webapp/app/v3/services/aiQueryService.server.ts @@ -55,7 +55,7 @@ export class AIQueryService { constructor( private readonly tableSchema: TableSchema[], - private readonly model: LanguageModelV1 = openai("gpt-4o-mini") + private readonly model: LanguageModelV1 = openai("codex-mini-latest") ) {} /** @@ -65,7 +65,7 @@ export class AIQueryService { private buildSetTimeFilterTool() { return tool({ description: - "Set the time filter for the query page UI instead of adding triggered_at conditions to the query. ALWAYS use this tool when the user wants to filter by time (e.g., 'last 7 days', 'past hour', 'yesterday'). The UI will apply this filter automatically. Do NOT add triggered_at to the WHERE clause - use this tool instead.", + "Set the time filter for the query page UI instead of adding time conditions to the query. ALWAYS use this tool when the user wants to filter by time (e.g., 'last 7 days', 'past hour', 'yesterday'). The UI will apply this filter automatically using the table's time column (triggered_at for runs, bucket_start for metrics). Do NOT add triggered_at or bucket_start to the WHERE clause for time filtering - use this tool instead.", parameters: z.object({ period: z .string() @@ -366,7 +366,7 @@ export class AIQueryService { * Build the system prompt for the AI */ private buildSystemPrompt(schemaDescription: string): string { - return `You are an expert SQL assistant that generates TSQL queries for a task run analytics system. TSQL is a SQL dialect similar to ClickHouse SQL. + return `You are an expert SQL assistant that generates TSQL queries for a task analytics system. TSQL is a SQL dialect similar to ClickHouse SQL. ## Your Task Convert natural language requests into valid TSQL SELECT queries. Always validate your queries using the validateTSQLQuery tool before returning them. @@ -374,6 +374,13 @@ Convert natural language requests into valid TSQL SELECT queries. Always validat ## Available Schema ${schemaDescription} +## Choosing the Right Table + +- **runs** — Task run records (status, timing, cost, output, etc.). Use for questions about runs, tasks, failures, durations, costs, queues. +- **metrics** — Host and runtime metrics collected during task execution (CPU, memory). Use for questions about resource usage, CPU utilization, memory consumption, or performance monitoring. Each row is a 10-second aggregation bucket tied to a specific run. 
+ +When the user mentions "CPU", "memory", "utilization", "resource usage", or similar terms, query the \`metrics\` table. When they mention "runs", "tasks", "failures", "status", "duration", or "cost", query the \`runs\` table. + ## TSQL Syntax Guide TSQL supports standard SQL syntax with some ClickHouse-specific features: @@ -437,16 +444,51 @@ LIMIT 1000 Only use explicit \`toStartOfHour\`/\`toStartOfDay\` etc. if the user specifically requests a particular bucket size (e.g., "group by hour", "bucket by day"). ### Common Patterns + +#### Runs table - Status filter: WHERE status = 'Failed' or WHERE status IN ('Failed', 'Crashed') -- Time filtering: Use the \`setTimeFilter\` tool (NOT triggered_at in WHERE clause) +- Time filtering: Use the \`setTimeFilter\` tool (NOT triggered_at/bucket_start in WHERE clause) + +#### Metrics table +- Filter by metric name: WHERE metric_name = 'process.cpu.utilization' +- Filter by run: WHERE run_id = 'run_abc123' +- Filter by task: WHERE task_identifier = 'my-task' +- Available metric names: process.cpu.utilization, process.cpu.time, process.memory.usage, system.memory.usage, system.memory.utilization, system.network.io, system.network.dropped, system.network.errors +- Use max_value or last_value for gauges (CPU utilization, memory usage), sum_value for counters (CPU time, network IO) +- Use prettyFormat(expr, 'bytes') to tell the UI to format values as bytes (e.g., "1.50 GiB") — keeps values numeric for charts +- Use prettyFormat(expr, 'percent') for percentage values +- prettyFormat does NOT change the SQL — it only adds a display hint +- Available format types: bytes, decimalBytes, percent, quantity, duration, durationSeconds, costInDollars +- For memory metrics, always use prettyFormat with 'bytes' +- For CPU utilization, consider prettyFormat with 'percent' + +\`\`\`sql +-- CPU utilization over time for a task +SELECT timeBucket(), task_identifier, prettyFormat(avg(max_value), 'percent') AS avg_cpu +FROM metrics +WHERE metric_name = 'process.cpu.utilization' +GROUP BY timeBucket, task_identifier +ORDER BY timeBucket +LIMIT 1000 +\`\`\` + +\`\`\`sql +-- Peak memory usage per run +SELECT run_id, task_identifier, prettyFormat(max(max_value), 'bytes') AS peak_memory +FROM metrics +WHERE metric_name = 'process.memory.usage' +GROUP BY run_id, task_identifier +ORDER BY peak_memory DESC +LIMIT 100 +\`\`\` ## Important Rules 1. NEVER use SELECT * - ClickHouse is a columnar database where SELECT * has very poor performance 2. Always select only the specific columns needed for the request 3. When column selection is ambiguous, use the core columns marked [CORE] in the schema -4. **TIME FILTERING**: When the user wants to filter by time (e.g., "last 7 days", "past hour", "yesterday"), ALWAYS use the \`setTimeFilter\` tool instead of adding \`triggered_at\` conditions to the query. The UI has a time filter that will apply this automatically. -5. Do NOT add \`triggered_at\` to WHERE clauses - use \`setTimeFilter\` tool instead. If the user doesn't specify a time period, do NOT add any time filter (the UI defaults to 7 days). +4. **TIME FILTERING**: When the user wants to filter by time (e.g., "last 7 days", "past hour", "yesterday"), ALWAYS use the \`setTimeFilter\` tool instead of adding time conditions to the WHERE clause. The UI has a time filter that will apply this automatically. This applies to both the \`runs\` table (triggered_at) and the \`metrics\` table (bucket_start). +5. 
Do NOT add \`triggered_at\` or \`bucket_start\` to WHERE clauses for time filtering - use \`setTimeFilter\` tool instead. If the user doesn't specify a time period, do NOT add any time filter (the UI defaults to 7 days). 6. **TIME BUCKETING**: When the user wants to see data over time or in time buckets, use \`timeBucket()\` in SELECT and reference it as \`timeBucket\` in GROUP BY / ORDER BY. Only use manual bucketing functions (toStartOfHour, toStartOfDay, etc.) when the user explicitly requests a specific bucket size. 7. ALWAYS use the validateTSQLQuery tool to check your query before returning it 8. If validation fails, fix the issues and try again (up to 3 attempts) @@ -472,7 +514,7 @@ If you cannot generate a valid query, explain why briefly.`; * Build the system prompt for edit mode */ private buildEditSystemPrompt(schemaDescription: string): string { - return `You are an expert SQL assistant that modifies existing TSQL queries for a task run analytics system. TSQL is a SQL dialect similar to ClickHouse SQL. + return `You are an expert SQL assistant that modifies existing TSQL queries for a task analytics system. TSQL is a SQL dialect similar to ClickHouse SQL. ## Your Task Modify the provided TSQL query according to the user's instructions. Make only the changes requested - preserve the existing query structure where possible. @@ -480,6 +522,11 @@ Modify the provided TSQL query according to the user's instructions. Make only t ## Available Schema ${schemaDescription} +## Choosing the Right Table + +- **runs** — Task run records (status, timing, cost, output, etc.). Use for questions about runs, tasks, failures, durations, costs, queues. +- **metrics** — Host and runtime metrics collected during task execution (CPU, memory). Use for questions about resource usage, CPU utilization, memory consumption, or performance monitoring. Each row is a 10-second aggregation bucket tied to a specific run. + ## TSQL Syntax Guide TSQL supports standard SQL syntax with some ClickHouse-specific features: @@ -539,11 +586,18 @@ ORDER BY timeBucket LIMIT 1000 \`\`\` +### Common Metrics Patterns +- Filter by metric: WHERE metric_name = 'process.cpu.utilization' +- Available metric names: process.cpu.utilization, process.cpu.time, process.memory.usage, system.memory.usage, system.memory.utilization, system.network.io, system.network.dropped, system.network.errors +- Use max_value or last_value for gauges (CPU utilization, memory usage), sum_value for counters (CPU time, network IO) +- Use prettyFormat(expr, 'bytes') for memory metrics, prettyFormat(expr, 'percent') for CPU utilization +- prettyFormat does NOT change the SQL — it only adds a display hint for the UI + ## Important Rules 1. NEVER use SELECT * - ClickHouse is a columnar database where SELECT * has very poor performance 2. If the existing query uses SELECT *, replace it with specific columns (use core columns marked [CORE] as defaults) -3. **TIME FILTERING**: When the user wants to change time filtering (e.g., "change to last 30 days"), use the \`setTimeFilter\` tool instead of modifying \`triggered_at\` conditions. If the existing query has \`triggered_at\` in WHERE, consider removing it and using \`setTimeFilter\` instead. +3. **TIME FILTERING**: When the user wants to change time filtering (e.g., "change to last 30 days"), use the \`setTimeFilter\` tool instead of modifying time column conditions. 
If the existing query has \`triggered_at\` or \`bucket_start\` in WHERE for time filtering, consider removing it and using \`setTimeFilter\` instead. 4. **TIME BUCKETING**: When adding time-series grouping, use \`timeBucket()\` in SELECT and reference it as \`timeBucket\` in GROUP BY / ORDER BY. Only use manual bucketing functions (toStartOfHour, toStartOfDay, etc.) when the user explicitly requests a specific bucket size. 5. ALWAYS use the validateTSQLQuery tool to check your modified query before returning it 6. If validation fails, fix the issues and try again (up to 3 attempts) diff --git a/internal-packages/clickhouse/schema/016_create_metrics_v1.sql b/internal-packages/clickhouse/schema/016_create_metrics_v1.sql new file mode 100644 index 0000000000..0210f4bf1e --- /dev/null +++ b/internal-packages/clickhouse/schema/016_create_metrics_v1.sql @@ -0,0 +1,40 @@ +-- +goose Up +CREATE TABLE IF NOT EXISTS trigger_dev.metrics_v1 +( + organization_id LowCardinality(String), + project_id LowCardinality(String), + environment_id String, + metric_name LowCardinality(String), + metric_type LowCardinality(String), + metric_subject String, + bucket_start DateTime, + count UInt64 DEFAULT 0, + sum_value Float64 DEFAULT 0, + max_value Float64 DEFAULT 0, + min_value Float64 DEFAULT 0, + last_value Float64 DEFAULT 0, + attributes JSON( + `trigger.run_id` String, + `trigger.task_slug` String, + `trigger.attempt_number` Int64, + `trigger.environment_type` LowCardinality(String), + `trigger.machine_id` String, + `trigger.machine_name` LowCardinality(String), + `trigger.worker_id` String, + `trigger.worker_version` String, + `system.cpu.logical_number` String, + `system.cpu.state` LowCardinality(String), + `system.memory.state` LowCardinality(String), + `system.device` String, + `process.cpu.state` LowCardinality(String), + `network.io.direction` LowCardinality(String), + max_dynamic_paths=8 + ) +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(bucket_start) +ORDER BY (organization_id, project_id, environment_id, metric_name, metric_subject, bucket_start) +TTL bucket_start + INTERVAL 30 DAY; + +-- +goose Down +DROP TABLE IF EXISTS trigger_dev.metrics_v1; diff --git a/internal-packages/clickhouse/src/index.ts b/internal-packages/clickhouse/src/index.ts index 47c2f34f2f..aa41b207ff 100644 --- a/internal-packages/clickhouse/src/index.ts +++ b/internal-packages/clickhouse/src/index.ts @@ -26,12 +26,14 @@ import { getLogDetailQueryBuilderV2, getLogsSearchListQueryBuilder, } from "./taskEvents.js"; +import { insertMetrics } from "./metrics.js"; import { Logger, type LogLevel } from "@trigger.dev/core/logger"; import type { Agent as HttpAgent } from "http"; import type { Agent as HttpsAgent } from "https"; export type * from "./taskRuns.js"; export type * from "./taskEvents.js"; +export type * from "./metrics.js"; export type * from "./client/queryBuilder.js"; // Re-export column constants, indices, and type-safe accessors @@ -56,7 +58,7 @@ export { type FieldMappings, type WhereClauseCondition, } from "./client/tsql.js"; -export type { OutputColumnMetadata } from "@internal/tsql"; +export type { ColumnFormatType, OutputColumnMetadata } from "@internal/tsql"; // Errors export { QueryError } from "./client/errors.js"; @@ -214,6 +216,12 @@ export class ClickHouse { }; } + get metrics() { + return { + insert: insertMetrics(this.writer), + }; + } + get taskEventsV2() { return { insert: insertTaskEventsV2(this.writer), diff --git a/internal-packages/clickhouse/src/metrics.ts b/internal-packages/clickhouse/src/metrics.ts 
new file mode 100644 index 0000000000..4469586d8b --- /dev/null +++ b/internal-packages/clickhouse/src/metrics.ts @@ -0,0 +1,33 @@ +import { z } from "zod"; +import { ClickhouseWriter } from "./client/types.js"; + +export const MetricsV1Input = z.object({ + organization_id: z.string(), + project_id: z.string(), + environment_id: z.string(), + metric_name: z.string(), + metric_type: z.string(), + metric_subject: z.string(), + bucket_start: z.string(), + count: z.number(), + sum_value: z.number(), + max_value: z.number(), + min_value: z.number(), + last_value: z.number(), + attributes: z.unknown(), +}); + +export type MetricsV1Input = z.input; + +export function insertMetrics(ch: ClickhouseWriter) { + return ch.insertUnsafe({ + name: "insertMetrics", + table: "trigger_dev.metrics_v1", + settings: { + enable_json_type: 1, + type_json_skip_duplicated_paths: 1, + input_format_json_throw_on_bad_escape_sequence: 0, + input_format_json_use_string_type_for_ambiguous_paths_in_named_tuples_inference_from_objects: 1, + }, + }); +} diff --git a/internal-packages/otlp-importer/src/index.ts b/internal-packages/otlp-importer/src/index.ts index 7401431a84..3a70776ddf 100644 --- a/internal-packages/otlp-importer/src/index.ts +++ b/internal-packages/otlp-importer/src/index.ts @@ -10,6 +10,12 @@ import { ExportLogsServiceResponse, } from "./generated/opentelemetry/proto/collector/logs/v1/logs_service"; +import { + ExportMetricsPartialSuccess, + ExportMetricsServiceRequest, + ExportMetricsServiceResponse, +} from "./generated/opentelemetry/proto/collector/metrics/v1/metrics_service"; + import type { AnyValue, KeyValue, @@ -33,6 +39,21 @@ import { Status, Status_StatusCode, } from "./generated/opentelemetry/proto/trace/v1/trace"; +import { + ResourceMetrics, + ScopeMetrics, + Metric, + Gauge, + Sum, + Histogram, + ExponentialHistogram, + Summary, + NumberDataPoint, + HistogramDataPoint, + ExponentialHistogramDataPoint, + SummaryDataPoint, + AggregationTemporality, +} from "./generated/opentelemetry/proto/metrics/v1/metrics"; export { LogRecord, @@ -57,3 +78,21 @@ export { export { ExportTracePartialSuccess, ExportTraceServiceRequest, ExportTraceServiceResponse }; export { ExportLogsPartialSuccess, ExportLogsServiceRequest, ExportLogsServiceResponse }; + +export { ExportMetricsPartialSuccess, ExportMetricsServiceRequest, ExportMetricsServiceResponse }; + +export { + ResourceMetrics, + ScopeMetrics, + Metric, + Gauge, + Sum, + Histogram, + ExponentialHistogram, + Summary, + NumberDataPoint, + HistogramDataPoint, + ExponentialHistogramDataPoint, + SummaryDataPoint, + AggregationTemporality, +}; diff --git a/internal-packages/tsql/src/index.ts b/internal-packages/tsql/src/index.ts index f4ac1ea6bb..923d8b8829 100644 --- a/internal-packages/tsql/src/index.ts +++ b/internal-packages/tsql/src/index.ts @@ -109,6 +109,7 @@ export { type ClickHouseType, type ColumnSchema, type FieldMappings, + type ColumnFormatType, type OutputColumnMetadata, type RequiredFilter, type SchemaRegistry, @@ -133,7 +134,9 @@ export { // Re-export time bucket utilities export { + BUCKET_THRESHOLDS, calculateTimeBucketInterval, + type BucketThreshold, type TimeBucketInterval, } from "./query/time_buckets.js"; diff --git a/internal-packages/tsql/src/query/printer.test.ts b/internal-packages/tsql/src/query/printer.test.ts index 585f695fa1..7349a7bd27 100644 --- a/internal-packages/tsql/src/query/printer.test.ts +++ b/internal-packages/tsql/src/query/printer.test.ts @@ -2,7 +2,13 @@ import { describe, it, expect, beforeEach } from "vitest"; import 
{ parseTSQLSelect, parseTSQLExpr, compileTSQL } from "../index.js"; import { ClickHousePrinter, printToClickHouse, type PrintResult } from "./printer.js"; import { createPrinterContext, PrinterContext } from "./printer_context.js"; -import { createSchemaRegistry, column, type TableSchema, type SchemaRegistry } from "./schema.js"; +import { + createSchemaRegistry, + column, + type TableSchema, + type SchemaRegistry, +} from "./schema.js"; +import type { BucketThreshold } from "./time_buckets.js"; import { QueryError, SyntaxError } from "./errors.js"; /** @@ -2335,16 +2341,19 @@ describe("Basic column metadata", () => { name: "status", type: "LowCardinality(String)", customRenderType: "runStatus", + format: "runStatus", }); expect(columns[1]).toEqual({ name: "usage_duration_ms", type: "UInt32", customRenderType: "duration", + format: "duration", }); expect(columns[2]).toEqual({ name: "cost_in_cents", type: "Float64", customRenderType: "cost", + format: "cost", }); }); @@ -2687,6 +2696,133 @@ describe("Basic column metadata", () => { expect(columns[2].name).toBe("avg"); }); }); + + describe("prettyFormat()", () => { + it("should strip prettyFormat from SQL and attach format to column metadata", () => { + const ctx = createMetadataTestContext(); + const { sql, columns } = printQuery( + "SELECT prettyFormat(usage_duration_ms, 'bytes') AS memory FROM runs", + ctx + ); + + // SQL should not contain prettyFormat + expect(sql).not.toContain("prettyFormat"); + expect(sql).toContain("usage_duration_ms"); + + expect(columns).toHaveLength(1); + expect(columns[0].name).toBe("memory"); + expect(columns[0].format).toBe("bytes"); + }); + + it("should work with aggregation wrapping", () => { + const ctx = createMetadataTestContext(); + const { sql, columns } = printQuery( + "SELECT prettyFormat(avg(usage_duration_ms), 'bytes') AS avg_memory FROM runs", + ctx + ); + + expect(sql).not.toContain("prettyFormat"); + expect(sql).toContain("avg(usage_duration_ms)"); + + expect(columns).toHaveLength(1); + expect(columns[0].name).toBe("avg_memory"); + expect(columns[0].format).toBe("bytes"); + expect(columns[0].type).toBe("Float64"); + }); + + it("should work without explicit alias", () => { + const ctx = createMetadataTestContext(); + const { sql, columns } = printQuery( + "SELECT prettyFormat(usage_duration_ms, 'percent') FROM runs", + ctx + ); + + expect(sql).not.toContain("prettyFormat"); + expect(columns).toHaveLength(1); + expect(columns[0].name).toBe("usage_duration_ms"); + expect(columns[0].format).toBe("percent"); + }); + + it("should throw for invalid format type", () => { + const ctx = createMetadataTestContext(); + expect(() => { + printQuery( + "SELECT prettyFormat(usage_duration_ms, 'invalid') FROM runs", + ctx + ); + }).toThrow(QueryError); + expect(() => { + printQuery( + "SELECT prettyFormat(usage_duration_ms, 'invalid') FROM runs", + ctx + ); + }).toThrow(/Unknown format type/); + }); + + it("should throw for wrong argument count", () => { + const ctx = createMetadataTestContext(); + expect(() => { + printQuery("SELECT prettyFormat(usage_duration_ms) FROM runs", ctx); + }).toThrow(QueryError); + expect(() => { + printQuery("SELECT prettyFormat(usage_duration_ms) FROM runs", ctx); + }).toThrow(/requires exactly 2 arguments/); + }); + + it("should throw when second argument is not a string literal", () => { + const ctx = createMetadataTestContext(); + expect(() => { + printQuery( + "SELECT prettyFormat(usage_duration_ms, 123) FROM runs", + ctx + ); + }).toThrow(QueryError); + expect(() => { + 
printQuery( + "SELECT prettyFormat(usage_duration_ms, 123) FROM runs", + ctx + ); + }).toThrow(/must be a string literal/); + }); + + it("should override schema-level customRenderType", () => { + const ctx = createMetadataTestContext(); + const { columns } = printQuery( + "SELECT prettyFormat(usage_duration_ms, 'bytes') AS mem FROM runs", + ctx + ); + + expect(columns).toHaveLength(1); + // prettyFormat's format should take precedence + expect(columns[0].format).toBe("bytes"); + // customRenderType from schema should NOT be set since prettyFormat overrides + // The source column had customRenderType: "duration" but prettyFormat replaces it + }); + + it("should auto-populate format from customRenderType when not explicitly set", () => { + const ctx = createMetadataTestContext(); + const { columns } = printQuery( + "SELECT usage_duration_ms, cost_in_cents FROM runs", + ctx + ); + + expect(columns).toHaveLength(2); + // customRenderType should auto-populate format + expect(columns[0].customRenderType).toBe("duration"); + expect(columns[0].format).toBe("duration"); + expect(columns[1].customRenderType).toBe("cost"); + expect(columns[1].format).toBe("cost"); + }); + + it("should not set format when column has no customRenderType", () => { + const ctx = createMetadataTestContext(); + const { columns } = printQuery("SELECT run_id FROM runs", ctx); + + expect(columns).toHaveLength(1); + expect(columns[0].format).toBeUndefined(); + expect(columns[0].customRenderType).toBeUndefined(); + }); + }); }); describe("Unknown column blocking", () => { @@ -3570,4 +3706,73 @@ describe("timeBucket()", () => { expect(Object.values(params)).toContain("org_test123"); }); }); + + describe("per-table timeBucketThresholds", () => { + const customThresholds: BucketThreshold[] = [ + // 10-second minimum granularity (e.g., for pre-aggregated metrics) + { maxRangeSeconds: 10 * 60, interval: { value: 10, unit: "SECOND" } }, + { maxRangeSeconds: 30 * 60, interval: { value: 30, unit: "SECOND" } }, + { maxRangeSeconds: 2 * 60 * 60, interval: { value: 1, unit: "MINUTE" } }, + ]; + + const schemaWithCustomThresholds: TableSchema = { + ...timeBucketSchema, + name: "metrics", + timeBucketThresholds: customThresholds, + }; + + it("should use custom thresholds when defined on the table schema", () => { + // 3-minute range: global default would give 5 SECOND, custom gives 10 SECOND + const threeMinuteRange = { + from: new Date("2024-01-01T00:00:00Z"), + to: new Date("2024-01-01T00:03:00Z"), + }; + + const schema = createSchemaRegistry([schemaWithCustomThresholds]); + const ctx = createPrinterContext({ + schema, + enforcedWhereClause: { + organization_id: { op: "eq", value: "org_test123" }, + project_id: { op: "eq", value: "proj_test456" }, + environment_id: { op: "eq", value: "env_test789" }, + }, + timeRange: threeMinuteRange, + }); + + const ast = parseTSQLSelect( + "SELECT timeBucket(), count() FROM metrics GROUP BY timeBucket" + ); + const { sql } = printToClickHouse(ast, ctx); + + // Custom thresholds: under 10 min → 10 SECOND (not the global 5 SECOND) + expect(sql).toContain("toStartOfInterval(created_at, INTERVAL 10 SECOND)"); + }); + + it("should fall back to global defaults when no custom thresholds are defined", () => { + // 3-minute range with standard schema (no custom thresholds) + const threeMinuteRange = { + from: new Date("2024-01-01T00:00:00Z"), + to: new Date("2024-01-01T00:03:00Z"), + }; + + const schema = createSchemaRegistry([timeBucketSchema]); + const ctx = createPrinterContext({ + schema, + 
enforcedWhereClause: { + organization_id: { op: "eq", value: "org_test123" }, + project_id: { op: "eq", value: "proj_test456" }, + environment_id: { op: "eq", value: "env_test789" }, + }, + timeRange: threeMinuteRange, + }); + + const ast = parseTSQLSelect( + "SELECT timeBucket(), count() FROM runs GROUP BY timeBucket" + ); + const { sql } = printToClickHouse(ast, ctx); + + // Global default: under 5 min → 5 SECOND + expect(sql).toContain("toStartOfInterval(created_at, INTERVAL 5 SECOND)"); + }); + }); }); diff --git a/internal-packages/tsql/src/query/printer.ts b/internal-packages/tsql/src/query/printer.ts index ff3c608430..c5af9390c8 100644 --- a/internal-packages/tsql/src/query/printer.ts +++ b/internal-packages/tsql/src/query/printer.ts @@ -59,6 +59,7 @@ import { ClickHouseType, hasFieldMapping, getInternalValueFromMappingCaseInsensitive, + type ColumnFormatType, } from "./schema"; /** @@ -739,6 +740,14 @@ export class ClickHousePrinter { metadata.description = sourceColumn.description; } + // Set format hint from prettyFormat() or auto-populate from customRenderType + const sourceWithFormat = sourceColumn as (Partial<ColumnSchema> & { format?: ColumnFormatType }) | null; + if (sourceWithFormat?.format) { + metadata.format = sourceWithFormat.format; + } else if (sourceColumn?.customRenderType) { + metadata.format = sourceColumn.customRenderType as ColumnFormatType; + } + this.outputColumns.push(metadata); } @@ -932,6 +941,53 @@ export class ClickHousePrinter { }; } + // Handle prettyFormat(expr, 'formatType') — metadata-only wrapper + if ((col as Call).expression_type === "call") { + const call = col as Call; + if (call.name.toLowerCase() === "prettyformat") { + if (call.args.length !== 2) { + throw new QueryError( + "prettyFormat() requires exactly 2 arguments: prettyFormat(expression, 'formatType')" + ); + } + const formatArg = call.args[1]; + if ( + (formatArg as Constant).expression_type !== "constant" || + typeof (formatArg as Constant).value !== "string" + ) { + throw new QueryError( + "prettyFormat() second argument must be a string literal format type" + ); + } + const formatType = (formatArg as Constant).value as string; + const validFormats = [ + "bytes", + "decimalBytes", + "quantity", + "percent", + "duration", + "durationSeconds", + "costInDollars", + "cost", + ]; + if (!validFormats.includes(formatType)) { + throw new QueryError( + `Unknown format type '${formatType}'. Valid types: ${validFormats.join(", ")}` + ); + } + const innerAnalysis = this.analyzeSelectColumn(call.args[0]); + return { + outputName: innerAnalysis.outputName, + sourceColumn: { + ...(innerAnalysis.sourceColumn ?? {}), + type: innerAnalysis.sourceColumn?.type ?? innerAnalysis.inferredType ?? 
undefined, + format: formatType as ColumnFormatType, + } as Partial<ColumnSchema> & { format?: ColumnFormatType }, + inferredType: innerAnalysis.inferredType, + }; + } + } + + // Handle Call (function/aggregation) - infer type from function if ((col as Call).expression_type === "call") { const call = col as Call; @@ -2797,6 +2853,14 @@ export class ClickHousePrinter { private visitCall(node: Call): string { const name = node.name; + // Handle prettyFormat() — strip wrapper, only emit the inner expression + if (name.toLowerCase() === "prettyformat") { + if (node.args.length !== 2) { + throw new QueryError("prettyFormat() requires exactly 2 arguments"); + } + return this.visit(node.args[0]); + } + // Handle timeBucket() - special TSQL function for automatic time bucketing if (name.toLowerCase() === "timebucket") { return this.visitTimeBucket(node); @@ -2973,8 +3037,12 @@ export class ClickHousePrinter { ); } - // Calculate the appropriate interval - const interval = calculateTimeBucketInterval(timeRange.from, timeRange.to); + // Calculate the appropriate interval (use table-specific thresholds if defined) + const interval = calculateTimeBucketInterval( + timeRange.from, + timeRange.to, + tableSchema.timeBucketThresholds + ); // Emit toStartOfInterval(column, INTERVAL N UNIT) return `toStartOfInterval(${escapeClickHouseIdentifier(clickhouseColumnName)}, INTERVAL ${interval.value} ${interval.unit})`; diff --git a/internal-packages/tsql/src/query/schema.ts b/internal-packages/tsql/src/query/schema.ts index fd8a7add16..16a607d8fc 100644 --- a/internal-packages/tsql/src/query/schema.ts +++ b/internal-packages/tsql/src/query/schema.ts @@ -2,6 +2,7 @@ // Defines allowed tables, columns, and tenant isolation configuration import { QueryError } from "./errors"; +import type { BucketThreshold } from "./time_buckets"; /** * ClickHouse data types supported by TSQL @@ -269,6 +270,33 @@ export interface ColumnSchema { */ export type FieldMappings = Record<string, Record<string, string>>; +/** + * Display format types for column values. + * + * These tell the UI how to render values without changing the underlying data type. + * Includes both existing custom render types and new format hint types. + */ +export type ColumnFormatType = + // Existing custom render types + | "runId" + | "runStatus" + | "duration" + | "durationSeconds" + | "costInDollars" + | "cost" + | "machine" + | "environment" + | "environmentType" + | "project" + | "queue" + | "tags" + | "number" + // Format hint types (used by prettyFormat()) + | "bytes" + | "decimalBytes" + | "quantity" + | "percent"; + /** * Metadata for a column in query results. * @@ -290,6 +318,16 @@ export interface OutputColumnMetadata { * Only present for columns or virtual columns defined in the table schema. */ description?: string; + /** + * Display format hint — tells the UI how to render numeric values. + * + * Set by `prettyFormat(expr, 'formatType')` in TSQL queries. + * The underlying value remains numeric (for charts), but the UI uses this + * hint for axis labels, table cells, and tooltips. + * + * Also auto-populated from `customRenderType` when not explicitly set. + */ + format?: ColumnFormatType; } /** @@ -354,6 +392,13 @@ export interface TableSchema { * ``` */ timeConstraint?: string; + /** + * Custom time bucket thresholds for this table. + * When set, timeBucket() uses these instead of the global defaults. + * Useful when the table's time granularity differs from the standard (e.g., metrics + * pre-aggregated into 10-second buckets shouldn't go below 10-second intervals). 
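+ * + * A minimal sketch (the thresholds mirror those exercised in printer.test.ts; all other schema fields are elided): + * ```typescript + * const metricsTable: TableSchema = { + *   // ...name, columns, tenant isolation config, etc. + *   timeBucketThresholds: [ + *     { maxRangeSeconds: 10 * 60, interval: { value: 10, unit: "SECOND" } }, + *     { maxRangeSeconds: 30 * 60, interval: { value: 30, unit: "SECOND" } }, + *     { maxRangeSeconds: 2 * 60 * 60, interval: { value: 1, unit: "MINUTE" } }, + *   ], + * }; + * ```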
+ */ + timeBucketThresholds?: BucketThreshold[]; } /** diff --git a/internal-packages/tsql/src/query/time_buckets.ts b/internal-packages/tsql/src/query/time_buckets.ts index fe7bd7efe6..04edde007d 100644 --- a/internal-packages/tsql/src/query/time_buckets.ts +++ b/internal-packages/tsql/src/query/time_buckets.ts @@ -17,13 +17,23 @@ export interface TimeBucketInterval { } /** - * Time bucket thresholds: each entry defines a maximum time range duration (in seconds) + * A threshold mapping a maximum time range duration to a bucket interval. + */ +export interface BucketThreshold { + /** Maximum range duration in seconds for this threshold to apply */ + maxRangeSeconds: number; + /** The bucket interval to use when the range is under maxRangeSeconds */ + interval: TimeBucketInterval; +} + +/** + * Default time bucket thresholds: each entry defines a maximum time range duration (in seconds) * and the corresponding bucket interval to use. * * The intervals are chosen to produce roughly 50-100 data points for the given range. * Entries are ordered from smallest to largest range. */ -const BUCKET_THRESHOLDS: Array<{ maxRangeSeconds: number; interval: TimeBucketInterval }> = [ +export const BUCKET_THRESHOLDS: BucketThreshold[] = [ // Under 5 minutes → 5 second buckets (max 60 buckets) { maxRangeSeconds: 5 * 60, interval: { value: 5, unit: "SECOND" } }, // Under 30 minutes → 30 second buckets (max 60 buckets) @@ -73,10 +83,14 @@ const DEFAULT_LARGE_INTERVAL: TimeBucketInterval = { value: 1, unit: "MONTH" }; * ); // { value: 6, unit: "HOUR" } * ``` */ -export function calculateTimeBucketInterval(from: Date, to: Date): TimeBucketInterval { +export function calculateTimeBucketInterval( + from: Date, + to: Date, + thresholds?: BucketThreshold[] +): TimeBucketInterval { const rangeSeconds = Math.abs(to.getTime() - from.getTime()) / 1000; - for (const threshold of BUCKET_THRESHOLDS) { + for (const threshold of thresholds ?? 
BUCKET_THRESHOLDS) { if (rangeSeconds < threshold.maxRangeSeconds) { return threshold.interval; } diff --git a/packages/cli-v3/src/dev/taskRunProcessPool.ts b/packages/cli-v3/src/dev/taskRunProcessPool.ts index 1d0640e52d..810be7acb4 100644 --- a/packages/cli-v3/src/dev/taskRunProcessPool.ts +++ b/packages/cli-v3/src/dev/taskRunProcessPool.ts @@ -2,6 +2,7 @@ import { MachinePresetResources, ServerBackgroundWorker, WorkerManifest, + generateFriendlyId, } from "@trigger.dev/core/v3"; import { TaskRunProcess } from "../executions/taskRunProcess.js"; import { logger } from "../utilities/logger.js"; @@ -23,6 +24,8 @@ export class TaskRunProcessPool { private readonly maxExecutionsPerProcess: number; private readonly executionCountsPerProcess: Map = new Map(); private readonly deprecatedVersions: Set<string> = new Set(); + private readonly idleTimers: Map<TaskRunProcess, NodeJS.Timeout> = new Map(); + private static readonly IDLE_TIMEOUT_MS = 30_000; constructor(options: TaskRunProcessPoolOptions) { this.options = options; @@ -38,6 +41,7 @@ export class TaskRunProcessPool { const versionProcesses = this.availableProcessesByVersion.get(version) || []; const processesToKill = versionProcesses.filter((process) => !process.isExecuting()); + processesToKill.forEach((process) => this.clearIdleTimer(process)); Promise.all(processesToKill.map((process) => this.killProcess(process))).then(() => { this.availableProcessesByVersion.delete(version); }); @@ -71,6 +75,7 @@ export class TaskRunProcessPool { version, availableProcesses.filter((p) => p !== reusableProcess) ); + this.clearIdleTimer(reusableProcess); if (!this.busyProcessesByVersion.has(version)) { this.busyProcessesByVersion.set(version, new Set()); @@ -106,6 +111,7 @@ export class TaskRunProcessPool { env: { ...this.options.env, ...env, + TRIGGER_MACHINE_ID: generateFriendlyId("machine"), }, serverWorker, machineResources, @@ -154,6 +160,7 @@ export class TaskRunProcessPool { this.availableProcessesByVersion.set(version, []); } this.availableProcessesByVersion.get(version)!.push(process); + this.startIdleTimer(process, version); } catch (error) { logger.debug("[TaskRunProcessPool] Failed to cleanup process for reuse, killing it", { error, @@ -213,7 +220,42 @@ export class TaskRunProcessPool { return process.isHealthy; } + private startIdleTimer(process: TaskRunProcess, version: string): void { + this.clearIdleTimer(process); + + const timer = setTimeout(() => { + // Synchronously remove from available pool before async kill to prevent race with getProcess() + const available = this.availableProcessesByVersion.get(version); + if (available) { + const index = available.indexOf(process); + if (index !== -1) { + available.splice(index, 1); + } + } + this.idleTimers.delete(process); + + logger.debug("[TaskRunProcessPool] Idle timeout reached, killing process", { + pid: process.pid, + version, + }); + + this.killProcess(process); + }, TaskRunProcessPool.IDLE_TIMEOUT_MS); + + this.idleTimers.set(process, timer); + } + + private clearIdleTimer(process: TaskRunProcess): void { + const timer = this.idleTimers.get(process); + if (timer) { + clearTimeout(timer); + this.idleTimers.delete(process); + } + } + private async killProcess(process: TaskRunProcess): Promise<void> { + this.clearIdleTimer(process); + if (!process.isHealthy) { logger.debug("[TaskRunProcessPool] Process is not healthy, skipping cleanup", { processId: process.pid, @@ -245,6 +287,12 @@ export class TaskRunProcessPool { versions: Array.from(this.availableProcessesByVersion.keys()), }); + // Clear all idle timers + for (const timer of 
this.idleTimers.values()) { + clearTimeout(timer); + } + this.idleTimers.clear(); + // Kill all available processes across all versions const allAvailableProcesses = Array.from(this.availableProcessesByVersion.values()).flat(); await Promise.all(allAvailableProcesses.map((process) => this.killProcess(process))); diff --git a/packages/cli-v3/src/entryPoints/dev-run-worker.ts b/packages/cli-v3/src/entryPoints/dev-run-worker.ts index 7cd88ab5a9..c1b453eed9 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-worker.ts @@ -204,12 +204,18 @@ async function doBootstrap() { const tracingSDK = new TracingSDK({ url: env.TRIGGER_OTEL_EXPORTER_OTLP_ENDPOINT ?? "http://0.0.0.0:4318", + metricsUrl: env.TRIGGER_OTEL_METRICS_ENDPOINT, instrumentations: config.telemetry?.instrumentations ?? config.instrumentations ?? [], exporters: config.telemetry?.exporters ?? [], logExporters: config.telemetry?.logExporters ?? [], + metricReaders: config.telemetry?.metricReaders ?? [], diagLogLevel: (env.TRIGGER_OTEL_LOG_LEVEL as TracingDiagnosticLogLevel) ?? "none", forceFlushTimeoutMillis: 30_000, resource: config.telemetry?.resource, + hostMetrics: true, + nodejsRuntimeMetrics: true, + // Drop all system metrics from dev metrics export + droppedMetrics: ["system.*"], }); const otelTracer: Tracer = tracingSDK.getTracer("trigger-dev-worker", VERSION); @@ -619,8 +625,11 @@ const zodIpc = new ZodIpcConnection({ } await flushAll(timeoutInMs); }, - FLUSH: async ({ timeoutInMs }) => { + FLUSH: async ({ timeoutInMs, disableContext }) => { await flushAll(timeoutInMs); + if (disableContext) { + taskContext.disable(); + } }, RESOLVE_WAITPOINT: async ({ waitpoint }) => { _sharedWorkerRuntime?.resolveWaitpoints([waitpoint]); diff --git a/packages/cli-v3/src/entryPoints/managed-run-worker.ts b/packages/cli-v3/src/entryPoints/managed-run-worker.ts index f1512f27f0..22eb34615e 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-worker.ts @@ -183,12 +183,16 @@ async function doBootstrap() { const tracingSDK = new TracingSDK({ url: env.TRIGGER_OTEL_EXPORTER_OTLP_ENDPOINT ?? "http://0.0.0.0:4318", + metricsUrl: env.TRIGGER_OTEL_METRICS_ENDPOINT, instrumentations: config.instrumentations ?? [], diagLogLevel: (env.TRIGGER_OTEL_LOG_LEVEL as TracingDiagnosticLogLevel) ?? "none", forceFlushTimeoutMillis: 30_000, exporters: config.telemetry?.exporters ?? [], logExporters: config.telemetry?.logExporters ?? [], + metricReaders: config.telemetry?.metricReaders ?? 
[], resource: config.telemetry?.resource, + hostMetrics: true, + nodejsRuntimeMetrics: true, }); const otelTracer: Tracer = tracingSDK.getTracer("trigger-dev-worker", VERSION); @@ -607,8 +611,11 @@ const zodIpc = new ZodIpcConnection({ } await flushAll(timeoutInMs); }, - FLUSH: async ({ timeoutInMs }) => { + FLUSH: async ({ timeoutInMs, disableContext }) => { await flushAll(timeoutInMs); + if (disableContext) { + taskContext.disable(); + } }, RESOLVE_WAITPOINT: async ({ waitpoint }) => { _sharedWorkerRuntime?.resolveWaitpoints([waitpoint]); diff --git a/packages/cli-v3/src/entryPoints/managed/taskRunProcessProvider.ts b/packages/cli-v3/src/entryPoints/managed/taskRunProcessProvider.ts index 381dca908a..101d1827ce 100644 --- a/packages/cli-v3/src/entryPoints/managed/taskRunProcessProvider.ts +++ b/packages/cli-v3/src/entryPoints/managed/taskRunProcessProvider.ts @@ -1,4 +1,4 @@ -import { WorkerManifest } from "@trigger.dev/core/v3"; +import { WorkerManifest, generateFriendlyId } from "@trigger.dev/core/v3"; import { TaskRunProcess } from "../../executions/taskRunProcess.js"; import { RunnerEnv } from "./env.js"; import { RunLogger, SendDebugLogOptions } from "./logger.js"; @@ -22,6 +22,7 @@ export class TaskRunProcessProvider { private readonly logger: RunLogger; private readonly processKeepAliveEnabled: boolean; private readonly processKeepAliveMaxExecutionCount: number; + private readonly machineId = generateFriendlyId("machine"); // Process keep-alive state private persistentProcess: TaskRunProcess | null = null; @@ -269,6 +270,7 @@ export class TaskRunProcessProvider { return { ...taskRunEnv, ...this.env.gatherProcessEnv(), + TRIGGER_MACHINE_ID: this.machineId, HEARTBEAT_INTERVAL_MS: String(this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS * 1000), }; } diff --git a/packages/cli-v3/src/executions/taskRunProcess.ts b/packages/cli-v3/src/executions/taskRunProcess.ts index 1e274ba02f..a329956c0d 100644 --- a/packages/cli-v3/src/executions/taskRunProcess.ts +++ b/packages/cli-v3/src/executions/taskRunProcess.ts @@ -126,7 +126,7 @@ export class TaskRunProcess { return; } - await tryCatch(this.#flush()); + await tryCatch(this.#flush({ disableContext: !kill })); if (kill) { await this.#gracefullyTerminate(this.options.gracefulTerminationTimeoutInMs); @@ -240,10 +240,10 @@ export class TaskRunProcess { return this; } - async #flush(timeoutInMs: number = 5_000) { + async #flush({ timeoutInMs = 5_000, disableContext = false } = {}) { logger.debug("flushing task run process", { pid: this.pid }); - await this._ipc?.sendWithAck("FLUSH", { timeoutInMs }, timeoutInMs + 1_000); + await this._ipc?.sendWithAck("FLUSH", { timeoutInMs, disableContext }, timeoutInMs + 1_000); } async #cancel(timeoutInMs: number = 30_000) { diff --git a/packages/core/package.json b/packages/core/package.json index d73b425f7d..dbc564d22f 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -176,10 +176,13 @@ "@opentelemetry/api-logs": "0.203.0", "@opentelemetry/core": "2.0.1", "@opentelemetry/exporter-logs-otlp-http": "0.203.0", + "@opentelemetry/exporter-metrics-otlp-http": "0.203.0", + "@opentelemetry/host-metrics": "^0.36.0", "@opentelemetry/exporter-trace-otlp-http": "0.203.0", "@opentelemetry/instrumentation": "0.203.0", "@opentelemetry/resources": "2.0.1", "@opentelemetry/sdk-logs": "0.203.0", + "@opentelemetry/sdk-metrics": "2.0.1", "@opentelemetry/sdk-trace-base": "2.0.1", "@opentelemetry/sdk-trace-node": "2.0.1", "@opentelemetry/semantic-conventions": "1.36.0", diff --git 
a/packages/core/src/v3/config.ts b/packages/core/src/v3/config.ts index 9c6871a264..0b4d23af00 100644 --- a/packages/core/src/v3/config.ts +++ b/packages/core/src/v3/config.ts @@ -1,5 +1,6 @@ import type { Instrumentation } from "@opentelemetry/instrumentation"; import type { SpanExporter } from "@opentelemetry/sdk-trace-base"; +import type { MetricReader } from "@opentelemetry/sdk-metrics"; import type { BuildExtension } from "./build/extensions.js"; import type { AnyOnFailureHookFunction, @@ -109,6 +110,12 @@ export type TriggerConfig = { */ logExporters?: Array<LogRecordExporter>; + /** + * Metric readers for OpenTelemetry. Add custom metric readers to export + * metrics to external services alongside the default Trigger.dev exporter. + */ + metricReaders?: Array<MetricReader>; + /** * Resource to use for OpenTelemetry. This is useful if you want to add custom resources to your tasks. * diff --git a/packages/core/src/v3/index.ts b/packages/core/src/v3/index.ts index f4c114c5f9..b714d8cb93 100644 --- a/packages/core/src/v3/index.ts +++ b/packages/core/src/v3/index.ts @@ -49,6 +49,7 @@ export { NULL_SENTINEL, } from "./utils/flattenAttributes.js"; export { omit } from "./utils/omit.js"; +export { generateFriendlyId, fromFriendlyId } from "./isomorphic/friendlyId.js"; export { calculateNextRetryDelay, calculateResetAt, diff --git a/packages/core/src/v3/otel/machineId.ts b/packages/core/src/v3/otel/machineId.ts new file mode 100644 index 0000000000..79f7bc4ba2 --- /dev/null +++ b/packages/core/src/v3/otel/machineId.ts @@ -0,0 +1,4 @@ +import { generateFriendlyId } from "../isomorphic/friendlyId.js"; +import { getEnvVar } from "../utils/getEnv.js"; + +export const machineId = getEnvVar("TRIGGER_MACHINE_ID") ?? generateFriendlyId("machine"); diff --git a/packages/core/src/v3/otel/nodejsRuntimeMetrics.ts b/packages/core/src/v3/otel/nodejsRuntimeMetrics.ts new file mode 100644 index 0000000000..d12eb94301 --- /dev/null +++ b/packages/core/src/v3/otel/nodejsRuntimeMetrics.ts @@ -0,0 +1,64 @@ +import { type MeterProvider } from "@opentelemetry/sdk-metrics"; +import { performance, monitorEventLoopDelay } from "node:perf_hooks"; + +export function startNodejsRuntimeMetrics(meterProvider: MeterProvider) { + const meter = meterProvider.getMeter("nodejs-runtime", "1.0.0"); + + // Event loop utilization (diff between collection intervals) + let lastElu = performance.eventLoopUtilization(); + + const eluGauge = meter.createObservableGauge("nodejs.event_loop.utilization", { + description: "Event loop utilization over the last collection interval", + unit: "1", + }); + + // Event loop delay histogram (from perf_hooks) + const eld = monitorEventLoopDelay({ resolution: 20 }); + eld.enable(); + + const eldP50 = meter.createObservableGauge("nodejs.event_loop.delay.p50", { + description: "Median event loop delay", + unit: "s", + }); + const eldP99 = meter.createObservableGauge("nodejs.event_loop.delay.p99", { + description: "p99 event loop delay", + unit: "s", + }); + const eldMax = meter.createObservableGauge("nodejs.event_loop.delay.max", { + description: "Max event loop delay", + unit: "s", + }); + + // Heap metrics + const heapUsed = meter.createObservableGauge("nodejs.heap.used", { + description: "V8 heap used", + unit: "By", + }); + const heapTotal = meter.createObservableGauge("nodejs.heap.total", { + description: "V8 heap total allocated", + unit: "By", + }); + + // Single batch callback for all metrics + meter.addBatchObservableCallback( + (obs) => { + // ELU + const currentElu = performance.eventLoopUtilization(); + const diff = 
performance.eventLoopUtilization(currentElu, lastElu); + lastElu = currentElu; + obs.observe(eluGauge, diff.utilization); + + // Event loop delay (nanoseconds -> seconds) + obs.observe(eldP50, eld.percentile(50) / 1e9); + obs.observe(eldP99, eld.percentile(99) / 1e9); + obs.observe(eldMax, eld.max / 1e9); + eld.reset(); + + // Heap + const mem = process.memoryUsage(); + obs.observe(heapUsed, mem.heapUsed); + obs.observe(heapTotal, mem.heapTotal); + }, + [eluGauge, eldP50, eldP99, eldMax, heapUsed, heapTotal] + ); +} diff --git a/packages/core/src/v3/otel/tracingSDK.ts b/packages/core/src/v3/otel/tracingSDK.ts index 694212f71b..ab775e2144 100644 --- a/packages/core/src/v3/otel/tracingSDK.ts +++ b/packages/core/src/v3/otel/tracingSDK.ts @@ -4,11 +4,14 @@ import { TraceFlags, TracerProvider, diag, + metrics, } from "@opentelemetry/api"; import { logs } from "@opentelemetry/api-logs"; import { TraceState } from "@opentelemetry/core"; import { OTLPLogExporter } from "@opentelemetry/exporter-logs-otlp-http"; import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http"; +import { OTLPMetricExporter } from "@opentelemetry/exporter-metrics-otlp-http"; +import { HostMetrics } from "@opentelemetry/host-metrics"; import { registerInstrumentations, type Instrumentation } from "@opentelemetry/instrumentation"; import { detectResources, @@ -24,6 +27,12 @@ import { ReadableLogRecord, SimpleLogRecordProcessor, } from "@opentelemetry/sdk-logs"; +import { + AggregationType, + MeterProvider, + PeriodicExportingMetricReader, + type MetricReader, +} from "@opentelemetry/sdk-metrics"; import { RandomIdGenerator, SpanProcessor } from "@opentelemetry/sdk-trace-base"; import { BatchSpanProcessor, @@ -32,7 +41,6 @@ import { SimpleSpanProcessor, SpanExporter, } from "@opentelemetry/sdk-trace-node"; -import { SemanticResourceAttributes, SEMATTRS_HTTP_URL } from "@opentelemetry/semantic-conventions"; import { VERSION } from "../../version.js"; import { OTEL_ATTRIBUTE_PER_EVENT_COUNT_LIMIT, @@ -47,11 +55,15 @@ import { import { SemanticInternalAttributes } from "../semanticInternalAttributes.js"; import { taskContext } from "../task-context-api.js"; import { + BufferingMetricExporter, TaskContextLogProcessor, + TaskContextMetricExporter, TaskContextSpanProcessor, } from "../taskContext/otelProcessors.js"; import { traceContext } from "../trace-context-api.js"; import { getEnvVar } from "../utils/getEnv.js"; +import { machineId } from "./machineId.js"; +import { startNodejsRuntimeMetrics } from "./nodejsRuntimeMetrics.js"; export type TracingDiagnosticLogLevel = | "none" @@ -64,12 +76,19 @@ export type TracingDiagnosticLogLevel = export type TracingSDKConfig = { url: string; + metricsUrl?: string; forceFlushTimeoutMillis?: number; instrumentations?: Instrumentation[]; exporters?: SpanExporter[]; logExporters?: LogRecordExporter[]; + metricReaders?: MetricReader[]; diagLogLevel?: TracingDiagnosticLogLevel; resource?: Resource; + hostMetrics?: boolean; + /** Enable Node.js runtime metrics (event loop utilization, heap usage, etc.) */ + nodejsRuntimeMetrics?: boolean; + /** Metric instrument name patterns to drop (supports wildcards, e.g. 
"system.cpu.*") */ + droppedMetrics?: string[]; }; const idGenerator = new RandomIdGenerator(); @@ -78,6 +97,7 @@ export class TracingSDK { private readonly _logProvider: LoggerProvider; private readonly _spanExporter: SpanExporter; private readonly _traceProvider: NodeTracerProvider; + private readonly _meterProvider: MeterProvider; public readonly getLogger: LoggerProvider["getLogger"]; public readonly getTracer: TracerProvider["getTracer"]; @@ -99,13 +119,13 @@ export class TracingSDK { }) .merge( resourceFromAttributes({ - [SemanticResourceAttributes.CLOUD_PROVIDER]: "trigger.dev", - [SemanticResourceAttributes.SERVICE_NAME]: - getEnvVar("TRIGGER_OTEL_SERVICE_NAME") ?? "trigger.dev", + "cloud.provider": "trigger.dev", + "service.name": getEnvVar("TRIGGER_OTEL_SERVICE_NAME") ?? "trigger.dev", [SemanticInternalAttributes.TRIGGER]: true, [SemanticInternalAttributes.CLI_VERSION]: VERSION, [SemanticInternalAttributes.SDK_VERSION]: VERSION, [SemanticInternalAttributes.SDK_LANGUAGE]: "typescript", + [SemanticInternalAttributes.MACHINE_ID]: machineId, }) ) .merge(resourceFromAttributes(envResourceAttributes)) @@ -259,16 +279,80 @@ export class TracingSDK { logs.setGlobalLoggerProvider(loggerProvider); + // Metrics setup + const metricsUrl = + config.metricsUrl ?? + getEnvVar("TRIGGER_OTEL_METRICS_ENDPOINT") ?? + `${config.url}/v1/metrics`; + + const rawMetricExporter = new OTLPMetricExporter({ + url: metricsUrl, + timeoutMillis: config.forceFlushTimeoutMillis, + }); + + const collectionIntervalMs = parseInt( + getEnvVar("TRIGGER_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS") ?? "10000" + ); + const exportIntervalMs = parseInt( + getEnvVar("TRIGGER_OTEL_METRICS_EXPORT_INTERVAL_MILLIS") ?? "30000" + ); + + // Chain: PeriodicReader(10s) → TaskContextMetricExporter → BufferingMetricExporter(30s) → OTLP + const bufferingExporter = new BufferingMetricExporter(rawMetricExporter, exportIntervalMs); + const metricExporter = new TaskContextMetricExporter(bufferingExporter); + + const exportTimeoutMillis = parseInt( + getEnvVar("TRIGGER_OTEL_METRICS_EXPORT_TIMEOUT_MILLIS") ?? "30000" + ); + + const metricReaders: MetricReader[] = [ + new PeriodicExportingMetricReader({ + exporter: metricExporter, + exportIntervalMillis: Math.max(collectionIntervalMs, exportTimeoutMillis), + exportTimeoutMillis, + }), + ...(config.metricReaders ?? []), + ]; + + const meterProvider = new MeterProvider({ + resource: commonResources, + readers: metricReaders, + views: (config.droppedMetrics ?? 
[]).map((pattern) => ({ + instrumentName: pattern, + aggregation: { type: AggregationType.DROP }, + })), + }); + + this._meterProvider = meterProvider; + metrics.setGlobalMeterProvider(meterProvider); + + if (config.hostMetrics) { + const hostMetrics = new HostMetrics({ meterProvider }); + hostMetrics.start(); + } + + if (config.nodejsRuntimeMetrics) { + startNodejsRuntimeMetrics(meterProvider); + } + this.getLogger = loggerProvider.getLogger.bind(loggerProvider); this.getTracer = traceProvider.getTracer.bind(traceProvider); } public async flush() { - await Promise.all([this._traceProvider.forceFlush(), this._logProvider.forceFlush()]); + await Promise.all([ + this._traceProvider.forceFlush(), + this._logProvider.forceFlush(), + this._meterProvider.forceFlush(), + ]); } public async shutdown() { - await Promise.all([this._traceProvider.shutdown(), this._logProvider.shutdown()]); + await Promise.all([ + this._traceProvider.shutdown(), + this._logProvider.shutdown(), + this._meterProvider.shutdown(), + ]); } } @@ -465,7 +549,7 @@ function isSpanInternalOnly(span: ReadableSpan): boolean { return true; } - const httpUrl = span.attributes[SEMATTRS_HTTP_URL] ?? span.attributes["url.full"]; + const httpUrl = span.attributes["http.url"] ?? span.attributes["url.full"]; const url = safeParseUrl(httpUrl); diff --git a/packages/core/src/v3/schemas/messages.ts b/packages/core/src/v3/schemas/messages.ts index c635e57445..b58babba71 100644 --- a/packages/core/src/v3/schemas/messages.ts +++ b/packages/core/src/v3/schemas/messages.ts @@ -213,6 +213,7 @@ export const WorkerToExecutorMessageCatalog = { FLUSH: { message: z.object({ timeoutInMs: z.number(), + disableContext: z.boolean().optional(), }), callback: z.void(), }, diff --git a/packages/core/src/v3/semanticInternalAttributes.ts b/packages/core/src/v3/semanticInternalAttributes.ts index 4d24235278..3fb20a0649 100644 --- a/packages/core/src/v3/semanticInternalAttributes.ts +++ b/packages/core/src/v3/semanticInternalAttributes.ts @@ -19,6 +19,7 @@ export const SemanticInternalAttributes = { TASK_EXPORT_NAME: "ctx.task.exportName", QUEUE_NAME: "ctx.queue.name", QUEUE_ID: "ctx.queue.id", + MACHINE_ID: "ctx.machine.id", MACHINE_PRESET_NAME: "ctx.machine.name", MACHINE_PRESET_CPU: "ctx.machine.cpu", MACHINE_PRESET_MEMORY: "ctx.machine.memory", @@ -65,4 +66,5 @@ export const SemanticInternalAttributes = { WARM_START: "warm_start", ATTEMPT_EXECUTION_COUNT: "$trigger.executionCount", TASK_EVENT_STORE: "$trigger.taskEventStore", + RUN_TAGS: "ctx.run.tags", }; diff --git a/packages/core/src/v3/taskContext/index.ts b/packages/core/src/v3/taskContext/index.ts index b03b7ff4b6..f76671160a 100644 --- a/packages/core/src/v3/taskContext/index.ts +++ b/packages/core/src/v3/taskContext/index.ts @@ -1,13 +1,14 @@ import { Attributes } from "@opentelemetry/api"; import { ServerBackgroundWorker, TaskRunContext } from "../schemas/index.js"; import { SemanticInternalAttributes } from "../semanticInternalAttributes.js"; -import { getGlobal, registerGlobal, unregisterGlobal } from "../utils/globals.js"; +import { getGlobal, registerGlobal } from "../utils/globals.js"; import { TaskContext } from "./types.js"; const API_NAME = "task-context"; export class TaskContextAPI { private static _instance?: TaskContextAPI; + private _runDisabled = false; private constructor() {} @@ -23,6 +24,10 @@ export class TaskContextAPI { return this.#getTaskContext() !== undefined; } + get isRunDisabled(): boolean { + return this._runDisabled; + } + get ctx(): TaskRunContext | undefined { return 
this.#getTaskContext()?.ctx; } @@ -98,11 +103,12 @@ export class TaskContextAPI { } public disable() { - unregisterGlobal(API_NAME); + this._runDisabled = true; } public setGlobalTaskContext(taskContext: TaskContext): boolean { - return registerGlobal(API_NAME, taskContext); + this._runDisabled = false; + return registerGlobal(API_NAME, taskContext, true); } #getTaskContext(): TaskContext | undefined { diff --git a/packages/core/src/v3/taskContext/otelProcessors.ts b/packages/core/src/v3/taskContext/otelProcessors.ts index 16f93d42a9..b08e759724 100644 --- a/packages/core/src/v3/taskContext/otelProcessors.ts +++ b/packages/core/src/v3/taskContext/otelProcessors.ts @@ -1,5 +1,15 @@ -import { Context, trace, Tracer } from "@opentelemetry/api"; +import { Attributes, Context, trace, Tracer } from "@opentelemetry/api"; +import { ExportResult, ExportResultCode } from "@opentelemetry/core"; import { LogRecordProcessor, SdkLogRecord } from "@opentelemetry/sdk-logs"; +import type { + AggregationOption, + AggregationTemporality, + InstrumentType, + MetricData, + PushMetricExporter, + ResourceMetrics, + ScopeMetrics, +} from "@opentelemetry/sdk-metrics"; import { Span, SpanProcessor } from "@opentelemetry/sdk-trace-base"; import { SemanticInternalAttributes } from "../semanticInternalAttributes.js"; import { taskContext } from "../task-context-api.js"; @@ -104,3 +114,194 @@ export class TaskContextLogProcessor implements LogRecordProcessor { return this._innerProcessor.shutdown(); } } + +export class TaskContextMetricExporter implements PushMetricExporter { + selectAggregationTemporality?: (instrumentType: InstrumentType) => AggregationTemporality; + selectAggregation?: (instrumentType: InstrumentType) => AggregationOption; + + constructor(private _innerExporter: PushMetricExporter) { + if (_innerExporter.selectAggregationTemporality) { + this.selectAggregationTemporality = + _innerExporter.selectAggregationTemporality.bind(_innerExporter); + } + if (_innerExporter.selectAggregation) { + this.selectAggregation = _innerExporter.selectAggregation.bind(_innerExporter); + } + } + + export(metrics: ResourceMetrics, resultCallback: (result: ExportResult) => void): void { + if (!taskContext.ctx) { + // No context at all — drop metrics + resultCallback({ code: ExportResultCode.SUCCESS }); + return; + } + + const ctx = taskContext.ctx; + + let contextAttrs: Attributes; + + if (taskContext.isRunDisabled) { + // Between runs: keep environment/project/org/machine attrs, strip run-specific ones + contextAttrs = { + [SemanticInternalAttributes.ENVIRONMENT_ID]: ctx.environment.id, + [SemanticInternalAttributes.ENVIRONMENT_TYPE]: ctx.environment.type, + [SemanticInternalAttributes.ORGANIZATION_ID]: ctx.organization.id, + [SemanticInternalAttributes.PROJECT_ID]: ctx.project.id, + [SemanticInternalAttributes.MACHINE_PRESET_NAME]: ctx.machine?.name, + }; + } else { + // During a run: full context attrs + contextAttrs = { + [SemanticInternalAttributes.RUN_ID]: ctx.run.id, + [SemanticInternalAttributes.TASK_SLUG]: ctx.task.id, + [SemanticInternalAttributes.ATTEMPT_NUMBER]: ctx.attempt.number, + [SemanticInternalAttributes.ENVIRONMENT_ID]: ctx.environment.id, + [SemanticInternalAttributes.ORGANIZATION_ID]: ctx.organization.id, + [SemanticInternalAttributes.PROJECT_ID]: ctx.project.id, + [SemanticInternalAttributes.MACHINE_PRESET_NAME]: ctx.machine?.name, + [SemanticInternalAttributes.ENVIRONMENT_TYPE]: ctx.environment.type, + }; + } + + if (taskContext.worker) { + contextAttrs[SemanticInternalAttributes.WORKER_ID] = 
taskContext.worker.id; + contextAttrs[SemanticInternalAttributes.WORKER_VERSION] = taskContext.worker.version; + } + + if (!taskContext.isRunDisabled && ctx.run.tags?.length) { + contextAttrs[SemanticInternalAttributes.RUN_TAGS] = ctx.run.tags; + } + + const modified: ResourceMetrics = { + resource: metrics.resource, + scopeMetrics: metrics.scopeMetrics.map((scope) => ({ + ...scope, + metrics: scope.metrics.map( + (metric) => + ({ + ...metric, + dataPoints: metric.dataPoints.map((dp) => ({ + ...dp, + attributes: { ...dp.attributes, ...contextAttrs }, + })), + }) as MetricData + ), + })), + }; + + this._innerExporter.export(modified, resultCallback); + } + + forceFlush(): Promise<void> { + return this._innerExporter.forceFlush(); + } + + shutdown(): Promise<void> { + return this._innerExporter.shutdown(); + } +} + +export class BufferingMetricExporter implements PushMetricExporter { + selectAggregationTemporality?: (instrumentType: InstrumentType) => AggregationTemporality; + selectAggregation?: (instrumentType: InstrumentType) => AggregationOption; + + private _buffer: ResourceMetrics[] = []; + private _lastFlushTime = Date.now(); + + constructor( + private _innerExporter: PushMetricExporter, + private _flushIntervalMs: number + ) { + if (_innerExporter.selectAggregationTemporality) { + this.selectAggregationTemporality = + _innerExporter.selectAggregationTemporality.bind(_innerExporter); + } + if (_innerExporter.selectAggregation) { + this.selectAggregation = _innerExporter.selectAggregation.bind(_innerExporter); + } + } + + export(metrics: ResourceMetrics, resultCallback: (result: ExportResult) => void): void { + this._buffer.push(metrics); + + const now = Date.now(); + if (now - this._lastFlushTime >= this._flushIntervalMs) { + this._lastFlushTime = now; + const merged = this._mergeBuffer(); + this._innerExporter.export(merged, resultCallback); + } else { + resultCallback({ code: ExportResultCode.SUCCESS }); + } + } + + forceFlush(): Promise<void> { + if (this._buffer.length > 0) { + this._lastFlushTime = Date.now(); + const merged = this._mergeBuffer(); + return new Promise<void>((resolve, reject) => { + this._innerExporter.export(merged, (result) => { + if (result.code === ExportResultCode.SUCCESS) { + resolve(); + } else { + reject(result.error ?? 
new Error("Export failed")); + } + }); + }).then(() => this._innerExporter.forceFlush()); + } + return this._innerExporter.forceFlush(); + } + + shutdown(): Promise<void> { + return this.forceFlush().then(() => this._innerExporter.shutdown()); + } + + private _mergeBuffer(): ResourceMetrics { + const batch = this._buffer; + this._buffer = []; + + if (batch.length === 1) { + return batch[0]!; + } + + const base = batch[0]!; + + // Merge all scopeMetrics by scope name, then metrics by descriptor name + const scopeMap = new Map<string, { scope: ScopeMetrics["scope"]; metricsMap: Map<string, MetricData> }>(); + + for (const rm of batch) { + for (const sm of rm.scopeMetrics) { + const scopeKey = sm.scope.name; + let scopeEntry = scopeMap.get(scopeKey); + if (!scopeEntry) { + scopeEntry = { scope: sm.scope, metricsMap: new Map() }; + scopeMap.set(scopeKey, scopeEntry); + } + + for (const metric of sm.metrics) { + const metricKey = metric.descriptor.name; + const existing = scopeEntry.metricsMap.get(metricKey); + if (existing) { + // Append data points from this collection to the existing metric + scopeEntry.metricsMap.set(metricKey, { + ...existing, + dataPoints: [...existing.dataPoints, ...metric.dataPoints], + } as MetricData); + } else { + scopeEntry.metricsMap.set(metricKey, { + ...metric, + dataPoints: [...metric.dataPoints], + } as MetricData); + } + } + } + } + + return { + resource: base.resource, + scopeMetrics: Array.from(scopeMap.values()).map(({ scope, metricsMap }) => ({ + scope, + metrics: Array.from(metricsMap.values()), + })), + }; + } +} diff --git a/packages/core/src/v3/workers/index.ts b/packages/core/src/v3/workers/index.ts index 58ee834ac2..4ca301fcdc 100644 --- a/packages/core/src/v3/workers/index.ts +++ b/packages/core/src/v3/workers/index.ts @@ -14,6 +14,7 @@ export { StandardResourceCatalog } from "../resource-catalog/standardResourceCat export { TaskContextSpanProcessor, TaskContextLogProcessor, + TaskContextMetricExporter, } from "../taskContext/otelProcessors.js"; export * from "../usage-api.js"; export { DevUsageManager } from "../usage/devUsageManager.js"; diff --git a/packages/trigger-sdk/src/v3/otel.ts b/packages/trigger-sdk/src/v3/otel.ts index e80c77562e..d02220208e 100644 --- a/packages/trigger-sdk/src/v3/otel.ts +++ b/packages/trigger-sdk/src/v3/otel.ts @@ -1,7 +1,9 @@ +import { metrics } from "@opentelemetry/api"; import { traceContext } from "@trigger.dev/core/v3"; export const otel = { withExternalTrace: <T>(fn: () => T): T => { return traceContext.withExternalTrace(fn); }, + metrics, }; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f504496dd1..17052a918f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1101,7 +1101,7 @@ importers: version: 18.3.1 react-email: specifier: ^2.1.1 - version: 2.1.2(@opentelemetry/api@1.9.0)(@swc/helpers@0.5.15)(eslint@8.31.0) + version: 2.1.2(@opentelemetry/api@1.9.0)(@swc/helpers@0.5.15)(bufferutil@4.0.9)(eslint@8.31.0) resend: specifier: ^3.2.0 version: 3.2.0 @@ -1698,9 +1698,15 @@ importers: '@opentelemetry/exporter-logs-otlp-http': specifier: 0.203.0 version: 0.203.0(@opentelemetry/api@1.9.0) + '@opentelemetry/exporter-metrics-otlp-http': + specifier: 0.203.0 + version: 0.203.0(@opentelemetry/api@1.9.0) '@opentelemetry/exporter-trace-otlp-http': specifier: 0.203.0 version: 0.203.0(@opentelemetry/api@1.9.0) + '@opentelemetry/host-metrics': + specifier: ^0.36.0 + version: 0.36.0(@opentelemetry/api@1.9.0) '@opentelemetry/instrumentation': specifier: 0.203.0 version: 0.203.0(@opentelemetry/api@1.9.0)(supports-color@10.0.0) @@ -1710,6 +1716,9 @@ importers: '@opentelemetry/sdk-logs': specifier: 0.203.0 
version: 0.203.0(@opentelemetry/api@1.9.0) + '@opentelemetry/sdk-metrics': + specifier: 2.0.1 + version: 2.0.1(@opentelemetry/api@1.9.0) '@opentelemetry/sdk-trace-base': specifier: 2.0.1 version: 2.0.1(@opentelemetry/api@1.9.0) @@ -39164,7 +39173,7 @@ snapshots: react: 18.2.0 react-dom: 18.2.0(react@18.2.0) - react-email@2.1.2(@opentelemetry/api@1.9.0)(@swc/helpers@0.5.15)(eslint@8.31.0): + react-email@2.1.2(@opentelemetry/api@1.9.0)(@swc/helpers@0.5.15)(bufferutil@4.0.9)(eslint@8.31.0): dependencies: '@babel/parser': 7.24.1 '@radix-ui/colors': 1.0.1 @@ -39201,8 +39210,8 @@ snapshots: react: 18.3.1 react-dom: 18.2.0(react@18.3.1) shelljs: 0.8.5 - socket.io: 4.7.3 - socket.io-client: 4.7.3 + socket.io: 4.7.3(bufferutil@4.0.9) + socket.io-client: 4.7.3(bufferutil@4.0.9) sonner: 1.3.1(react-dom@18.2.0(react@18.3.1))(react@18.3.1) source-map-js: 1.0.2 stacktrace-parser: 0.1.10 @@ -40402,7 +40411,7 @@ snapshots: - supports-color - utf-8-validate - socket.io-client@4.7.3: + socket.io-client@4.7.3(bufferutil@4.0.9): dependencies: '@socket.io/component-emitter': 3.1.0 debug: 4.3.7(supports-color@10.0.0) @@ -40431,7 +40440,7 @@ snapshots: transitivePeerDependencies: - supports-color - socket.io@4.7.3: + socket.io@4.7.3(bufferutil@4.0.9): dependencies: accepts: 1.3.8 base64id: 2.0.0 diff --git a/references/hello-world/src/trigger/metrics.ts b/references/hello-world/src/trigger/metrics.ts new file mode 100644 index 0000000000..fa398186b0 --- /dev/null +++ b/references/hello-world/src/trigger/metrics.ts @@ -0,0 +1,326 @@ +import { batch, logger, otel, task } from "@trigger.dev/sdk"; +import { createHash } from "node:crypto"; +import { setTimeout } from "node:timers/promises"; + +// Custom metrics — instruments are created once at module level +const meter = otel.metrics.getMeter("hello-world"); +const itemsProcessedCounter = meter.createCounter("items.processed", { + description: "Total number of items processed", + unit: "items", +}); +const itemDurationHistogram = meter.createHistogram("item.duration", { + description: "Time spent processing each item", + unit: "ms", +}); +const queueDepthGauge = meter.createUpDownCounter("queue.depth", { + description: "Current simulated queue depth", + unit: "items", +}); + +/** + * Tight computational loop that produces sustained high CPU utilization. + * Uses repeated SHA-256 hashing to keep the CPU busy. + */ +export const cpuIntensive = task({ + id: "cpu-intensive", + run: async ( + { + durationSeconds = 60, + }: { + durationSeconds?: number; + }, + { ctx } + ) => { + logger.info("Starting CPU-intensive workload", { durationSeconds }); + + const deadline = Date.now() + durationSeconds * 1000; + let iterations = 0; + let data = Buffer.from("seed-data-for-hashing"); + + while (Date.now() < deadline) { + // Tight hashing loop — ~100ms chunks then yield to event loop + const chunkEnd = Date.now() + 100; + while (Date.now() < chunkEnd) { + data = createHash("sha256").update(data).digest(); + iterations++; + } + // Yield to let metrics collection and heartbeats run + await setTimeout(1); + } + + logger.info("CPU-intensive workload complete", { iterations }); + return { iterations }; + }, +}); + +/** + * Progressively allocates memory in steps, holds it, then releases. + * Produces a staircase-shaped memory usage graph. 
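+ * + * A minimal trigger sketch (payload values are illustrative): + * ```typescript + * await memoryRamp.trigger({ steps: 4, stepSizeMb: 25, holdSeconds: 10 }); + * ``` + * Note that the `global.gc?.()` call below only forces an eager collection when + * Node runs with --expose-gc; without it, the drop still appears once V8 + * collects naturally.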
+ */ +export const memoryRamp = task({ + id: "memory-ramp", + run: async ( + { + steps = 6, + stepSizeMb = 50, + stepIntervalSeconds = 5, + holdSeconds = 15, + }: { + steps?: number; + stepSizeMb?: number; + stepIntervalSeconds?: number; + holdSeconds?: number; + }, + { ctx } + ) => { + logger.info("Starting memory ramp", { steps, stepSizeMb, stepIntervalSeconds, holdSeconds }); + + const allocations: Buffer[] = []; + + // Ramp up — allocate in steps + for (let i = 0; i < steps; i++) { + const buf = Buffer.alloc(stepSizeMb * 1024 * 1024, 0xff); + allocations.push(buf); + logger.info(`Allocated step ${i + 1}/${steps}`, { + totalAllocatedMb: (i + 1) * stepSizeMb, + }); + await setTimeout(stepIntervalSeconds * 1000); + } + + // Hold at peak + logger.info("Holding at peak memory", { totalMb: steps * stepSizeMb }); + await setTimeout(holdSeconds * 1000); + + // Release + allocations.length = 0; + global.gc?.(); + logger.info("Released all allocations"); + + // Let metrics capture the drop + await setTimeout(10_000); + + logger.info("Memory ramp complete"); + return { peakMb: steps * stepSizeMb }; + }, +}); + +/** + * Alternates between CPU-intensive bursts and idle sleep periods. + * Produces a sawtooth/square-wave CPU utilization pattern. + */ +export const burstyWorkload = task({ + id: "bursty-workload", + run: async ( + { + cycles = 5, + burstSeconds = 5, + idleSeconds = 5, + }: { + cycles?: number; + burstSeconds?: number; + idleSeconds?: number; + }, + { ctx } + ) => { + logger.info("Starting bursty workload", { cycles, burstSeconds, idleSeconds }); + + for (let cycle = 0; cycle < cycles; cycle++) { + // Burst phase — hash as fast as possible + logger.info(`Cycle ${cycle + 1}/${cycles}: burst phase`); + const burstDeadline = Date.now() + burstSeconds * 1000; + let data = Buffer.from(`burst-cycle-${cycle}`); + while (Date.now() < burstDeadline) { + const chunkEnd = Date.now() + 100; + while (Date.now() < chunkEnd) { + data = createHash("sha256").update(data).digest(); + } + await setTimeout(1); + } + + // Idle phase + logger.info(`Cycle ${cycle + 1}/${cycles}: idle phase`); + await setTimeout(idleSeconds * 1000); + } + + logger.info("Bursty workload complete", { totalCycles: cycles }); + return { cycles }; + }, +}); + +/** + * Simulates a data processing pipeline with distinct phases: + * 1. Read phase — light CPU, growing memory (buffering data) + * 2. Process phase — high CPU, stable memory (crunching data) + * 3. Write phase — low CPU, memory drops (streaming out results) + * + * Shows clear phase transitions in both CPU and memory graphs. 
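+ * + * A minimal trigger sketch (payload values are illustrative; shorter phases give a quicker demo): + * ```typescript + * await sustainedWorkload.trigger({ readSeconds: 10, processSeconds: 10, writeSeconds: 10, dataSizeMb: 50 }); + * ```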
+ */ +export const sustainedWorkload = task({ + id: "sustained-workload", + run: async ( + { + readSeconds = 20, + processSeconds = 20, + writeSeconds = 20, + dataSizeMb = 100, + }: { + readSeconds?: number; + processSeconds?: number; + writeSeconds?: number; + dataSizeMb?: number; + }, + { ctx } + ) => { + logger.info("Starting sustained workload — read phase", { readSeconds, dataSizeMb }); + + // Phase 1: Read — gradually accumulate buffers (memory ramp, low CPU) + const chunks: Buffer[] = []; + const chunkCount = 10; + const chunkSize = Math.floor((dataSizeMb * 1024 * 1024) / chunkCount); + const readInterval = (readSeconds * 1000) / chunkCount; + + for (let i = 0; i < chunkCount; i++) { + chunks.push(Buffer.alloc(chunkSize, i)); + logger.info(`Read ${i + 1}/${chunkCount} chunks`); + await setTimeout(readInterval); + } + + // Phase 2: Process — hash all chunks repeatedly (high CPU, stable memory) + logger.info("Entering process phase", { processSeconds }); + const processDeadline = Date.now() + processSeconds * 1000; + let hashCount = 0; + + while (Date.now() < processDeadline) { + const chunkEnd = Date.now() + 100; + while (Date.now() < chunkEnd) { + for (const chunk of chunks) { + createHash("sha256").update(chunk).digest(); + hashCount++; + } + } + await setTimeout(1); + } + + // Phase 3: Write — release memory gradually (low CPU, memory drops) + logger.info("Entering write phase", { writeSeconds }); + const writeInterval = (writeSeconds * 1000) / chunkCount; + + for (let i = chunkCount - 1; i >= 0; i--) { + chunks.pop(); + logger.info(`Wrote and released chunk ${chunkCount - i}/${chunkCount}`); + await setTimeout(writeInterval); + } + + global.gc?.(); + await setTimeout(5000); + + logger.info("Sustained workload complete", { hashCount }); + return { hashCount }; + }, +}); + +/** + * Parent task that fans out multiple child tasks in parallel. + * Useful for seeing per-run breakdowns in metrics queries grouped by run_id. + */ +export const concurrentLoad = task({ + id: "concurrent-load", + run: async ( + { + concurrency = 3, + taskType = "bursty-workload" as "cpu-intensive" | "bursty-workload", + durationSeconds = 30, + }: { + concurrency?: number; + taskType?: "cpu-intensive" | "bursty-workload"; + durationSeconds?: number; + }, + { ctx } + ) => { + logger.info("Starting concurrent load", { concurrency, taskType, durationSeconds }); + + const items = Array.from({ length: concurrency }, (_, i) => { + if (taskType === "cpu-intensive") { + return { id: cpuIntensive.id, payload: { durationSeconds } }; + } + return { + id: burstyWorkload.id, + payload: { cycles: 3, burstSeconds: 5, idleSeconds: 5 }, + }; + }); + + const results = await batch.triggerAndWait(items); + + logger.info("All children completed", { + count: results.runs.length, + }); + + return { childRunIds: results.runs.map((r) => r.id) }; + }, +}); + +/** + * Demonstrates custom OTEL metrics: counter, histogram, and up-down counter. + * Simulates processing a queue of items with varying processing times. 
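+ * + * A minimal trigger sketch (payload values are illustrative). Each processed item + * records to the module-level counter and histogram with an `item.type` attribute, + * so the exported series can be grouped by that attribute downstream: + * ```typescript + * await customMetrics.trigger({ itemCount: 50, batchSize: 10 }); + * ```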
+ */ +export const customMetrics = task({ + id: "custom-metrics", + run: async ( + { + itemCount = 20, + minProcessingMs = 50, + maxProcessingMs = 500, + batchSize = 5, + }: { + itemCount?: number; + minProcessingMs?: number; + maxProcessingMs?: number; + batchSize?: number; + }, + { ctx } + ) => { + logger.info("Starting custom metrics demo", { itemCount, batchSize }); + + // Simulate items arriving in the queue + queueDepthGauge.add(itemCount); + + let totalProcessed = 0; + + for (let i = 0; i < itemCount; i += batchSize) { + const currentBatch = Math.min(batchSize, itemCount - i); + + for (let j = 0; j < currentBatch; j++) { + const processingTime = + minProcessingMs + Math.random() * (maxProcessingMs - minProcessingMs); + + // Simulate work + const start = performance.now(); + let data = Buffer.from(`item-${i + j}`); + const deadline = Date.now() + processingTime; + while (Date.now() < deadline) { + data = createHash("sha256").update(data).digest(); + } + const elapsed = performance.now() - start; + + // Record metrics + itemsProcessedCounter.add(1, { "item.type": j % 2 === 0 ? "even" : "odd" }); + itemDurationHistogram.record(elapsed, { "item.type": j % 2 === 0 ? "even" : "odd" }); + queueDepthGauge.add(-1); + + totalProcessed++; + } + + logger.info(`Processed batch`, { + batchNumber: Math.floor(i / batchSize) + 1, + totalProcessed, + remaining: itemCount - totalProcessed, + }); + + // Brief pause between batches + await setTimeout(1000); + } + + logger.info("Custom metrics demo complete", { totalProcessed }); + return { totalProcessed }; + }, +});
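+ +// Usage sketch (payload values are illustrative): any of the tasks above can be +// triggered from another task or a test harness, e.g.: +// +//   await cpuIntensive.trigger({ durationSeconds: 30 }); +//   await concurrentLoad.trigger({ concurrency: 4, taskType: "bursty-workload" }); +// +// While they run, the host and Node.js runtime metrics enabled in the worker's +// TracingSDK, plus the custom instruments defined at the top of this file, are +// exported through the OTLP metrics endpoint.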