From c1c03c4d17f2ac8b95f461a211347c05eb377e5c Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 13 Feb 2026 13:36:40 +0000 Subject: [PATCH 01/13] feat(metrics): add support for otel metrics (WIP) --- apps/webapp/app/env.server.ts | 3 + apps/webapp/app/routes/otel.v1.metrics.ts | 41 +++ .../environmentVariablesRepository.server.ts | 33 +++ apps/webapp/app/v3/otlpExporter.server.ts | 242 ++++++++++++++++++ .../schema/016_create_metrics_v1.sql | 24 ++ internal-packages/clickhouse/src/index.ts | 8 + internal-packages/clickhouse/src/metrics.ts | 33 +++ internal-packages/otlp-importer/src/index.ts | 39 +++ .../cli-v3/src/entryPoints/dev-run-worker.ts | 3 + .../src/entryPoints/managed-run-worker.ts | 3 + packages/core/package.json | 3 + packages/core/src/v3/config.ts | 7 + packages/core/src/v3/otel/machineId.ts | 3 + packages/core/src/v3/otel/tracingSDK.ts | 73 +++++- .../core/src/v3/semanticInternalAttributes.ts | 2 + .../core/src/v3/taskContext/otelProcessors.ts | 81 +++++- packages/core/src/v3/workers/index.ts | 1 + packages/trigger-sdk/src/v3/index.ts | 2 + pnpm-lock.yaml | 21 +- 19 files changed, 608 insertions(+), 14 deletions(-) create mode 100644 apps/webapp/app/routes/otel.v1.metrics.ts create mode 100644 internal-packages/clickhouse/schema/016_create_metrics_v1.sql create mode 100644 internal-packages/clickhouse/src/metrics.ts create mode 100644 packages/core/src/v3/otel/machineId.ts diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 5a321c58b6..25e0107078 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -372,6 +372,7 @@ const EnvironmentSchema = z // Development OTEL environment variables DEV_OTEL_EXPORTER_OTLP_ENDPOINT: z.string().optional(), + DEV_OTEL_METRICS_ENDPOINT: z.string().optional(), // If this is set to 1, then the below variables are used to configure the batch processor for spans and logs DEV_OTEL_BATCH_PROCESSING_ENABLED: z.string().default("0"), DEV_OTEL_SPAN_MAX_EXPORT_BATCH_SIZE: z.string().default("64"), @@ -382,6 +383,7 @@ const EnvironmentSchema = z DEV_OTEL_LOG_SCHEDULED_DELAY_MILLIS: z.string().default("200"), DEV_OTEL_LOG_EXPORT_TIMEOUT_MILLIS: z.string().default("30000"), DEV_OTEL_LOG_MAX_QUEUE_SIZE: z.string().default("512"), + DEV_OTEL_METRICS_EXPORT_INTERVAL_MILLIS: z.string().optional(), PROD_OTEL_BATCH_PROCESSING_ENABLED: z.string().default("0"), PROD_OTEL_SPAN_MAX_EXPORT_BATCH_SIZE: z.string().default("64"), @@ -392,6 +394,7 @@ const EnvironmentSchema = z PROD_OTEL_LOG_SCHEDULED_DELAY_MILLIS: z.string().default("200"), PROD_OTEL_LOG_EXPORT_TIMEOUT_MILLIS: z.string().default("30000"), PROD_OTEL_LOG_MAX_QUEUE_SIZE: z.string().default("512"), + PROD_OTEL_METRICS_EXPORT_INTERVAL_MILLIS: z.string().optional(), TRIGGER_OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT: z.string().default("1024"), TRIGGER_OTEL_LOG_ATTRIBUTE_COUNT_LIMIT: z.string().default("1024"), diff --git a/apps/webapp/app/routes/otel.v1.metrics.ts b/apps/webapp/app/routes/otel.v1.metrics.ts new file mode 100644 index 0000000000..5529f9310e --- /dev/null +++ b/apps/webapp/app/routes/otel.v1.metrics.ts @@ -0,0 +1,41 @@ +import { ActionFunctionArgs, json } from "@remix-run/server-runtime"; +import { + ExportMetricsServiceRequest, + ExportMetricsServiceResponse, +} from "@trigger.dev/otlp-importer"; +import { otlpExporter } from "~/v3/otlpExporter.server"; + +export async function action({ request }: ActionFunctionArgs) { + try { + const contentType = request.headers.get("content-type")?.toLowerCase() ?? 
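+      // A missing content-type header is treated as empty, so it falls through
+      // to the 400 "Unsupported content type" response below.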
""; + + if (contentType.startsWith("application/json")) { + const body = await request.json(); + + const exportResponse = await otlpExporter.exportMetrics( + body as ExportMetricsServiceRequest + ); + + return json(exportResponse, { status: 200 }); + } else if (contentType.startsWith("application/x-protobuf")) { + const buffer = await request.arrayBuffer(); + + const exportRequest = ExportMetricsServiceRequest.decode(new Uint8Array(buffer)); + + const exportResponse = await otlpExporter.exportMetrics(exportRequest); + + return new Response(ExportMetricsServiceResponse.encode(exportResponse).finish(), { + status: 200, + }); + } else { + return new Response( + "Unsupported content type. Must be either application/x-protobuf or application/json", + { status: 400 } + ); + } + } catch (error) { + console.error(error); + + return new Response("Internal Server Error", { status: 500 }); + } +} diff --git a/apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts b/apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts index 39d0c863cb..43ad6e6415 100644 --- a/apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts +++ b/apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts @@ -956,6 +956,26 @@ async function resolveBuiltInDevVariables(runtimeEnvironment: RuntimeEnvironment }, ]; + if (env.DEV_OTEL_METRICS_ENDPOINT) { + result.push({ + key: "TRIGGER_OTEL_METRICS_ENDPOINT", + value: env.DEV_OTEL_METRICS_ENDPOINT, + }); + } + + if (env.DEV_OTEL_METRICS_EXPORT_INTERVAL_MILLIS) { + result.push( + { + key: "TRIGGER_OTEL_METRICS_EXPORT_INTERVAL_MILLIS", + value: env.DEV_OTEL_METRICS_EXPORT_INTERVAL_MILLIS, + }, + { + key: "TRIGGER_OTEL_METRICS_EXPORT_TIMEOUT_MILLIS", + value: env.DEV_OTEL_METRICS_EXPORT_INTERVAL_MILLIS, + } + ); + } + if (env.DEV_OTEL_BATCH_PROCESSING_ENABLED === "1") { result = result.concat([ { @@ -1087,6 +1107,19 @@ async function resolveBuiltInProdVariables( ]); } + if (env.PROD_OTEL_METRICS_EXPORT_INTERVAL_MILLIS) { + result.push( + { + key: "TRIGGER_OTEL_METRICS_EXPORT_INTERVAL_MILLIS", + value: env.PROD_OTEL_METRICS_EXPORT_INTERVAL_MILLIS, + }, + { + key: "TRIGGER_OTEL_METRICS_EXPORT_TIMEOUT_MILLIS", + value: env.PROD_OTEL_METRICS_EXPORT_INTERVAL_MILLIS, + } + ); + } + if (env.PROD_OTEL_BATCH_PROCESSING_ENABLED === "1") { result = result.concat([ { diff --git a/apps/webapp/app/v3/otlpExporter.server.ts b/apps/webapp/app/v3/otlpExporter.server.ts index f7337b3b16..7093a494f5 100644 --- a/apps/webapp/app/v3/otlpExporter.server.ts +++ b/apps/webapp/app/v3/otlpExporter.server.ts @@ -4,10 +4,13 @@ import { AnyValue, ExportLogsServiceRequest, ExportLogsServiceResponse, + ExportMetricsServiceRequest, + ExportMetricsServiceResponse, ExportTraceServiceRequest, ExportTraceServiceResponse, KeyValue, ResourceLogs, + ResourceMetrics, ResourceSpans, SeverityNumber, Span, @@ -15,7 +18,9 @@ import { Span_SpanKind, Status_StatusCode, } from "@trigger.dev/otlp-importer"; +import type { MetricsV1Input } from "@internal/clickhouse"; import { logger } from "~/services/logger.server"; +import { clickhouseClient } from "~/services/clickhouseInstance.server"; import { ClickhouseEventRepository } from "./eventRepository/clickhouseEventRepository.server"; import { clickhouseEventRepository, @@ -66,6 +71,29 @@ class OTLPExporter { }); } + async exportMetrics( + request: ExportMetricsServiceRequest + ): Promise { + return await startSpan(this._tracer, "exportMetrics", async (span) => { + const rows = 
this.#filterResourceMetrics(request.resourceMetrics).flatMap( + (resourceMetrics) => { + return convertMetricsToClickhouseRows( + resourceMetrics, + this._spanAttributeValueLengthLimit + ); + } + ); + + span.setAttribute("metric_row_count", rows.length); + + if (rows.length > 0) { + await clickhouseClient.metrics.insert(rows); + } + + return ExportMetricsServiceResponse.create(); + }); + } + async exportLogs(request: ExportLogsServiceRequest): Promise { return await startSpan(this._tracer, "exportLogs", async (span) => { this.#logExportLogsVerbose(request); @@ -202,6 +230,18 @@ class OTLPExporter { return isBoolValue(attribute.value) ? attribute.value.boolValue : false; }); } + + #filterResourceMetrics(resourceMetrics: ResourceMetrics[]): ResourceMetrics[] { + return resourceMetrics.filter((rm) => { + const triggerAttribute = rm.resource?.attributes.find( + (attribute) => attribute.key === SemanticInternalAttributes.TRIGGER + ); + + if (!triggerAttribute) return false; + + return isBoolValue(triggerAttribute.value) ? triggerAttribute.value.boolValue : false; + }); + } } function convertLogsToCreateableEvents( @@ -410,6 +450,208 @@ function convertSpansToCreateableEvents( return { events, taskEventStore }; } +function floorToTenSecondBucket(timeUnixNano: bigint | number): string { + const epochMs = Number(BigInt(timeUnixNano) / BigInt(1_000_000)); + const flooredMs = Math.floor(epochMs / 10_000) * 10_000; + const date = new Date(flooredMs); + // Format as ClickHouse DateTime: YYYY-MM-DD HH:MM:SS + return date.toISOString().replace("T", " ").replace(/\.\d{3}Z$/, ""); +} + +function convertMetricsToClickhouseRows( + resourceMetrics: ResourceMetrics, + spanAttributeValueLengthLimit: number +): MetricsV1Input[] { + const resourceAttributes = resourceMetrics.resource?.attributes ?? []; + const resourceProperties = extractEventProperties(resourceAttributes); + + const organizationId = resourceProperties.organizationId ?? "unknown"; + const projectId = resourceProperties.projectId ?? "unknown"; + const environmentId = resourceProperties.environmentId ?? "unknown"; + const resourceCtx = { + taskSlug: resourceProperties.taskSlug, + runId: resourceProperties.runId, + attemptNumber: resourceProperties.attemptNumber, + machineId: extractStringAttribute(resourceAttributes, SemanticInternalAttributes.MACHINE_ID), + workerId: extractStringAttribute(resourceAttributes, SemanticInternalAttributes.WORKER_ID), + workerVersion: extractStringAttribute( + resourceAttributes, + SemanticInternalAttributes.WORKER_VERSION + ), + }; + + const rows: MetricsV1Input[] = []; + + for (const scopeMetrics of resourceMetrics.scopeMetrics) { + for (const metric of scopeMetrics.metrics) { + const metricName = metric.name; + + // Process gauge data points + if (metric.gauge) { + for (const dp of metric.gauge.dataPoints) { + const value = + dp.asDouble !== 0 ? dp.asDouble : dp.asInt !== BigInt(0) ? Number(dp.asInt) : 0; + const resolved = resolveDataPointContext(dp.attributes ?? [], resourceCtx); + + rows.push({ + organization_id: organizationId, + project_id: projectId, + environment_id: environmentId, + metric_name: metricName, + metric_type: "gauge", + metric_subject: resolved.machineId ?? "unknown", + bucket_start: floorToTenSecondBucket(dp.timeUnixNano), + count: 0, + sum_value: 0, + max_value: value, + min_value: value, + last_value: value, + attributes: resolved.attributes, + }); + } + } + + // Process sum data points + if (metric.sum) { + for (const dp of metric.sum.dataPoints) { + const value = + dp.asDouble !== 0 ? 
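+            // NumberDataPoint is a oneof of asDouble/asInt: a non-zero double
+            // wins, then a non-zero int; anything else is recorded as 0.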
dp.asDouble : dp.asInt !== BigInt(0) ? Number(dp.asInt) : 0; + const resolved = resolveDataPointContext(dp.attributes ?? [], resourceCtx); + + rows.push({ + organization_id: organizationId, + project_id: projectId, + environment_id: environmentId, + metric_name: metricName, + metric_type: "sum", + metric_subject: resolved.machineId ?? "unknown", + bucket_start: floorToTenSecondBucket(dp.timeUnixNano), + count: 1, + sum_value: value, + max_value: value, + min_value: value, + last_value: value, + attributes: resolved.attributes, + }); + } + } + + // Process histogram data points + if (metric.histogram) { + for (const dp of metric.histogram.dataPoints) { + const resolved = resolveDataPointContext(dp.attributes ?? [], resourceCtx); + const count = Number(dp.count); + const sum = dp.sum ?? 0; + const max = dp.max ?? 0; + const min = dp.min ?? 0; + + rows.push({ + organization_id: organizationId, + project_id: projectId, + environment_id: environmentId, + metric_name: metricName, + metric_type: "histogram", + metric_subject: resolved.machineId ?? "unknown", + bucket_start: floorToTenSecondBucket(dp.timeUnixNano), + count, + sum_value: sum, + max_value: max, + min_value: min, + last_value: count > 0 ? sum / count : 0, + attributes: resolved.attributes, + }); + } + } + } + } + + return rows; +} + +// Prefixes injected by TaskContextMetricExporter — these are extracted into +// the nested `trigger` key and should not appear as top-level user attributes. +const INTERNAL_METRIC_ATTRIBUTE_PREFIXES = ["ctx.", "worker."]; + +interface ResourceContext { + taskSlug: string | undefined; + runId: string | undefined; + attemptNumber: number | undefined; + machineId: string | undefined; + workerId: string | undefined; + workerVersion: string | undefined; +} + +function resolveDataPointContext( + dpAttributes: KeyValue[], + resourceCtx: ResourceContext +): { + machineId: string | undefined; + attributes: Record; +} { + const runId = + resourceCtx.runId ?? + extractStringAttribute(dpAttributes, SemanticInternalAttributes.RUN_ID); + const taskSlug = + resourceCtx.taskSlug ?? + extractStringAttribute(dpAttributes, SemanticInternalAttributes.TASK_SLUG); + const attemptNumber = + resourceCtx.attemptNumber ?? + extractNumberAttribute(dpAttributes, SemanticInternalAttributes.ATTEMPT_NUMBER); + const machineId = + resourceCtx.machineId ?? + extractStringAttribute(dpAttributes, SemanticInternalAttributes.MACHINE_ID); + const workerId = + resourceCtx.workerId ?? + extractStringAttribute(dpAttributes, SemanticInternalAttributes.WORKER_ID); + const workerVersion = + resourceCtx.workerVersion ?? 
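+    // Resource-level context wins; fall back to per-data-point attributes.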
+ extractStringAttribute(dpAttributes, SemanticInternalAttributes.WORKER_VERSION); + const machineName = extractStringAttribute( + dpAttributes, + SemanticInternalAttributes.MACHINE_PRESET_NAME + ); + const environmentType = extractStringAttribute( + dpAttributes, + SemanticInternalAttributes.ENVIRONMENT_TYPE + ); + + // Build the trigger context object with only defined values + const trigger: Record = {}; + if (runId) trigger.run_id = runId; + if (taskSlug) trigger.task_slug = taskSlug; + if (attemptNumber !== undefined) trigger.attempt_number = attemptNumber; + if (machineId) trigger.machine_id = machineId; + if (machineName) trigger.machine_name = machineName; + if (workerId) trigger.worker_id = workerId; + if (workerVersion) trigger.worker_version = workerVersion; + if (environmentType) trigger.environment_type = environmentType; + + // Build user attributes, filtering out internal ctx/worker keys + const result: Record = {}; + + if (Object.keys(trigger).length > 0) { + result.trigger = trigger; + } + + for (const attr of dpAttributes) { + if (INTERNAL_METRIC_ATTRIBUTE_PREFIXES.some((prefix) => attr.key.startsWith(prefix))) { + continue; + } + + if (isStringValue(attr.value)) { + result[attr.key] = attr.value.stringValue; + } else if (isIntValue(attr.value)) { + result[attr.key] = Number(attr.value.intValue); + } else if (isDoubleValue(attr.value)) { + result[attr.key] = attr.value.doubleValue; + } else if (isBoolValue(attr.value)) { + result[attr.key] = attr.value.boolValue; + } + } + + return { machineId, attributes: result }; +} + function extractEventProperties(attributes: KeyValue[], prefix?: string) { return { metadata: convertSelectedKeyValueItemsToMap(attributes, [SemanticInternalAttributes.METADATA]), diff --git a/internal-packages/clickhouse/schema/016_create_metrics_v1.sql b/internal-packages/clickhouse/schema/016_create_metrics_v1.sql new file mode 100644 index 0000000000..c0314b7fa0 --- /dev/null +++ b/internal-packages/clickhouse/schema/016_create_metrics_v1.sql @@ -0,0 +1,24 @@ +-- +goose Up +CREATE TABLE IF NOT EXISTS trigger_dev.metrics_v1 +( + organization_id LowCardinality(String), + project_id LowCardinality(String), + environment_id String, + metric_name LowCardinality(String), + metric_type LowCardinality(String), + metric_subject String, + bucket_start DateTime, + count UInt64 DEFAULT 0, + sum_value Float64 DEFAULT 0, + max_value Float64 DEFAULT 0, + min_value Float64 DEFAULT 0, + last_value Float64 DEFAULT 0, + attributes JSON(max_dynamic_paths=64) +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(bucket_start) +ORDER BY (organization_id, project_id, environment_id, metric_name, metric_subject, bucket_start) +TTL bucket_start + INTERVAL 30 DAY; + +-- +goose Down +DROP TABLE IF EXISTS trigger_dev.metrics_v1; diff --git a/internal-packages/clickhouse/src/index.ts b/internal-packages/clickhouse/src/index.ts index 47c2f34f2f..1f1b6e2ff0 100644 --- a/internal-packages/clickhouse/src/index.ts +++ b/internal-packages/clickhouse/src/index.ts @@ -26,12 +26,14 @@ import { getLogDetailQueryBuilderV2, getLogsSearchListQueryBuilder, } from "./taskEvents.js"; +import { insertMetrics } from "./metrics.js"; import { Logger, type LogLevel } from "@trigger.dev/core/logger"; import type { Agent as HttpAgent } from "http"; import type { Agent as HttpsAgent } from "https"; export type * from "./taskRuns.js"; export type * from "./taskEvents.js"; +export type * from "./metrics.js"; export type * from "./client/queryBuilder.js"; // Re-export column constants, indices, and type-safe 
accessors @@ -214,6 +216,12 @@ export class ClickHouse { }; } + get metrics() { + return { + insert: insertMetrics(this.writer), + }; + } + get taskEventsV2() { return { insert: insertTaskEventsV2(this.writer), diff --git a/internal-packages/clickhouse/src/metrics.ts b/internal-packages/clickhouse/src/metrics.ts new file mode 100644 index 0000000000..4469586d8b --- /dev/null +++ b/internal-packages/clickhouse/src/metrics.ts @@ -0,0 +1,33 @@ +import { z } from "zod"; +import { ClickhouseWriter } from "./client/types.js"; + +export const MetricsV1Input = z.object({ + organization_id: z.string(), + project_id: z.string(), + environment_id: z.string(), + metric_name: z.string(), + metric_type: z.string(), + metric_subject: z.string(), + bucket_start: z.string(), + count: z.number(), + sum_value: z.number(), + max_value: z.number(), + min_value: z.number(), + last_value: z.number(), + attributes: z.unknown(), +}); + +export type MetricsV1Input = z.input; + +export function insertMetrics(ch: ClickhouseWriter) { + return ch.insertUnsafe({ + name: "insertMetrics", + table: "trigger_dev.metrics_v1", + settings: { + enable_json_type: 1, + type_json_skip_duplicated_paths: 1, + input_format_json_throw_on_bad_escape_sequence: 0, + input_format_json_use_string_type_for_ambiguous_paths_in_named_tuples_inference_from_objects: 1, + }, + }); +} diff --git a/internal-packages/otlp-importer/src/index.ts b/internal-packages/otlp-importer/src/index.ts index 7401431a84..3a70776ddf 100644 --- a/internal-packages/otlp-importer/src/index.ts +++ b/internal-packages/otlp-importer/src/index.ts @@ -10,6 +10,12 @@ import { ExportLogsServiceResponse, } from "./generated/opentelemetry/proto/collector/logs/v1/logs_service"; +import { + ExportMetricsPartialSuccess, + ExportMetricsServiceRequest, + ExportMetricsServiceResponse, +} from "./generated/opentelemetry/proto/collector/metrics/v1/metrics_service"; + import type { AnyValue, KeyValue, @@ -33,6 +39,21 @@ import { Status, Status_StatusCode, } from "./generated/opentelemetry/proto/trace/v1/trace"; +import { + ResourceMetrics, + ScopeMetrics, + Metric, + Gauge, + Sum, + Histogram, + ExponentialHistogram, + Summary, + NumberDataPoint, + HistogramDataPoint, + ExponentialHistogramDataPoint, + SummaryDataPoint, + AggregationTemporality, +} from "./generated/opentelemetry/proto/metrics/v1/metrics"; export { LogRecord, @@ -57,3 +78,21 @@ export { export { ExportTracePartialSuccess, ExportTraceServiceRequest, ExportTraceServiceResponse }; export { ExportLogsPartialSuccess, ExportLogsServiceRequest, ExportLogsServiceResponse }; + +export { ExportMetricsPartialSuccess, ExportMetricsServiceRequest, ExportMetricsServiceResponse }; + +export { + ResourceMetrics, + ScopeMetrics, + Metric, + Gauge, + Sum, + Histogram, + ExponentialHistogram, + Summary, + NumberDataPoint, + HistogramDataPoint, + ExponentialHistogramDataPoint, + SummaryDataPoint, + AggregationTemporality, +}; diff --git a/packages/cli-v3/src/entryPoints/dev-run-worker.ts b/packages/cli-v3/src/entryPoints/dev-run-worker.ts index 7cd88ab5a9..bc1208cb65 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-worker.ts @@ -204,12 +204,15 @@ async function doBootstrap() { const tracingSDK = new TracingSDK({ url: env.TRIGGER_OTEL_EXPORTER_OTLP_ENDPOINT ?? "http://0.0.0.0:4318", + metricsUrl: env.TRIGGER_OTEL_METRICS_ENDPOINT, instrumentations: config.telemetry?.instrumentations ?? config.instrumentations ?? [], exporters: config.telemetry?.exporters ?? 
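+    // User-configured span exporters run alongside the default Trigger.dev exporter.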
[], logExporters: config.telemetry?.logExporters ?? [], + metricReaders: config.telemetry?.metricReaders ?? [], diagLogLevel: (env.TRIGGER_OTEL_LOG_LEVEL as TracingDiagnosticLogLevel) ?? "none", forceFlushTimeoutMillis: 30_000, resource: config.telemetry?.resource, + hostMetrics: true, }); const otelTracer: Tracer = tracingSDK.getTracer("trigger-dev-worker", VERSION); diff --git a/packages/cli-v3/src/entryPoints/managed-run-worker.ts b/packages/cli-v3/src/entryPoints/managed-run-worker.ts index f1512f27f0..95500b450d 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-worker.ts @@ -183,12 +183,15 @@ async function doBootstrap() { const tracingSDK = new TracingSDK({ url: env.TRIGGER_OTEL_EXPORTER_OTLP_ENDPOINT ?? "http://0.0.0.0:4318", + metricsUrl: env.TRIGGER_OTEL_METRICS_ENDPOINT, instrumentations: config.instrumentations ?? [], diagLogLevel: (env.TRIGGER_OTEL_LOG_LEVEL as TracingDiagnosticLogLevel) ?? "none", forceFlushTimeoutMillis: 30_000, exporters: config.telemetry?.exporters ?? [], logExporters: config.telemetry?.logExporters ?? [], + metricReaders: config.telemetry?.metricReaders ?? [], resource: config.telemetry?.resource, + hostMetrics: true, }); const otelTracer: Tracer = tracingSDK.getTracer("trigger-dev-worker", VERSION); diff --git a/packages/core/package.json b/packages/core/package.json index d73b425f7d..dbc564d22f 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -176,10 +176,13 @@ "@opentelemetry/api-logs": "0.203.0", "@opentelemetry/core": "2.0.1", "@opentelemetry/exporter-logs-otlp-http": "0.203.0", + "@opentelemetry/exporter-metrics-otlp-http": "0.203.0", + "@opentelemetry/host-metrics": "^0.36.0", "@opentelemetry/exporter-trace-otlp-http": "0.203.0", "@opentelemetry/instrumentation": "0.203.0", "@opentelemetry/resources": "2.0.1", "@opentelemetry/sdk-logs": "0.203.0", + "@opentelemetry/sdk-metrics": "2.0.1", "@opentelemetry/sdk-trace-base": "2.0.1", "@opentelemetry/sdk-trace-node": "2.0.1", "@opentelemetry/semantic-conventions": "1.36.0", diff --git a/packages/core/src/v3/config.ts b/packages/core/src/v3/config.ts index 9c6871a264..0b4d23af00 100644 --- a/packages/core/src/v3/config.ts +++ b/packages/core/src/v3/config.ts @@ -1,5 +1,6 @@ import type { Instrumentation } from "@opentelemetry/instrumentation"; import type { SpanExporter } from "@opentelemetry/sdk-trace-base"; +import type { MetricReader } from "@opentelemetry/sdk-metrics"; import type { BuildExtension } from "./build/extensions.js"; import type { AnyOnFailureHookFunction, @@ -109,6 +110,12 @@ export type TriggerConfig = { */ logExporters?: Array; + /** + * Metric readers for OpenTelemetry. Add custom metric readers to export + * metrics to external services alongside the default Trigger.dev exporter. + */ + metricReaders?: Array; + /** * Resource to use for OpenTelemetry. This is useful if you want to add custom resources to your tasks. 
* diff --git a/packages/core/src/v3/otel/machineId.ts b/packages/core/src/v3/otel/machineId.ts new file mode 100644 index 0000000000..883b930f37 --- /dev/null +++ b/packages/core/src/v3/otel/machineId.ts @@ -0,0 +1,3 @@ +import { randomUUID } from "crypto"; + +export const machineId = randomUUID(); diff --git a/packages/core/src/v3/otel/tracingSDK.ts b/packages/core/src/v3/otel/tracingSDK.ts index 694212f71b..9aa9ecd9b2 100644 --- a/packages/core/src/v3/otel/tracingSDK.ts +++ b/packages/core/src/v3/otel/tracingSDK.ts @@ -4,11 +4,14 @@ import { TraceFlags, TracerProvider, diag, + metrics, } from "@opentelemetry/api"; import { logs } from "@opentelemetry/api-logs"; import { TraceState } from "@opentelemetry/core"; import { OTLPLogExporter } from "@opentelemetry/exporter-logs-otlp-http"; import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http"; +import { OTLPMetricExporter } from "@opentelemetry/exporter-metrics-otlp-http"; +import { HostMetrics } from "@opentelemetry/host-metrics"; import { registerInstrumentations, type Instrumentation } from "@opentelemetry/instrumentation"; import { detectResources, @@ -24,6 +27,11 @@ import { ReadableLogRecord, SimpleLogRecordProcessor, } from "@opentelemetry/sdk-logs"; +import { + MeterProvider, + PeriodicExportingMetricReader, + type MetricReader, +} from "@opentelemetry/sdk-metrics"; import { RandomIdGenerator, SpanProcessor } from "@opentelemetry/sdk-trace-base"; import { BatchSpanProcessor, @@ -32,7 +40,6 @@ import { SimpleSpanProcessor, SpanExporter, } from "@opentelemetry/sdk-trace-node"; -import { SemanticResourceAttributes, SEMATTRS_HTTP_URL } from "@opentelemetry/semantic-conventions"; import { VERSION } from "../../version.js"; import { OTEL_ATTRIBUTE_PER_EVENT_COUNT_LIMIT, @@ -48,10 +55,12 @@ import { SemanticInternalAttributes } from "../semanticInternalAttributes.js"; import { taskContext } from "../task-context-api.js"; import { TaskContextLogProcessor, + TaskContextMetricExporter, TaskContextSpanProcessor, } from "../taskContext/otelProcessors.js"; import { traceContext } from "../trace-context-api.js"; import { getEnvVar } from "../utils/getEnv.js"; +import { machineId } from "./machineId.js"; export type TracingDiagnosticLogLevel = | "none" @@ -64,12 +73,15 @@ export type TracingDiagnosticLogLevel = export type TracingSDKConfig = { url: string; + metricsUrl?: string; forceFlushTimeoutMillis?: number; instrumentations?: Instrumentation[]; exporters?: SpanExporter[]; logExporters?: LogRecordExporter[]; + metricReaders?: MetricReader[]; diagLogLevel?: TracingDiagnosticLogLevel; resource?: Resource; + hostMetrics?: boolean; }; const idGenerator = new RandomIdGenerator(); @@ -78,6 +90,7 @@ export class TracingSDK { private readonly _logProvider: LoggerProvider; private readonly _spanExporter: SpanExporter; private readonly _traceProvider: NodeTracerProvider; + private readonly _meterProvider: MeterProvider; public readonly getLogger: LoggerProvider["getLogger"]; public readonly getTracer: TracerProvider["getTracer"]; @@ -99,13 +112,13 @@ export class TracingSDK { }) .merge( resourceFromAttributes({ - [SemanticResourceAttributes.CLOUD_PROVIDER]: "trigger.dev", - [SemanticResourceAttributes.SERVICE_NAME]: - getEnvVar("TRIGGER_OTEL_SERVICE_NAME") ?? "trigger.dev", + "cloud.provider": "trigger.dev", + "service.name": getEnvVar("TRIGGER_OTEL_SERVICE_NAME") ?? 
"trigger.dev", [SemanticInternalAttributes.TRIGGER]: true, [SemanticInternalAttributes.CLI_VERSION]: VERSION, [SemanticInternalAttributes.SDK_VERSION]: VERSION, [SemanticInternalAttributes.SDK_LANGUAGE]: "typescript", + [SemanticInternalAttributes.MACHINE_ID]: machineId, }) ) .merge(resourceFromAttributes(envResourceAttributes)) @@ -259,16 +272,62 @@ export class TracingSDK { logs.setGlobalLoggerProvider(loggerProvider); + // Metrics setup + const metricsUrl = + config.metricsUrl ?? + getEnvVar("TRIGGER_OTEL_METRICS_ENDPOINT") ?? + `${config.url}/v1/metrics`; + + const rawMetricExporter = new OTLPMetricExporter({ + url: metricsUrl, + timeoutMillis: config.forceFlushTimeoutMillis, + }); + const metricExporter = new TaskContextMetricExporter(rawMetricExporter); + + const metricReaders: MetricReader[] = [ + new PeriodicExportingMetricReader({ + exporter: metricExporter, + exportIntervalMillis: parseInt( + getEnvVar("TRIGGER_OTEL_METRICS_EXPORT_INTERVAL_MILLIS") ?? "60000" + ), + exportTimeoutMillis: parseInt( + getEnvVar("TRIGGER_OTEL_METRICS_EXPORT_TIMEOUT_MILLIS") ?? "30000" + ), + }), + ...(config.metricReaders ?? []), + ]; + + const meterProvider = new MeterProvider({ + resource: commonResources, + readers: metricReaders, + }); + + this._meterProvider = meterProvider; + metrics.setGlobalMeterProvider(meterProvider); + + if (config.hostMetrics) { + const hostMetrics = new HostMetrics({ meterProvider }); + hostMetrics.start(); + } + this.getLogger = loggerProvider.getLogger.bind(loggerProvider); this.getTracer = traceProvider.getTracer.bind(traceProvider); } public async flush() { - await Promise.all([this._traceProvider.forceFlush(), this._logProvider.forceFlush()]); + await Promise.all([ + this._traceProvider.forceFlush(), + this._logProvider.forceFlush(), + this._meterProvider.forceFlush(), + ]); } public async shutdown() { - await Promise.all([this._traceProvider.shutdown(), this._logProvider.shutdown()]); + await Promise.all([ + this._traceProvider.shutdown(), + this._logProvider.shutdown(), + this._meterProvider.shutdown(), + ]); } } @@ -465,7 +524,7 @@ function isSpanInternalOnly(span: ReadableSpan): boolean { return true; } - const httpUrl = span.attributes[SEMATTRS_HTTP_URL] ?? span.attributes["url.full"]; + const httpUrl = span.attributes["http.url"] ?? 
span.attributes["url.full"]; const url = safeParseUrl(httpUrl); diff --git a/packages/core/src/v3/semanticInternalAttributes.ts b/packages/core/src/v3/semanticInternalAttributes.ts index 4d24235278..3fb20a0649 100644 --- a/packages/core/src/v3/semanticInternalAttributes.ts +++ b/packages/core/src/v3/semanticInternalAttributes.ts @@ -19,6 +19,7 @@ export const SemanticInternalAttributes = { TASK_EXPORT_NAME: "ctx.task.exportName", QUEUE_NAME: "ctx.queue.name", QUEUE_ID: "ctx.queue.id", + MACHINE_ID: "ctx.machine.id", MACHINE_PRESET_NAME: "ctx.machine.name", MACHINE_PRESET_CPU: "ctx.machine.cpu", MACHINE_PRESET_MEMORY: "ctx.machine.memory", @@ -65,4 +66,5 @@ export const SemanticInternalAttributes = { WARM_START: "warm_start", ATTEMPT_EXECUTION_COUNT: "$trigger.executionCount", TASK_EVENT_STORE: "$trigger.taskEventStore", + RUN_TAGS: "ctx.run.tags", }; diff --git a/packages/core/src/v3/taskContext/otelProcessors.ts b/packages/core/src/v3/taskContext/otelProcessors.ts index 16f93d42a9..0f6340c60d 100644 --- a/packages/core/src/v3/taskContext/otelProcessors.ts +++ b/packages/core/src/v3/taskContext/otelProcessors.ts @@ -1,5 +1,14 @@ -import { Context, trace, Tracer } from "@opentelemetry/api"; +import { Attributes, Context, trace, Tracer } from "@opentelemetry/api"; +import { ExportResult } from "@opentelemetry/core"; import { LogRecordProcessor, SdkLogRecord } from "@opentelemetry/sdk-logs"; +import type { + AggregationOption, + AggregationTemporality, + InstrumentType, + MetricData, + PushMetricExporter, + ResourceMetrics, +} from "@opentelemetry/sdk-metrics"; import { Span, SpanProcessor } from "@opentelemetry/sdk-trace-base"; import { SemanticInternalAttributes } from "../semanticInternalAttributes.js"; import { taskContext } from "../task-context-api.js"; @@ -104,3 +113,73 @@ export class TaskContextLogProcessor implements LogRecordProcessor { return this._innerProcessor.shutdown(); } } + +export class TaskContextMetricExporter implements PushMetricExporter { + selectAggregationTemporality?: (instrumentType: InstrumentType) => AggregationTemporality; + selectAggregation?: (instrumentType: InstrumentType) => AggregationOption; + + constructor(private _innerExporter: PushMetricExporter) { + if (_innerExporter.selectAggregationTemporality) { + this.selectAggregationTemporality = + _innerExporter.selectAggregationTemporality.bind(_innerExporter); + } + if (_innerExporter.selectAggregation) { + this.selectAggregation = _innerExporter.selectAggregation.bind(_innerExporter); + } + } + + export(metrics: ResourceMetrics, resultCallback: (result: ExportResult) => void): void { + if (!taskContext.ctx) { + this._innerExporter.export(metrics, resultCallback); + return; + } + + const ctx = taskContext.ctx; + const contextAttrs: Attributes = { + [SemanticInternalAttributes.RUN_ID]: ctx.run.id, + [SemanticInternalAttributes.TASK_SLUG]: ctx.task.id, + [SemanticInternalAttributes.ATTEMPT_NUMBER]: ctx.attempt.number, + [SemanticInternalAttributes.ENVIRONMENT_ID]: ctx.environment.id, + [SemanticInternalAttributes.ORGANIZATION_ID]: ctx.organization.id, + [SemanticInternalAttributes.PROJECT_ID]: ctx.project.id, + [SemanticInternalAttributes.MACHINE_PRESET_NAME]: ctx.machine?.name, + [SemanticInternalAttributes.ENVIRONMENT_TYPE]: ctx.environment.type, + }; + + if (taskContext.worker) { + contextAttrs[SemanticInternalAttributes.WORKER_ID] = taskContext.worker.id; + contextAttrs[SemanticInternalAttributes.WORKER_VERSION] = taskContext.worker.version; + } + + if (ctx.run.tags?.length) { + 
contextAttrs[SemanticInternalAttributes.RUN_TAGS] = ctx.run.tags; + } + + const modified: ResourceMetrics = { + resource: metrics.resource, + scopeMetrics: metrics.scopeMetrics.map((scope) => ({ + ...scope, + metrics: scope.metrics.map( + (metric) => + ({ + ...metric, + dataPoints: metric.dataPoints.map((dp) => ({ + ...dp, + attributes: { ...dp.attributes, ...contextAttrs }, + })), + }) as MetricData + ), + })), + }; + + this._innerExporter.export(modified, resultCallback); + } + + forceFlush(): Promise { + return this._innerExporter.forceFlush(); + } + + shutdown(): Promise { + return this._innerExporter.shutdown(); + } +} diff --git a/packages/core/src/v3/workers/index.ts b/packages/core/src/v3/workers/index.ts index 58ee834ac2..4ca301fcdc 100644 --- a/packages/core/src/v3/workers/index.ts +++ b/packages/core/src/v3/workers/index.ts @@ -14,6 +14,7 @@ export { StandardResourceCatalog } from "../resource-catalog/standardResourceCat export { TaskContextSpanProcessor, TaskContextLogProcessor, + TaskContextMetricExporter, } from "../taskContext/otelProcessors.js"; export * from "../usage-api.js"; export { DevUsageManager } from "../usage/devUsageManager.js"; diff --git a/packages/trigger-sdk/src/v3/index.ts b/packages/trigger-sdk/src/v3/index.ts index b2d6247699..97a2271f36 100644 --- a/packages/trigger-sdk/src/v3/index.ts +++ b/packages/trigger-sdk/src/v3/index.ts @@ -58,3 +58,5 @@ export * as queues from "./queues.js"; export type { ImportEnvironmentVariablesParams } from "./envvars.js"; export { configure, auth } from "./auth.js"; + +export { metrics } from "@opentelemetry/api"; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f504496dd1..17052a918f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1101,7 +1101,7 @@ importers: version: 18.3.1 react-email: specifier: ^2.1.1 - version: 2.1.2(@opentelemetry/api@1.9.0)(@swc/helpers@0.5.15)(eslint@8.31.0) + version: 2.1.2(@opentelemetry/api@1.9.0)(@swc/helpers@0.5.15)(bufferutil@4.0.9)(eslint@8.31.0) resend: specifier: ^3.2.0 version: 3.2.0 @@ -1698,9 +1698,15 @@ importers: '@opentelemetry/exporter-logs-otlp-http': specifier: 0.203.0 version: 0.203.0(@opentelemetry/api@1.9.0) + '@opentelemetry/exporter-metrics-otlp-http': + specifier: 0.203.0 + version: 0.203.0(@opentelemetry/api@1.9.0) '@opentelemetry/exporter-trace-otlp-http': specifier: 0.203.0 version: 0.203.0(@opentelemetry/api@1.9.0) + '@opentelemetry/host-metrics': + specifier: ^0.36.0 + version: 0.36.0(@opentelemetry/api@1.9.0) '@opentelemetry/instrumentation': specifier: 0.203.0 version: 0.203.0(@opentelemetry/api@1.9.0)(supports-color@10.0.0) @@ -1710,6 +1716,9 @@ importers: '@opentelemetry/sdk-logs': specifier: 0.203.0 version: 0.203.0(@opentelemetry/api@1.9.0) + '@opentelemetry/sdk-metrics': + specifier: 2.0.1 + version: 2.0.1(@opentelemetry/api@1.9.0) '@opentelemetry/sdk-trace-base': specifier: 2.0.1 version: 2.0.1(@opentelemetry/api@1.9.0) @@ -39164,7 +39173,7 @@ snapshots: react: 18.2.0 react-dom: 18.2.0(react@18.2.0) - react-email@2.1.2(@opentelemetry/api@1.9.0)(@swc/helpers@0.5.15)(eslint@8.31.0): + react-email@2.1.2(@opentelemetry/api@1.9.0)(@swc/helpers@0.5.15)(bufferutil@4.0.9)(eslint@8.31.0): dependencies: '@babel/parser': 7.24.1 '@radix-ui/colors': 1.0.1 @@ -39201,8 +39210,8 @@ snapshots: react: 18.3.1 react-dom: 18.2.0(react@18.3.1) shelljs: 0.8.5 - socket.io: 4.7.3 - socket.io-client: 4.7.3 + socket.io: 4.7.3(bufferutil@4.0.9) + socket.io-client: 4.7.3(bufferutil@4.0.9) sonner: 1.3.1(react-dom@18.2.0(react@18.3.1))(react@18.3.1) source-map-js: 1.0.2 
stacktrace-parser: 0.1.10 @@ -40402,7 +40411,7 @@ snapshots: - supports-color - utf-8-validate - socket.io-client@4.7.3: + socket.io-client@4.7.3(bufferutil@4.0.9): dependencies: '@socket.io/component-emitter': 3.1.0 debug: 4.3.7(supports-color@10.0.0) @@ -40431,7 +40440,7 @@ snapshots: transitivePeerDependencies: - supports-color - socket.io@4.7.3: + socket.io@4.7.3(bufferutil@4.0.9): dependencies: accepts: 1.3.8 base64id: 2.0.0 From c186ee67f11e4ee6f3e91219fa7f777dfa85ac44 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 13 Feb 2026 13:46:30 +0000 Subject: [PATCH 02/13] stop hardcoding the triggered_at column --- .../app/services/queryService.server.ts | 23 ++- apps/webapp/app/v3/querySchemas.ts | 168 +++++++++++++++++- 2 files changed, 182 insertions(+), 9 deletions(-) diff --git a/apps/webapp/app/services/queryService.server.ts b/apps/webapp/app/services/queryService.server.ts index 676be9f2f0..6cd2af03b1 100644 --- a/apps/webapp/app/services/queryService.server.ts +++ b/apps/webapp/app/services/queryService.server.ts @@ -152,7 +152,14 @@ export async function executeQuery( return { success: false, error: new QueryError(errorMessage, { query: options.query }) }; } - // Build time filter fallback for triggered_at column + // Detect which table the query targets to determine the time column + // Each table schema declares its primary time column via timeConstraint + const matchedSchema = querySchemas.find((s) => + new RegExp(`\\bFROM\\s+${s.name}\\b`, "i").test(options.query) + ); + const timeColumn = matchedSchema?.timeConstraint ?? "triggered_at"; + + // Build time filter fallback for the table's time column const defaultPeriod = await getDefaultPeriod(organizationId); const timeFilter = timeFilters({ period: period ?? undefined, @@ -173,15 +180,15 @@ export async function executeQuery( } // Build the fallback WHERE condition based on what the user specified - let triggeredAtFallback: WhereClauseCondition; + let timeFallback: WhereClauseCondition; if (timeFilter.from && timeFilter.to) { - triggeredAtFallback = { op: "between", low: timeFilter.from, high: timeFilter.to }; + timeFallback = { op: "between", low: timeFilter.from, high: timeFilter.to }; } else if (timeFilter.from) { - triggeredAtFallback = { op: "gte", value: timeFilter.from }; + timeFallback = { op: "gte", value: timeFilter.from }; } else if (timeFilter.to) { - triggeredAtFallback = { op: "lte", value: timeFilter.to }; + timeFallback = { op: "lte", value: timeFilter.to }; } else { - triggeredAtFallback = { op: "gte", value: requestedFromDate! }; + timeFallback = { op: "gte", value: requestedFromDate! }; } const maxQueryPeriod = await getLimit(organizationId, "queryPeriodDays", 30); @@ -196,7 +203,7 @@ export async function executeQuery( project_id: scope === "project" || scope === "environment" ? { op: "eq", value: projectId } : undefined, environment_id: scope === "environment" ? 
{ op: "eq", value: environmentId } : undefined, - triggered_at: { op: "gte", value: maxQueryPeriodDate }, + [timeColumn]: { op: "gte", value: maxQueryPeriodDate }, // Optional filters for tasks and queues task_identifier: taskIdentifiers && taskIdentifiers.length > 0 @@ -238,7 +245,7 @@ export async function executeQuery( enforcedWhereClause, fieldMappings, whereClauseFallback: { - triggered_at: triggeredAtFallback, + [timeColumn]: timeFallback, }, timeRange, clickhouseSettings: { diff --git a/apps/webapp/app/v3/querySchemas.ts b/apps/webapp/app/v3/querySchemas.ts index 33a75a4fe1..4c41556317 100644 --- a/apps/webapp/app/v3/querySchemas.ts +++ b/apps/webapp/app/v3/querySchemas.ts @@ -434,10 +434,176 @@ export const runsSchema: TableSchema = { }, }; +/** + * Schema definition for the metrics table (trigger_dev.metrics_v1) + */ +export const metricsSchema: TableSchema = { + name: "metrics", + clickhouseName: "trigger_dev.metrics_v1", + description: "Host and runtime metrics collected during task execution", + timeConstraint: "bucket_start", + tenantColumns: { + organizationId: "organization_id", + projectId: "project_id", + environmentId: "environment_id", + }, + columns: { + environment: { + name: "environment", + clickhouseName: "environment_id", + ...column("String", { description: "The environment slug", example: "prod" }), + fieldMapping: "environment", + customRenderType: "environment", + }, + project: { + name: "project", + clickhouseName: "project_id", + ...column("String", { + description: "The project reference, they always start with `proj_`.", + example: "proj_howcnaxbfxdmwmxazktx", + }), + fieldMapping: "project", + customRenderType: "project", + }, + metric_name: { + name: "metric_name", + ...column("LowCardinality(String)", { + description: "The name of the metric (e.g. 
process.cpu.utilization, system.memory.usage)", + example: "process.cpu.utilization", + coreColumn: true, + }), + }, + metric_type: { + name: "metric_type", + ...column("LowCardinality(String)", { + description: "The type of metric", + allowedValues: ["gauge", "sum", "histogram"], + example: "gauge", + }), + }, + machine_id: { + name: "machine_id", + clickhouseName: "metric_subject", + ...column("String", { + description: "The machine ID that produced this metric", + example: "machine-abc123", + }), + }, + bucket_start: { + name: "bucket_start", + ...column("DateTime", { + description: "The start of the 10-second aggregation bucket", + example: "2024-01-15 09:30:00", + coreColumn: true, + }), + }, + count: { + name: "count", + ...column("UInt64", { + description: "Number of data points in this bucket", + example: "6", + }), + }, + sum_value: { + name: "sum_value", + ...column("Float64", { + description: "Sum of values in this bucket", + example: "0.45", + }), + }, + max_value: { + name: "max_value", + ...column("Float64", { + description: "Maximum value in this bucket", + example: "0.85", + coreColumn: true, + }), + }, + min_value: { + name: "min_value", + ...column("Float64", { + description: "Minimum value in this bucket", + example: "0.12", + }), + }, + last_value: { + name: "last_value", + ...column("Float64", { + description: "Last recorded value in this bucket", + example: "0.42", + coreColumn: true, + }), + }, + + // Trigger context columns (from attributes.trigger.* JSON subpaths) + run_id: { + name: "run_id", + ...column("String", { + description: "The run ID associated with this metric", + customRenderType: "runId", + example: "run_cm1a2b3c4d5e6f7g8h9i", + }), + expression: "attributes.trigger.run_id", + }, + task_identifier: { + name: "task_identifier", + ...column("String", { + description: "Task identifier/slug", + example: "my-background-task", + coreColumn: true, + }), + expression: "attributes.trigger.task_slug", + }, + attempt_number: { + name: "attempt_number", + ...column("String", { + description: "The attempt number for this metric", + example: "1", + }), + expression: "attributes.trigger.attempt_number", + }, + machine_name: { + name: "machine_name", + ...column("String", { + description: "The machine preset used for execution", + allowedValues: [...MACHINE_PRESETS], + example: "small-1x", + }), + expression: "attributes.trigger.machine_name", + }, + environment_type: { + name: "environment_type", + ...column("String", { + description: "Environment type", + allowedValues: [...ENVIRONMENT_TYPES], + customRenderType: "environmentType", + example: "PRODUCTION", + }), + expression: "attributes.trigger.environment_type", + }, + worker_id: { + name: "worker_id", + ...column("String", { + description: "The worker ID that produced this metric", + example: "worker-abc123", + }), + expression: "attributes.trigger.worker_id", + }, + worker_version: { + name: "worker_version", + ...column("String", { + description: "The worker version that produced this metric", + example: "20240115.1", + }), + expression: "attributes.trigger.worker_version", + }, + }, +}; + /** * All available schemas for the query editor */ -export const querySchemas: TableSchema[] = [runsSchema]; +export const querySchemas: TableSchema[] = [runsSchema, metricsSchema]; /** * Default query for the query editor From b3895ba90b0bab7474d741af2c85805554ef0b8a Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 13 Feb 2026 16:24:00 +0000 Subject: [PATCH 03/13] better machine ID and don't send metrics in between runs 
in a warm process --- packages/cli-v3/src/dev/taskRunProcessPool.ts | 2 ++ packages/cli-v3/src/entryPoints/dev-run-worker.ts | 7 ++++++- packages/cli-v3/src/entryPoints/managed-run-worker.ts | 5 ++++- .../src/entryPoints/managed/taskRunProcessProvider.ts | 4 +++- packages/cli-v3/src/executions/taskRunProcess.ts | 6 +++--- packages/core/src/v3/index.ts | 1 + packages/core/src/v3/otel/machineId.ts | 5 +++-- packages/core/src/v3/otel/tracingSDK.ts | 7 +++++++ packages/core/src/v3/schemas/messages.ts | 1 + packages/core/src/v3/taskContext/otelProcessors.ts | 5 +++-- 10 files changed, 33 insertions(+), 10 deletions(-) diff --git a/packages/cli-v3/src/dev/taskRunProcessPool.ts b/packages/cli-v3/src/dev/taskRunProcessPool.ts index 1d0640e52d..a20755e1ce 100644 --- a/packages/cli-v3/src/dev/taskRunProcessPool.ts +++ b/packages/cli-v3/src/dev/taskRunProcessPool.ts @@ -2,6 +2,7 @@ import { MachinePresetResources, ServerBackgroundWorker, WorkerManifest, + generateFriendlyId, } from "@trigger.dev/core/v3"; import { TaskRunProcess } from "../executions/taskRunProcess.js"; import { logger } from "../utilities/logger.js"; @@ -106,6 +107,7 @@ export class TaskRunProcessPool { env: { ...this.options.env, ...env, + TRIGGER_MACHINE_ID: generateFriendlyId("machine"), }, serverWorker, machineResources, diff --git a/packages/cli-v3/src/entryPoints/dev-run-worker.ts b/packages/cli-v3/src/entryPoints/dev-run-worker.ts index bc1208cb65..75aaf7ab98 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-worker.ts @@ -213,6 +213,8 @@ async function doBootstrap() { forceFlushTimeoutMillis: 30_000, resource: config.telemetry?.resource, hostMetrics: true, + // Drop per-CPU per-state system.cpu metrics in dev to reduce noise + droppedMetrics: ["system.cpu.*"], }); const otelTracer: Tracer = tracingSDK.getTracer("trigger-dev-worker", VERSION); @@ -622,8 +624,11 @@ const zodIpc = new ZodIpcConnection({ } await flushAll(timeoutInMs); }, - FLUSH: async ({ timeoutInMs }) => { + FLUSH: async ({ timeoutInMs, disableContext }) => { await flushAll(timeoutInMs); + if (disableContext) { + taskContext.disable(); + } }, RESOLVE_WAITPOINT: async ({ waitpoint }) => { _sharedWorkerRuntime?.resolveWaitpoints([waitpoint]); diff --git a/packages/cli-v3/src/entryPoints/managed-run-worker.ts b/packages/cli-v3/src/entryPoints/managed-run-worker.ts index 95500b450d..7db9571cef 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-worker.ts @@ -610,8 +610,11 @@ const zodIpc = new ZodIpcConnection({ } await flushAll(timeoutInMs); }, - FLUSH: async ({ timeoutInMs }) => { + FLUSH: async ({ timeoutInMs, disableContext }) => { await flushAll(timeoutInMs); + if (disableContext) { + taskContext.disable(); + } }, RESOLVE_WAITPOINT: async ({ waitpoint }) => { _sharedWorkerRuntime?.resolveWaitpoints([waitpoint]); diff --git a/packages/cli-v3/src/entryPoints/managed/taskRunProcessProvider.ts b/packages/cli-v3/src/entryPoints/managed/taskRunProcessProvider.ts index 381dca908a..101d1827ce 100644 --- a/packages/cli-v3/src/entryPoints/managed/taskRunProcessProvider.ts +++ b/packages/cli-v3/src/entryPoints/managed/taskRunProcessProvider.ts @@ -1,4 +1,4 @@ -import { WorkerManifest } from "@trigger.dev/core/v3"; +import { WorkerManifest, generateFriendlyId } from "@trigger.dev/core/v3"; import { TaskRunProcess } from "../../executions/taskRunProcess.js"; import { RunnerEnv } from "./env.js"; import { RunLogger, SendDebugLogOptions } from 
"./logger.js"; @@ -22,6 +22,7 @@ export class TaskRunProcessProvider { private readonly logger: RunLogger; private readonly processKeepAliveEnabled: boolean; private readonly processKeepAliveMaxExecutionCount: number; + private readonly machineId = generateFriendlyId("machine"); // Process keep-alive state private persistentProcess: TaskRunProcess | null = null; @@ -269,6 +270,7 @@ export class TaskRunProcessProvider { return { ...taskRunEnv, ...this.env.gatherProcessEnv(), + TRIGGER_MACHINE_ID: this.machineId, HEARTBEAT_INTERVAL_MS: String(this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS * 1000), }; } diff --git a/packages/cli-v3/src/executions/taskRunProcess.ts b/packages/cli-v3/src/executions/taskRunProcess.ts index 1e274ba02f..a329956c0d 100644 --- a/packages/cli-v3/src/executions/taskRunProcess.ts +++ b/packages/cli-v3/src/executions/taskRunProcess.ts @@ -126,7 +126,7 @@ export class TaskRunProcess { return; } - await tryCatch(this.#flush()); + await tryCatch(this.#flush({ disableContext: !kill })); if (kill) { await this.#gracefullyTerminate(this.options.gracefulTerminationTimeoutInMs); @@ -240,10 +240,10 @@ export class TaskRunProcess { return this; } - async #flush(timeoutInMs: number = 5_000) { + async #flush({ timeoutInMs = 5_000, disableContext = false } = {}) { logger.debug("flushing task run process", { pid: this.pid }); - await this._ipc?.sendWithAck("FLUSH", { timeoutInMs }, timeoutInMs + 1_000); + await this._ipc?.sendWithAck("FLUSH", { timeoutInMs, disableContext }, timeoutInMs + 1_000); } async #cancel(timeoutInMs: number = 30_000) { diff --git a/packages/core/src/v3/index.ts b/packages/core/src/v3/index.ts index f4c114c5f9..b714d8cb93 100644 --- a/packages/core/src/v3/index.ts +++ b/packages/core/src/v3/index.ts @@ -49,6 +49,7 @@ export { NULL_SENTINEL, } from "./utils/flattenAttributes.js"; export { omit } from "./utils/omit.js"; +export { generateFriendlyId, fromFriendlyId } from "./isomorphic/friendlyId.js"; export { calculateNextRetryDelay, calculateResetAt, diff --git a/packages/core/src/v3/otel/machineId.ts b/packages/core/src/v3/otel/machineId.ts index 883b930f37..79f7bc4ba2 100644 --- a/packages/core/src/v3/otel/machineId.ts +++ b/packages/core/src/v3/otel/machineId.ts @@ -1,3 +1,4 @@ -import { randomUUID } from "crypto"; +import { generateFriendlyId } from "../isomorphic/friendlyId.js"; +import { getEnvVar } from "../utils/getEnv.js"; -export const machineId = randomUUID(); +export const machineId = getEnvVar("TRIGGER_MACHINE_ID") ?? generateFriendlyId("machine"); diff --git a/packages/core/src/v3/otel/tracingSDK.ts b/packages/core/src/v3/otel/tracingSDK.ts index 9aa9ecd9b2..1a9ca4c224 100644 --- a/packages/core/src/v3/otel/tracingSDK.ts +++ b/packages/core/src/v3/otel/tracingSDK.ts @@ -28,6 +28,7 @@ import { SimpleLogRecordProcessor, } from "@opentelemetry/sdk-logs"; import { + AggregationType, MeterProvider, PeriodicExportingMetricReader, type MetricReader, @@ -82,6 +83,8 @@ export type TracingSDKConfig = { diagLogLevel?: TracingDiagnosticLogLevel; resource?: Resource; hostMetrics?: boolean; + /** Metric instrument name patterns to drop (supports wildcards, e.g. "system.cpu.*") */ + droppedMetrics?: string[]; }; const idGenerator = new RandomIdGenerator(); @@ -300,6 +303,10 @@ export class TracingSDK { const meterProvider = new MeterProvider({ resource: commonResources, readers: metricReaders, + views: (config.droppedMetrics ?? 
[]).map((pattern) => ({ + instrumentName: pattern, + aggregation: { type: AggregationType.DROP }, + })), }); this._meterProvider = meterProvider; diff --git a/packages/core/src/v3/schemas/messages.ts b/packages/core/src/v3/schemas/messages.ts index c635e57445..b58babba71 100644 --- a/packages/core/src/v3/schemas/messages.ts +++ b/packages/core/src/v3/schemas/messages.ts @@ -213,6 +213,7 @@ export const WorkerToExecutorMessageCatalog = { FLUSH: { message: z.object({ timeoutInMs: z.number(), + disableContext: z.boolean().optional(), }), callback: z.void(), }, diff --git a/packages/core/src/v3/taskContext/otelProcessors.ts b/packages/core/src/v3/taskContext/otelProcessors.ts index 0f6340c60d..9d051f615b 100644 --- a/packages/core/src/v3/taskContext/otelProcessors.ts +++ b/packages/core/src/v3/taskContext/otelProcessors.ts @@ -1,5 +1,5 @@ import { Attributes, Context, trace, Tracer } from "@opentelemetry/api"; -import { ExportResult } from "@opentelemetry/core"; +import { ExportResult, ExportResultCode } from "@opentelemetry/core"; import { LogRecordProcessor, SdkLogRecord } from "@opentelemetry/sdk-logs"; import type { AggregationOption, @@ -130,7 +130,8 @@ export class TaskContextMetricExporter implements PushMetricExporter { export(metrics: ResourceMetrics, resultCallback: (result: ExportResult) => void): void { if (!taskContext.ctx) { - this._innerExporter.export(metrics, resultCallback); + // No active run — drop metrics (between-run noise) + resultCallback({ code: ExportResultCode.SUCCESS }); return; } From 8498803ad5f1740e1cae9207b39defc6c7cf9c38 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 13 Feb 2026 16:33:29 +0000 Subject: [PATCH 04/13] filter out all system metrics from dev runs --- packages/cli-v3/src/entryPoints/dev-run-worker.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/cli-v3/src/entryPoints/dev-run-worker.ts b/packages/cli-v3/src/entryPoints/dev-run-worker.ts index 75aaf7ab98..a7f80b7a99 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-worker.ts @@ -213,8 +213,8 @@ async function doBootstrap() { forceFlushTimeoutMillis: 30_000, resource: config.telemetry?.resource, hostMetrics: true, - // Drop per-CPU per-state system.cpu metrics in dev to reduce noise - droppedMetrics: ["system.cpu.*"], + // Drop all system metrics from dev metrics export + droppedMetrics: ["system.*"], }); const otelTracer: Tracer = tracingSDK.getTracer("trigger-dev-worker", VERSION); From b4407c7b6f9b66ca17e1572e5b9ef807dac77d51 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 13 Feb 2026 17:16:11 +0000 Subject: [PATCH 05/13] Integrate metrics with the AI query service --- .../AITabContent.tsx | 2 + .../app/v3/services/aiQueryService.server.ts | 62 ++++++++++++++++--- 2 files changed, 56 insertions(+), 8 deletions(-) diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.query/AITabContent.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.query/AITabContent.tsx index 0f3d4042bf..269747ae86 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.query/AITabContent.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.query/AITabContent.tsx @@ -30,6 +30,8 @@ export function AITabContent({ "Top 50 most expensive runs this week", "Average execution duration by task this week", "Run counts by tag in the past 7 days", + "CPU 
utilization over time by task", + "Peak memory usage per run", ]; return ( diff --git a/apps/webapp/app/v3/services/aiQueryService.server.ts b/apps/webapp/app/v3/services/aiQueryService.server.ts index d397215c8d..5d75009baa 100644 --- a/apps/webapp/app/v3/services/aiQueryService.server.ts +++ b/apps/webapp/app/v3/services/aiQueryService.server.ts @@ -55,7 +55,7 @@ export class AIQueryService { constructor( private readonly tableSchema: TableSchema[], - private readonly model: LanguageModelV1 = openai("gpt-4o-mini") + private readonly model: LanguageModelV1 = openai("codex-mini-latest") ) {} /** @@ -65,7 +65,7 @@ export class AIQueryService { private buildSetTimeFilterTool() { return tool({ description: - "Set the time filter for the query page UI instead of adding triggered_at conditions to the query. ALWAYS use this tool when the user wants to filter by time (e.g., 'last 7 days', 'past hour', 'yesterday'). The UI will apply this filter automatically. Do NOT add triggered_at to the WHERE clause - use this tool instead.", + "Set the time filter for the query page UI instead of adding time conditions to the query. ALWAYS use this tool when the user wants to filter by time (e.g., 'last 7 days', 'past hour', 'yesterday'). The UI will apply this filter automatically using the table's time column (triggered_at for runs, bucket_start for metrics). Do NOT add triggered_at or bucket_start to the WHERE clause for time filtering - use this tool instead.", parameters: z.object({ period: z .string() @@ -366,7 +366,7 @@ export class AIQueryService { * Build the system prompt for the AI */ private buildSystemPrompt(schemaDescription: string): string { - return `You are an expert SQL assistant that generates TSQL queries for a task run analytics system. TSQL is a SQL dialect similar to ClickHouse SQL. + return `You are an expert SQL assistant that generates TSQL queries for a task analytics system. TSQL is a SQL dialect similar to ClickHouse SQL. ## Your Task Convert natural language requests into valid TSQL SELECT queries. Always validate your queries using the validateTSQLQuery tool before returning them. @@ -374,6 +374,13 @@ Convert natural language requests into valid TSQL SELECT queries. Always validat ## Available Schema ${schemaDescription} +## Choosing the Right Table + +- **runs** — Task run records (status, timing, cost, output, etc.). Use for questions about runs, tasks, failures, durations, costs, queues. +- **metrics** — Host and runtime metrics collected during task execution (CPU, memory). Use for questions about resource usage, CPU utilization, memory consumption, or performance monitoring. Each row is a 10-second aggregation bucket tied to a specific run. + +When the user mentions "CPU", "memory", "utilization", "resource usage", or similar terms, query the \`metrics\` table. When they mention "runs", "tasks", "failures", "status", "duration", or "cost", query the \`runs\` table. + ## TSQL Syntax Guide TSQL supports standard SQL syntax with some ClickHouse-specific features: @@ -437,16 +444,45 @@ LIMIT 1000 Only use explicit \`toStartOfHour\`/\`toStartOfDay\` etc. if the user specifically requests a particular bucket size (e.g., "group by hour", "bucket by day"). 
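+
+\`\`\`sql
+-- Example (sketch): explicit daily buckets, only because the user asked to
+-- "group by day"; otherwise prefer timeBucket() as described above.
+SELECT toStartOfDay(triggered_at) AS day, count() AS run_count
+FROM runs
+GROUP BY day
+ORDER BY day
+LIMIT 1000
+\`\`\`
+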
### Common Patterns + +#### Runs table - Status filter: WHERE status = 'Failed' or WHERE status IN ('Failed', 'Crashed') -- Time filtering: Use the \`setTimeFilter\` tool (NOT triggered_at in WHERE clause) +- Time filtering: Use the \`setTimeFilter\` tool (NOT triggered_at/bucket_start in WHERE clause) + +#### Metrics table +- Filter by metric name: WHERE metric_name = 'process.cpu.utilization' +- Filter by run: WHERE run_id = 'run_abc123' +- Filter by task: WHERE task_identifier = 'my-task' +- Available metric names: process.cpu.utilization, process.cpu.time, process.memory.usage, system.memory.usage, system.memory.utilization, system.network.io, system.network.dropped, system.network.errors +- Use max_value or last_value for gauges (CPU utilization, memory usage), sum_value for counters (CPU time, network IO) + +\`\`\`sql +-- CPU utilization over time for a task +SELECT timeBucket(), task_identifier, avg(max_value) AS avg_cpu +FROM metrics +WHERE metric_name = 'process.cpu.utilization' +GROUP BY timeBucket, task_identifier +ORDER BY timeBucket +LIMIT 1000 +\`\`\` + +\`\`\`sql +-- Peak memory usage per run +SELECT run_id, task_identifier, max(max_value) AS peak_memory_bytes +FROM metrics +WHERE metric_name = 'process.memory.usage' +GROUP BY run_id, task_identifier +ORDER BY peak_memory_bytes DESC +LIMIT 100 +\`\`\` ## Important Rules 1. NEVER use SELECT * - ClickHouse is a columnar database where SELECT * has very poor performance 2. Always select only the specific columns needed for the request 3. When column selection is ambiguous, use the core columns marked [CORE] in the schema -4. **TIME FILTERING**: When the user wants to filter by time (e.g., "last 7 days", "past hour", "yesterday"), ALWAYS use the \`setTimeFilter\` tool instead of adding \`triggered_at\` conditions to the query. The UI has a time filter that will apply this automatically. -5. Do NOT add \`triggered_at\` to WHERE clauses - use \`setTimeFilter\` tool instead. If the user doesn't specify a time period, do NOT add any time filter (the UI defaults to 7 days). +4. **TIME FILTERING**: When the user wants to filter by time (e.g., "last 7 days", "past hour", "yesterday"), ALWAYS use the \`setTimeFilter\` tool instead of adding time conditions to the WHERE clause. The UI has a time filter that will apply this automatically. This applies to both the \`runs\` table (triggered_at) and the \`metrics\` table (bucket_start). +5. Do NOT add \`triggered_at\` or \`bucket_start\` to WHERE clauses for time filtering - use \`setTimeFilter\` tool instead. If the user doesn't specify a time period, do NOT add any time filter (the UI defaults to 7 days). 6. **TIME BUCKETING**: When the user wants to see data over time or in time buckets, use \`timeBucket()\` in SELECT and reference it as \`timeBucket\` in GROUP BY / ORDER BY. Only use manual bucketing functions (toStartOfHour, toStartOfDay, etc.) when the user explicitly requests a specific bucket size. 7. ALWAYS use the validateTSQLQuery tool to check your query before returning it 8. If validation fails, fix the issues and try again (up to 3 attempts) @@ -472,7 +508,7 @@ If you cannot generate a valid query, explain why briefly.`; * Build the system prompt for edit mode */ private buildEditSystemPrompt(schemaDescription: string): string { - return `You are an expert SQL assistant that modifies existing TSQL queries for a task run analytics system. TSQL is a SQL dialect similar to ClickHouse SQL. 
+ return `You are an expert SQL assistant that modifies existing TSQL queries for a task analytics system. TSQL is a SQL dialect similar to ClickHouse SQL. ## Your Task Modify the provided TSQL query according to the user's instructions. Make only the changes requested - preserve the existing query structure where possible. @@ -480,6 +516,11 @@ Modify the provided TSQL query according to the user's instructions. Make only t ## Available Schema ${schemaDescription} +## Choosing the Right Table + +- **runs** — Task run records (status, timing, cost, output, etc.). Use for questions about runs, tasks, failures, durations, costs, queues. +- **metrics** — Host and runtime metrics collected during task execution (CPU, memory). Use for questions about resource usage, CPU utilization, memory consumption, or performance monitoring. Each row is a 10-second aggregation bucket tied to a specific run. + ## TSQL Syntax Guide TSQL supports standard SQL syntax with some ClickHouse-specific features: @@ -539,11 +580,16 @@ ORDER BY timeBucket LIMIT 1000 \`\`\` +### Common Metrics Patterns +- Filter by metric: WHERE metric_name = 'process.cpu.utilization' +- Available metric names: process.cpu.utilization, process.cpu.time, process.memory.usage, system.memory.usage, system.memory.utilization, system.network.io, system.network.dropped, system.network.errors +- Use max_value or last_value for gauges (CPU utilization, memory usage), sum_value for counters (CPU time, network IO) + ## Important Rules 1. NEVER use SELECT * - ClickHouse is a columnar database where SELECT * has very poor performance 2. If the existing query uses SELECT *, replace it with specific columns (use core columns marked [CORE] as defaults) -3. **TIME FILTERING**: When the user wants to change time filtering (e.g., "change to last 30 days"), use the \`setTimeFilter\` tool instead of modifying \`triggered_at\` conditions. If the existing query has \`triggered_at\` in WHERE, consider removing it and using \`setTimeFilter\` instead. +3. **TIME FILTERING**: When the user wants to change time filtering (e.g., "change to last 30 days"), use the \`setTimeFilter\` tool instead of modifying time column conditions. If the existing query has \`triggered_at\` or \`bucket_start\` in WHERE for time filtering, consider removing it and using \`setTimeFilter\` instead. 4. **TIME BUCKETING**: When adding time-series grouping, use \`timeBucket()\` in SELECT and reference it as \`timeBucket\` in GROUP BY / ORDER BY. Only use manual bucketing functions (toStartOfHour, toStartOfDay, etc.) when the user explicitly requests a specific bucket size. 5. ALWAYS use the validateTSQLQuery tool to check your modified query before returning it 6. 
If validation fails, fix the issues and try again (up to 3 attempts) From 23893de66472fe0d245b46a64efc42d419456f4d Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 13 Feb 2026 17:37:24 +0000 Subject: [PATCH 06/13] Type the known columns in attributes so we can filter and aggregate by them --- .../schema/016_create_metrics_v1.sql | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/internal-packages/clickhouse/schema/016_create_metrics_v1.sql b/internal-packages/clickhouse/schema/016_create_metrics_v1.sql index c0314b7fa0..0210f4bf1e 100644 --- a/internal-packages/clickhouse/schema/016_create_metrics_v1.sql +++ b/internal-packages/clickhouse/schema/016_create_metrics_v1.sql @@ -13,7 +13,23 @@ CREATE TABLE IF NOT EXISTS trigger_dev.metrics_v1 max_value Float64 DEFAULT 0, min_value Float64 DEFAULT 0, last_value Float64 DEFAULT 0, - attributes JSON(max_dynamic_paths=64) + attributes JSON( + `trigger.run_id` String, + `trigger.task_slug` String, + `trigger.attempt_number` Int64, + `trigger.environment_type` LowCardinality(String), + `trigger.machine_id` String, + `trigger.machine_name` LowCardinality(String), + `trigger.worker_id` String, + `trigger.worker_version` String, + `system.cpu.logical_number` String, + `system.cpu.state` LowCardinality(String), + `system.memory.state` LowCardinality(String), + `system.device` String, + `process.cpu.state` LowCardinality(String), + `network.io.direction` LowCardinality(String), + max_dynamic_paths=8 + ) ) ENGINE = MergeTree() PARTITION BY toYYYYMM(bucket_start) From 56b196ebb842e1af2fcb85f449ce6243b959dcdb Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 13 Feb 2026 22:31:08 +0000 Subject: [PATCH 07/13] time bucket thresholds can now be defined per query schema --- apps/webapp/app/v3/querySchemas.ts | 14 +- internal-packages/tsql/src/index.ts | 2 + .../tsql/src/query/printer.test.ts | 77 +++++- internal-packages/tsql/src/query/printer.ts | 8 +- internal-packages/tsql/src/query/schema.ts | 8 + .../tsql/src/query/time_buckets.ts | 22 +- references/hello-world/src/trigger/metrics.ts | 245 ++++++++++++++++++ 7 files changed, 368 insertions(+), 8 deletions(-) create mode 100644 references/hello-world/src/trigger/metrics.ts diff --git a/apps/webapp/app/v3/querySchemas.ts b/apps/webapp/app/v3/querySchemas.ts index 4c41556317..4a8f2fcf6d 100644 --- a/apps/webapp/app/v3/querySchemas.ts +++ b/apps/webapp/app/v3/querySchemas.ts @@ -1,4 +1,4 @@ -import { column, type TableSchema } from "@internal/tsql"; +import { column, type BucketThreshold, type TableSchema } from "@internal/tsql"; import { z } from "zod"; import { autoFormatSQL } from "~/components/code/TSQLEditor"; import { runFriendlyStatus, runStatusTitleFromStatus } from "~/components/runs/v3/TaskRunStatus"; @@ -598,6 +598,18 @@ export const metricsSchema: TableSchema = { expression: "attributes.trigger.worker_version", }, }, + timeBucketThresholds: [ + // Metrics are pre-aggregated into 10-second buckets, so 10s is the most granular interval. + // All thresholds are shifted coarser compared to the runs table defaults. 
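+    // For example, a 6-hour range buckets at 1 MINUTE (the 12-hour threshold below applies).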
+ { maxRangeSeconds: 3 * 60 * 60, interval: { value: 10, unit: "SECOND" } }, + { maxRangeSeconds: 12 * 60 * 60, interval: { value: 1, unit: "MINUTE" } }, + { maxRangeSeconds: 2 * 24 * 60 * 60, interval: { value: 5, unit: "MINUTE" } }, + { maxRangeSeconds: 7 * 24 * 60 * 60, interval: { value: 15, unit: "MINUTE" } }, + { maxRangeSeconds: 30 * 24 * 60 * 60, interval: { value: 1, unit: "HOUR" } }, + { maxRangeSeconds: 90 * 24 * 60 * 60, interval: { value: 6, unit: "HOUR" } }, + { maxRangeSeconds: 180 * 24 * 60 * 60, interval: { value: 1, unit: "DAY" } }, + { maxRangeSeconds: 365 * 24 * 60 * 60, interval: { value: 1, unit: "WEEK" } }, + ] satisfies BucketThreshold[], }; /** diff --git a/internal-packages/tsql/src/index.ts b/internal-packages/tsql/src/index.ts index f4ac1ea6bb..f8a2c7e275 100644 --- a/internal-packages/tsql/src/index.ts +++ b/internal-packages/tsql/src/index.ts @@ -133,7 +133,9 @@ export { // Re-export time bucket utilities export { + BUCKET_THRESHOLDS, calculateTimeBucketInterval, + type BucketThreshold, type TimeBucketInterval, } from "./query/time_buckets.js"; diff --git a/internal-packages/tsql/src/query/printer.test.ts b/internal-packages/tsql/src/query/printer.test.ts index 585f695fa1..33f6238c51 100644 --- a/internal-packages/tsql/src/query/printer.test.ts +++ b/internal-packages/tsql/src/query/printer.test.ts @@ -2,7 +2,13 @@ import { describe, it, expect, beforeEach } from "vitest"; import { parseTSQLSelect, parseTSQLExpr, compileTSQL } from "../index.js"; import { ClickHousePrinter, printToClickHouse, type PrintResult } from "./printer.js"; import { createPrinterContext, PrinterContext } from "./printer_context.js"; -import { createSchemaRegistry, column, type TableSchema, type SchemaRegistry } from "./schema.js"; +import { + createSchemaRegistry, + column, + type TableSchema, + type SchemaRegistry, +} from "./schema.js"; +import type { BucketThreshold } from "./time_buckets.js"; import { QueryError, SyntaxError } from "./errors.js"; /** @@ -3570,4 +3576,73 @@ describe("timeBucket()", () => { expect(Object.values(params)).toContain("org_test123"); }); }); + + describe("per-table timeBucketThresholds", () => { + const customThresholds: BucketThreshold[] = [ + // 10-second minimum granularity (e.g., for pre-aggregated metrics) + { maxRangeSeconds: 10 * 60, interval: { value: 10, unit: "SECOND" } }, + { maxRangeSeconds: 30 * 60, interval: { value: 30, unit: "SECOND" } }, + { maxRangeSeconds: 2 * 60 * 60, interval: { value: 1, unit: "MINUTE" } }, + ]; + + const schemaWithCustomThresholds: TableSchema = { + ...timeBucketSchema, + name: "metrics", + timeBucketThresholds: customThresholds, + }; + + it("should use custom thresholds when defined on the table schema", () => { + // 3-minute range: global default would give 5 SECOND, custom gives 10 SECOND + const threeMinuteRange = { + from: new Date("2024-01-01T00:00:00Z"), + to: new Date("2024-01-01T00:03:00Z"), + }; + + const schema = createSchemaRegistry([schemaWithCustomThresholds]); + const ctx = createPrinterContext({ + schema, + enforcedWhereClause: { + organization_id: { op: "eq", value: "org_test123" }, + project_id: { op: "eq", value: "proj_test456" }, + environment_id: { op: "eq", value: "env_test789" }, + }, + timeRange: threeMinuteRange, + }); + + const ast = parseTSQLSelect( + "SELECT timeBucket(), count() FROM metrics GROUP BY timeBucket" + ); + const { sql } = printToClickHouse(ast, ctx); + + // Custom thresholds: under 10 min → 10 SECOND (not the global 5 SECOND) + 
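+      // (a 3-minute range falls under the 10-minute maxRangeSeconds entry, so its interval wins)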
expect(sql).toContain("toStartOfInterval(created_at, INTERVAL 10 SECOND)"); + }); + + it("should fall back to global defaults when no custom thresholds are defined", () => { + // 3-minute range with standard schema (no custom thresholds) + const threeMinuteRange = { + from: new Date("2024-01-01T00:00:00Z"), + to: new Date("2024-01-01T00:03:00Z"), + }; + + const schema = createSchemaRegistry([timeBucketSchema]); + const ctx = createPrinterContext({ + schema, + enforcedWhereClause: { + organization_id: { op: "eq", value: "org_test123" }, + project_id: { op: "eq", value: "proj_test456" }, + environment_id: { op: "eq", value: "env_test789" }, + }, + timeRange: threeMinuteRange, + }); + + const ast = parseTSQLSelect( + "SELECT timeBucket(), count() FROM runs GROUP BY timeBucket" + ); + const { sql } = printToClickHouse(ast, ctx); + + // Global default: under 5 min → 5 SECOND + expect(sql).toContain("toStartOfInterval(created_at, INTERVAL 5 SECOND)"); + }); + }); }); diff --git a/internal-packages/tsql/src/query/printer.ts b/internal-packages/tsql/src/query/printer.ts index ff3c608430..8e5d27b56e 100644 --- a/internal-packages/tsql/src/query/printer.ts +++ b/internal-packages/tsql/src/query/printer.ts @@ -2973,8 +2973,12 @@ export class ClickHousePrinter { ); } - // Calculate the appropriate interval - const interval = calculateTimeBucketInterval(timeRange.from, timeRange.to); + // Calculate the appropriate interval (use table-specific thresholds if defined) + const interval = calculateTimeBucketInterval( + timeRange.from, + timeRange.to, + tableSchema.timeBucketThresholds + ); // Emit toStartOfInterval(column, INTERVAL N UNIT) return `toStartOfInterval(${escapeClickHouseIdentifier(clickhouseColumnName)}, INTERVAL ${interval.value} ${interval.unit})`; diff --git a/internal-packages/tsql/src/query/schema.ts b/internal-packages/tsql/src/query/schema.ts index fd8a7add16..7886b48f69 100644 --- a/internal-packages/tsql/src/query/schema.ts +++ b/internal-packages/tsql/src/query/schema.ts @@ -2,6 +2,7 @@ // Defines allowed tables, columns, and tenant isolation configuration import { QueryError } from "./errors"; +import type { BucketThreshold } from "./time_buckets"; /** * ClickHouse data types supported by TSQL @@ -354,6 +355,13 @@ export interface TableSchema { * ``` */ timeConstraint?: string; + /** + * Custom time bucket thresholds for this table. + * When set, timeBucket() uses these instead of the global defaults. + * Useful when the table's time granularity differs from the standard (e.g., metrics + * pre-aggregated into 10-second buckets shouldn't go below 10-second intervals). + */ + timeBucketThresholds?: BucketThreshold[]; } /** diff --git a/internal-packages/tsql/src/query/time_buckets.ts b/internal-packages/tsql/src/query/time_buckets.ts index fe7bd7efe6..04edde007d 100644 --- a/internal-packages/tsql/src/query/time_buckets.ts +++ b/internal-packages/tsql/src/query/time_buckets.ts @@ -17,13 +17,23 @@ export interface TimeBucketInterval { } /** - * Time bucket thresholds: each entry defines a maximum time range duration (in seconds) + * A threshold mapping a maximum time range duration to a bucket interval. 
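+ *
+ * e.g. { maxRangeSeconds: 600, interval: { value: 10, unit: "SECOND" } }
+ * applies 10-second buckets to any range shorter than ten minutes.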
+ */ +export interface BucketThreshold { + /** Maximum range duration in seconds for this threshold to apply */ + maxRangeSeconds: number; + /** The bucket interval to use when the range is under maxRangeSeconds */ + interval: TimeBucketInterval; +} + +/** + * Default time bucket thresholds: each entry defines a maximum time range duration (in seconds) * and the corresponding bucket interval to use. * * The intervals are chosen to produce roughly 50-100 data points for the given range. * Entries are ordered from smallest to largest range. */ -const BUCKET_THRESHOLDS: Array<{ maxRangeSeconds: number; interval: TimeBucketInterval }> = [ +export const BUCKET_THRESHOLDS: BucketThreshold[] = [ // Under 5 minutes → 5 second buckets (max 60 buckets) { maxRangeSeconds: 5 * 60, interval: { value: 5, unit: "SECOND" } }, // Under 30 minutes → 30 second buckets (max 60 buckets) @@ -73,10 +83,14 @@ const DEFAULT_LARGE_INTERVAL: TimeBucketInterval = { value: 1, unit: "MONTH" }; * ); // { value: 6, unit: "HOUR" } * ``` */ -export function calculateTimeBucketInterval(from: Date, to: Date): TimeBucketInterval { +export function calculateTimeBucketInterval( + from: Date, + to: Date, + thresholds?: BucketThreshold[] +): TimeBucketInterval { const rangeSeconds = Math.abs(to.getTime() - from.getTime()) / 1000; - for (const threshold of BUCKET_THRESHOLDS) { + for (const threshold of thresholds ?? BUCKET_THRESHOLDS) { if (rangeSeconds < threshold.maxRangeSeconds) { return threshold.interval; } diff --git a/references/hello-world/src/trigger/metrics.ts b/references/hello-world/src/trigger/metrics.ts new file mode 100644 index 0000000000..14b0785500 --- /dev/null +++ b/references/hello-world/src/trigger/metrics.ts @@ -0,0 +1,245 @@ +import { batch, logger, task } from "@trigger.dev/sdk"; +import { createHash } from "node:crypto"; +import { setTimeout } from "node:timers/promises"; + +/** + * Tight computational loop that produces sustained high CPU utilization. + * Uses repeated SHA-256 hashing to keep the CPU busy. + */ +export const cpuIntensive = task({ + id: "cpu-intensive", + run: async ( + { + durationSeconds = 60, + }: { + durationSeconds?: number; + }, + { ctx } + ) => { + logger.info("Starting CPU-intensive workload", { durationSeconds }); + + const deadline = Date.now() + durationSeconds * 1000; + let iterations = 0; + let data = Buffer.from("seed-data-for-hashing"); + + while (Date.now() < deadline) { + // Tight hashing loop — ~100ms chunks then yield to event loop + const chunkEnd = Date.now() + 100; + while (Date.now() < chunkEnd) { + data = createHash("sha256").update(data).digest(); + iterations++; + } + // Yield to let metrics collection and heartbeats run + await setTimeout(1); + } + + logger.info("CPU-intensive workload complete", { iterations }); + return { iterations }; + }, +}); + +/** + * Progressively allocates memory in steps, holds it, then releases. + * Produces a staircase-shaped memory usage graph. 
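+ *
+ * The release step calls global.gc?.(), which is only defined when node runs
+ * with --expose-gc; without that flag the drop shows up on the GC's own schedule.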
+ */ +export const memoryRamp = task({ + id: "memory-ramp", + run: async ( + { + steps = 6, + stepSizeMb = 50, + stepIntervalSeconds = 5, + holdSeconds = 15, + }: { + steps?: number; + stepSizeMb?: number; + stepIntervalSeconds?: number; + holdSeconds?: number; + }, + { ctx } + ) => { + logger.info("Starting memory ramp", { steps, stepSizeMb, stepIntervalSeconds, holdSeconds }); + + const allocations: Buffer[] = []; + + // Ramp up — allocate in steps + for (let i = 0; i < steps; i++) { + const buf = Buffer.alloc(stepSizeMb * 1024 * 1024, 0xff); + allocations.push(buf); + logger.info(`Allocated step ${i + 1}/${steps}`, { + totalAllocatedMb: (i + 1) * stepSizeMb, + }); + await setTimeout(stepIntervalSeconds * 1000); + } + + // Hold at peak + logger.info("Holding at peak memory", { totalMb: steps * stepSizeMb }); + await setTimeout(holdSeconds * 1000); + + // Release + allocations.length = 0; + global.gc?.(); + logger.info("Released all allocations"); + + // Let metrics capture the drop + await setTimeout(10_000); + + logger.info("Memory ramp complete"); + return { peakMb: steps * stepSizeMb }; + }, +}); + +/** + * Alternates between CPU-intensive bursts and idle sleep periods. + * Produces a sawtooth/square-wave CPU utilization pattern. + */ +export const burstyWorkload = task({ + id: "bursty-workload", + run: async ( + { + cycles = 5, + burstSeconds = 5, + idleSeconds = 5, + }: { + cycles?: number; + burstSeconds?: number; + idleSeconds?: number; + }, + { ctx } + ) => { + logger.info("Starting bursty workload", { cycles, burstSeconds, idleSeconds }); + + for (let cycle = 0; cycle < cycles; cycle++) { + // Burst phase — hash as fast as possible + logger.info(`Cycle ${cycle + 1}/${cycles}: burst phase`); + const burstDeadline = Date.now() + burstSeconds * 1000; + let data = Buffer.from(`burst-cycle-${cycle}`); + while (Date.now() < burstDeadline) { + const chunkEnd = Date.now() + 100; + while (Date.now() < chunkEnd) { + data = createHash("sha256").update(data).digest(); + } + await setTimeout(1); + } + + // Idle phase + logger.info(`Cycle ${cycle + 1}/${cycles}: idle phase`); + await setTimeout(idleSeconds * 1000); + } + + logger.info("Bursty workload complete", { totalCycles: cycles }); + return { cycles }; + }, +}); + +/** + * Simulates a data processing pipeline with distinct phases: + * 1. Read phase — light CPU, growing memory (buffering data) + * 2. Process phase — high CPU, stable memory (crunching data) + * 3. Write phase — low CPU, memory drops (streaming out results) + * + * Shows clear phase transitions in both CPU and memory graphs. 
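+ *
+ * With the defaults this buffers ~100 MB and runs for roughly a minute
+ * (20 seconds per phase).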
+ */ +export const sustainedWorkload = task({ + id: "sustained-workload", + run: async ( + { + readSeconds = 20, + processSeconds = 20, + writeSeconds = 20, + dataSizeMb = 100, + }: { + readSeconds?: number; + processSeconds?: number; + writeSeconds?: number; + dataSizeMb?: number; + }, + { ctx } + ) => { + logger.info("Starting sustained workload — read phase", { readSeconds, dataSizeMb }); + + // Phase 1: Read — gradually accumulate buffers (memory ramp, low CPU) + const chunks: Buffer[] = []; + const chunkCount = 10; + const chunkSize = Math.floor((dataSizeMb * 1024 * 1024) / chunkCount); + const readInterval = (readSeconds * 1000) / chunkCount; + + for (let i = 0; i < chunkCount; i++) { + chunks.push(Buffer.alloc(chunkSize, i)); + logger.info(`Read ${i + 1}/${chunkCount} chunks`); + await setTimeout(readInterval); + } + + // Phase 2: Process — hash all chunks repeatedly (high CPU, stable memory) + logger.info("Entering process phase", { processSeconds }); + const processDeadline = Date.now() + processSeconds * 1000; + let hashCount = 0; + + while (Date.now() < processDeadline) { + const chunkEnd = Date.now() + 100; + while (Date.now() < chunkEnd) { + for (const chunk of chunks) { + createHash("sha256").update(chunk).digest(); + hashCount++; + } + } + await setTimeout(1); + } + + // Phase 3: Write — release memory gradually (low CPU, memory drops) + logger.info("Entering write phase", { writeSeconds }); + const writeInterval = (writeSeconds * 1000) / chunkCount; + + for (let i = chunkCount - 1; i >= 0; i--) { + chunks.pop(); + logger.info(`Wrote and released chunk ${chunkCount - i}/${chunkCount}`); + await setTimeout(writeInterval); + } + + global.gc?.(); + await setTimeout(5000); + + logger.info("Sustained workload complete", { hashCount }); + return { hashCount }; + }, +}); + +/** + * Parent task that fans out multiple child tasks in parallel. + * Useful for seeing per-run breakdowns in metrics queries grouped by run_id. 
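+ *
+ * e.g. trigger with { concurrency: 3, taskType: "cpu-intensive", durationSeconds: 30 }
+ * to fan out three 30-second CPU-bound children.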
+ */ +export const concurrentLoad = task({ + id: "concurrent-load", + run: async ( + { + concurrency = 3, + taskType = "bursty-workload" as "cpu-intensive" | "bursty-workload", + durationSeconds = 30, + }: { + concurrency?: number; + taskType?: "cpu-intensive" | "bursty-workload"; + durationSeconds?: number; + }, + { ctx } + ) => { + logger.info("Starting concurrent load", { concurrency, taskType, durationSeconds }); + + const items = Array.from({ length: concurrency }, (_, i) => { + if (taskType === "cpu-intensive") { + return { id: cpuIntensive.id, payload: { durationSeconds } }; + } + return { + id: burstyWorkload.id, + payload: { cycles: 3, burstSeconds: 5, idleSeconds: 5 }, + }; + }); + + const results = await batch.triggerAndWait(items); + + logger.info("All children completed", { + count: results.runs.length, + }); + + return { childRunIds: results.runs.map((r) => r.id) }; + }, +}); From bf178b277f328d5c2357af8122a9117d262c3731 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 14 Feb 2026 08:57:51 +0000 Subject: [PATCH 08/13] Add support for prettyFormat --- .../app/components/code/QueryResultsChart.tsx | 77 ++++++++++- .../app/components/code/TSQLResultsTable.tsx | 73 +++++++++- .../primitives/charts/BigNumberCard.tsx | 27 +++- .../components/primitives/charts/Chart.tsx | 9 +- .../components/primitives/charts/ChartBar.tsx | 5 +- .../primitives/charts/ChartLegendCompound.tsx | 29 +++- .../primitives/charts/ChartLine.tsx | 19 ++- .../primitives/charts/ChartRoot.tsx | 7 + apps/webapp/app/utils/columnFormat.ts | 70 ++++++++++ .../app/v3/services/aiQueryService.server.ts | 14 +- internal-packages/clickhouse/src/index.ts | 2 +- internal-packages/tsql/src/index.ts | 1 + .../tsql/src/query/printer.test.ts | 130 ++++++++++++++++++ internal-packages/tsql/src/query/printer.ts | 64 +++++++++ internal-packages/tsql/src/query/schema.ts | 37 +++++ 15 files changed, 542 insertions(+), 22 deletions(-) create mode 100644 apps/webapp/app/utils/columnFormat.ts diff --git a/apps/webapp/app/components/code/QueryResultsChart.tsx b/apps/webapp/app/components/code/QueryResultsChart.tsx index 4f7899a740..82cd7234da 100644 --- a/apps/webapp/app/components/code/QueryResultsChart.tsx +++ b/apps/webapp/app/components/code/QueryResultsChart.tsx @@ -1,5 +1,8 @@ -import type { OutputColumnMetadata } from "@internal/clickhouse"; +import type { ColumnFormatType, OutputColumnMetadata } from "@internal/clickhouse"; +import { formatDurationMilliseconds } from "@trigger.dev/core/v3"; import { memo, useMemo } from "react"; +import { createValueFormatter } from "~/utils/columnFormat"; +import { formatCurrencyAccurate } from "~/utils/numberFormatter"; import type { ChartConfig } from "~/components/primitives/charts/Chart"; import { Chart } from "~/components/primitives/charts/ChartCompound"; import { Paragraph } from "../primitives/Paragraph"; @@ -797,8 +800,24 @@ export const QueryResultsChart = memo(function QueryResultsChart({ }; }, [isDateBased, timeGranularity]); - // Create dynamic Y-axis formatter based on data range - const yAxisFormatter = useMemo(() => createYAxisFormatter(data, series), [data, series]); + // Resolve the Y-axis column format for formatting + const yAxisFormat = useMemo(() => { + if (yAxisColumns.length === 0) return undefined; + const col = columns.find((c) => c.name === yAxisColumns[0]); + return (col?.format ?? 
col?.customRenderType) as ColumnFormatType | undefined; + }, [yAxisColumns, columns]); + + // Create dynamic Y-axis formatter based on data range and format + const yAxisFormatter = useMemo( + () => createYAxisFormatter(data, series, yAxisFormat), + [data, series, yAxisFormat] + ); + + // Create value formatter for tooltips and legend based on column format + const tooltipValueFormatter = useMemo( + () => createValueFormatter(yAxisFormat), + [yAxisFormat] + ); // Check if the group-by column has a runStatus customRenderType const groupByIsRunStatus = useMemo(() => { @@ -1016,6 +1035,7 @@ export const QueryResultsChart = memo(function QueryResultsChart({ showLegend={showLegend} maxLegendItems={fullLegend ? Infinity : 5} legendAggregation={config.aggregation} + legendValueFormatter={tooltipValueFormatter} minHeight="300px" fillContainer onViewAllLegendItems={onViewAllLegendItems} @@ -1027,6 +1047,7 @@ export const QueryResultsChart = memo(function QueryResultsChart({ yAxisProps={yAxisProps} stackId={stacked ? "stack" : undefined} tooltipLabelFormatter={tooltipLabelFormatter} + tooltipValueFormatter={tooltipValueFormatter} /> ); @@ -1043,6 +1064,7 @@ export const QueryResultsChart = memo(function QueryResultsChart({ showLegend={showLegend} maxLegendItems={fullLegend ? Infinity : 5} legendAggregation={config.aggregation} + legendValueFormatter={tooltipValueFormatter} minHeight="300px" fillContainer onViewAllLegendItems={onViewAllLegendItems} @@ -1054,6 +1076,7 @@ export const QueryResultsChart = memo(function QueryResultsChart({ yAxisProps={yAxisProps} stacked={stacked && sortedSeries.length > 1} tooltipLabelFormatter={tooltipLabelFormatter} + tooltipValueFormatter={tooltipValueFormatter} lineType="linear" /> @@ -1061,9 +1084,13 @@ export const QueryResultsChart = memo(function QueryResultsChart({ }); /** - * Creates a Y-axis value formatter based on the data range + * Creates a Y-axis value formatter based on the data range and optional format hint */ -function createYAxisFormatter(data: Record[], series: string[]) { +function createYAxisFormatter( + data: Record[], + series: string[], + format?: ColumnFormatType +) { // Find min and max values across all series let minVal = Infinity; let maxVal = -Infinity; @@ -1080,6 +1107,46 @@ function createYAxisFormatter(data: Record[], series: string[]) const range = maxVal - minVal; + // Format-aware formatters + if (format === "bytes" || format === "decimalBytes") { + const divisor = format === "bytes" ? 1024 : 1000; + const units = + format === "bytes" + ? ["B", "KiB", "MiB", "GiB", "TiB"] + : ["B", "KB", "MB", "GB", "TB"]; + return (value: number): string => { + if (value === 0) return "0 B"; + // Use consistent unit for all ticks based on max value + const i = Math.min( + Math.floor(Math.log(Math.abs(maxVal || 1)) / Math.log(divisor)), + units.length - 1 + ); + const scaled = value / Math.pow(divisor, i); + return `${scaled.toFixed(scaled < 10 ? 1 : 0)} ${units[i]}`; + }; + } + + if (format === "percent") { + return (value: number): string => `${value.toFixed(range < 1 ? 2 : 1)}%`; + } + + if (format === "duration") { + return (value: number): string => formatDurationMilliseconds(value, { style: "short" }); + } + + if (format === "durationSeconds") { + return (value: number): string => + formatDurationMilliseconds(value * 1000, { style: "short" }); + } + + if (format === "costInDollars" || format === "cost") { + return (value: number): string => { + const dollars = format === "cost" ? 
value / 100 : value; + return formatCurrencyAccurate(dollars); + }; + } + + // Default formatter return (value: number): string => { // Use abbreviations for large numbers if (Math.abs(value) >= 1_000_000) { diff --git a/apps/webapp/app/components/code/TSQLResultsTable.tsx b/apps/webapp/app/components/code/TSQLResultsTable.tsx index 951e848235..5c6af16559 100644 --- a/apps/webapp/app/components/code/TSQLResultsTable.tsx +++ b/apps/webapp/app/components/code/TSQLResultsTable.tsx @@ -35,6 +35,7 @@ import { useCopy } from "~/hooks/useCopy"; import { useOrganization } from "~/hooks/useOrganizations"; import { useProject } from "~/hooks/useProject"; import { cn } from "~/utils/cn"; +import { formatBytes, formatDecimalBytes, formatQuantity } from "~/utils/columnFormat"; import { formatCurrencyAccurate, formatNumber } from "~/utils/numberFormatter"; import { v3ProjectPath, v3RunPathFromFriendlyId } from "~/utils/pathBuilder"; import { Paragraph } from "../primitives/Paragraph"; @@ -64,9 +65,10 @@ function getFormattedValue(value: unknown, column: OutputColumnMetadata): string if (value === null) return "NULL"; if (value === undefined) return ""; - // Handle custom render types - if (column.customRenderType) { - switch (column.customRenderType) { + // Handle format hints (from prettyFormat() or auto-populated from customRenderType) + const formatType = column.format ?? column.customRenderType; + if (formatType) { + switch (formatType) { case "duration": if (typeof value === "number") { return formatDurationMilliseconds(value, { style: "short" }); @@ -93,6 +95,26 @@ function getFormattedValue(value: unknown, column: OutputColumnMetadata): string return value; } break; + case "bytes": + if (typeof value === "number") { + return formatBytes(value); + } + break; + case "decimalBytes": + if (typeof value === "number") { + return formatDecimalBytes(value); + } + break; + case "percent": + if (typeof value === "number") { + return `${value.toFixed(2)}%`; + } + break; + case "quantity": + if (typeof value === "number") { + return formatQuantity(value); + } + break; } } @@ -220,6 +242,21 @@ function getDisplayLength(value: unknown, column: OutputColumnMetadata): number if (value === null) return 4; // "NULL" if (value === undefined) return 9; // "UNDEFINED" + // Handle format hint types - estimate their rendered width + const fmt = column.format; + if (fmt === "bytes" || fmt === "decimalBytes") { + // e.g., "1.50 GiB" or "256.00 MB" + return 12; + } + if (fmt === "percent") { + // e.g., "45.23%" + return 8; + } + if (fmt === "quantity") { + // e.g., "1.50M" + return 8; + } + // Handle custom render types - estimate their rendered width if (column.customRenderType) { switch (column.customRenderType) { @@ -392,6 +429,10 @@ function isRightAlignedColumn(column: OutputColumnMetadata): boolean { ) { return true; } + const fmt = column.format; + if (fmt === "bytes" || fmt === "decimalBytes" || fmt === "percent" || fmt === "quantity") { + return true; + } return isNumericType(column.type); } @@ -474,6 +515,32 @@ function CellValue({ return
<span>UNDEFINED</span>
; } + // Check format hint for new format types (from prettyFormat()) + if (column.format && !column.customRenderType) { + switch (column.format) { + case "bytes": + if (typeof value === "number") { + return {formatBytes(value)}; + } + break; + case "decimalBytes": + if (typeof value === "number") { + return {formatDecimalBytes(value)}; + } + break; + case "percent": + if (typeof value === "number") { + return {value.toFixed(2)}%; + } + break; + case "quantity": + if (typeof value === "number") { + return {formatQuantity(value)}; + } + break; + } + } + // First check customRenderType for special rendering if (column.customRenderType) { switch (column.customRenderType) { diff --git a/apps/webapp/app/components/primitives/charts/BigNumberCard.tsx b/apps/webapp/app/components/primitives/charts/BigNumberCard.tsx index f9d4280412..65bda5a9ca 100644 --- a/apps/webapp/app/components/primitives/charts/BigNumberCard.tsx +++ b/apps/webapp/app/components/primitives/charts/BigNumberCard.tsx @@ -1,9 +1,10 @@ -import type { OutputColumnMetadata } from "@internal/tsql"; +import type { ColumnFormatType, OutputColumnMetadata } from "@internal/tsql"; import { useMemo } from "react"; import type { BigNumberAggregationType, BigNumberConfiguration, } from "~/components/metrics/QueryWidget"; +import { createValueFormatter } from "~/utils/columnFormat"; import { AnimatedNumber } from "../AnimatedNumber"; import { Spinner } from "../Spinner"; import { Paragraph } from "../Paragraph"; @@ -129,6 +130,15 @@ export function BigNumberCard({ rows, columns, config, isLoading = false }: BigN return aggregateValues(values, aggregation); }, [rows, column, aggregation, sortDirection]); + // Look up column format for format-aware display + const columnValueFormatter = useMemo(() => { + const columnMeta = columns.find((c) => c.name === column); + const formatType = (columnMeta?.format ?? columnMeta?.customRenderType) as + | ColumnFormatType + | undefined; + return createValueFormatter(formatType); + }, [columns, column]); + if (isLoading) { return (
@@ -147,6 +157,21 @@ export function BigNumberCard({ rows, columns, config, isLoading = false }: BigN ); } + // Use format-aware formatter when available + if (columnValueFormatter) { + return ( +
+      <div>
+        {prefix && <span>{prefix}</span>}
+        <span>{columnValueFormatter(result)}</span>
+        {suffix && <span>{suffix}</span>}
+      </div>
+ ); + } + const { displayValue, unitSuffix, decimalPlaces } = abbreviate ? abbreviateValue(result) : { displayValue: result, unitSuffix: undefined, decimalPlaces: getDecimalPlaces(result) }; diff --git a/apps/webapp/app/components/primitives/charts/Chart.tsx b/apps/webapp/app/components/primitives/charts/Chart.tsx index 0d7cd741a6..ffed1a2bc1 100644 --- a/apps/webapp/app/components/primitives/charts/Chart.tsx +++ b/apps/webapp/app/components/primitives/charts/Chart.tsx @@ -104,6 +104,8 @@ const ChartTooltipContent = React.forwardRef< indicator?: "line" | "dot" | "dashed"; nameKey?: string; labelKey?: string; + /** Optional formatter for numeric values (e.g. bytes, duration) */ + valueFormatter?: (value: number) => string; } >( ( @@ -121,6 +123,7 @@ const ChartTooltipContent = React.forwardRef< color, nameKey, labelKey, + valueFormatter, }, ref ) => { @@ -221,9 +224,11 @@ const ChartTooltipContent = React.forwardRef< {itemConfig?.label || item.name}
- {item.value && ( + {item.value != null && ( - {item.value.toLocaleString()} + {valueFormatter && typeof item.value === "number" + ? valueFormatter(item.value) + : item.value.toLocaleString()} )} diff --git a/apps/webapp/app/components/primitives/charts/ChartBar.tsx b/apps/webapp/app/components/primitives/charts/ChartBar.tsx index a34ce66759..814cc5777a 100644 --- a/apps/webapp/app/components/primitives/charts/ChartBar.tsx +++ b/apps/webapp/app/components/primitives/charts/ChartBar.tsx @@ -48,6 +48,8 @@ export type ChartBarRendererProps = { referenceLine?: ReferenceLineProps; /** Custom tooltip label formatter */ tooltipLabelFormatter?: (label: string, payload: any[]) => string; + /** Optional formatter for numeric tooltip values (e.g. bytes, duration) */ + tooltipValueFormatter?: (value: number) => string; /** Width injected by ResponsiveContainer */ width?: number; /** Height injected by ResponsiveContainer */ @@ -72,6 +74,7 @@ export function ChartBarRenderer({ yAxisProps: yAxisPropsProp, referenceLine, tooltipLabelFormatter, + tooltipValueFormatter, width, height, }: ChartBarRendererProps) { @@ -170,7 +173,7 @@ export function ChartBarRenderer({ showLegend ? ( () => null ) : tooltipLabelFormatter ? ( - + ) : ( string; /** Callback when "View all" button is clicked */ onViewAllLegendItems?: () => void; /** When true, constrains legend to max 50% height with scrolling */ @@ -50,6 +52,7 @@ export function ChartLegendCompound({ className, totalLabel, aggregation, + valueFormatter, onViewAllLegendItems, scrollable = false, }: ChartLegendCompoundProps) { @@ -179,7 +182,11 @@ export function ChartLegendCompound({ {currentTotalLabel} {currentTotal != null ? ( - + valueFormatter ? ( + valueFormatter(currentTotal) + ) : ( + + ) ) : ( "\u2013" )} @@ -251,7 +258,11 @@ export function ChartLegendCompound({ )} > {total != null ? ( - + valueFormatter ? ( + valueFormatter(total) + ) : ( + + ) ) : ( "\u2013" )} @@ -268,6 +279,7 @@ export function ChartLegendCompound({ item={legendItems.hoveredHiddenItem} value={currentData[legendItems.hoveredHiddenItem.dataKey] ?? null} remainingCount={legendItems.remaining - 1} + valueFormatter={valueFormatter} /> ) : ( string; }; -function HoveredHiddenItemRow({ item, value, remainingCount }: HoveredHiddenItemRowProps) { +function HoveredHiddenItemRow({ item, value, remainingCount, valueFormatter }: HoveredHiddenItemRowProps) { return (
{/* Active highlight background */} @@ -338,7 +351,15 @@ function HoveredHiddenItemRow({ item, value, remainingCount }: HoveredHiddenItem {remainingCount > 0 && +{remainingCount} more}
- {value != null ? : "\u2013"} + {value != null ? ( + valueFormatter ? ( + valueFormatter(value) + ) : ( + + ) + ) : ( + "\u2013" + )} diff --git a/apps/webapp/app/components/primitives/charts/ChartLine.tsx b/apps/webapp/app/components/primitives/charts/ChartLine.tsx index 7bfd9090ca..62f1bd5abc 100644 --- a/apps/webapp/app/components/primitives/charts/ChartLine.tsx +++ b/apps/webapp/app/components/primitives/charts/ChartLine.tsx @@ -51,6 +51,8 @@ export type ChartLineRendererProps = { stacked?: boolean; /** Custom tooltip label formatter */ tooltipLabelFormatter?: (label: string, payload: any[]) => string; + /** Optional formatter for numeric tooltip values (e.g. bytes, duration) */ + tooltipValueFormatter?: (value: number) => string; /** Width injected by ResponsiveContainer */ width?: number; /** Height injected by ResponsiveContainer */ @@ -75,6 +77,7 @@ export function ChartLineRenderer({ yAxisProps: yAxisPropsProp, stacked = false, tooltipLabelFormatter, + tooltipValueFormatter, width, height, }: ChartLineRendererProps) { @@ -158,7 +161,13 @@ export function ChartLineRenderer({ {/* When legend is shown below, render tooltip with cursor only (no content popup) */} null : } + content={ + showLegend ? ( + () => null + ) : ( + + ) + } labelFormatter={tooltipLabelFormatter} /> {/* Note: Legend is now rendered by ChartRoot outside the chart container */} @@ -207,7 +216,13 @@ export function ChartLineRenderer({ {/* When legend is shown below, render tooltip with cursor only (no content popup) */} null : } + content={ + showLegend ? ( + () => null + ) : ( + + ) + } labelFormatter={tooltipLabelFormatter} /> {/* Note: Legend is now rendered by ChartRoot outside the chart container */} diff --git a/apps/webapp/app/components/primitives/charts/ChartRoot.tsx b/apps/webapp/app/components/primitives/charts/ChartRoot.tsx index 9b5eb3ccb8..dd508e6b16 100644 --- a/apps/webapp/app/components/primitives/charts/ChartRoot.tsx +++ b/apps/webapp/app/components/primitives/charts/ChartRoot.tsx @@ -32,6 +32,8 @@ export type ChartRootProps = { legendTotalLabel?: string; /** Aggregation method used by the legend to compute totals (defaults to sum behavior) */ legendAggregation?: AggregationType; + /** Optional formatter for numeric legend values (e.g. 
bytes, duration) */ + legendValueFormatter?: (value: number) => string; /** Callback when "View all" legend button is clicked */ onViewAllLegendItems?: () => void; /** When true, constrains legend to max 50% height with scrolling */ @@ -77,6 +79,7 @@ export function ChartRoot({ maxLegendItems = 5, legendTotalLabel, legendAggregation, + legendValueFormatter, onViewAllLegendItems, legendScrollable = false, fillContainer = false, @@ -101,6 +104,7 @@ export function ChartRoot({ maxLegendItems={maxLegendItems} legendTotalLabel={legendTotalLabel} legendAggregation={legendAggregation} + legendValueFormatter={legendValueFormatter} onViewAllLegendItems={onViewAllLegendItems} legendScrollable={legendScrollable} fillContainer={fillContainer} @@ -118,6 +122,7 @@ type ChartRootInnerProps = { maxLegendItems?: number; legendTotalLabel?: string; legendAggregation?: AggregationType; + legendValueFormatter?: (value: number) => string; onViewAllLegendItems?: () => void; legendScrollable?: boolean; fillContainer?: boolean; @@ -131,6 +136,7 @@ function ChartRootInner({ maxLegendItems = 5, legendTotalLabel, legendAggregation, + legendValueFormatter, onViewAllLegendItems, legendScrollable = false, fillContainer = false, @@ -173,6 +179,7 @@ function ChartRootInner({ maxItems={maxLegendItems} totalLabel={legendTotalLabel} aggregation={legendAggregation} + valueFormatter={legendValueFormatter} onViewAllLegendItems={onViewAllLegendItems} scrollable={legendScrollable} /> diff --git a/apps/webapp/app/utils/columnFormat.ts b/apps/webapp/app/utils/columnFormat.ts new file mode 100644 index 0000000000..ae629c703a --- /dev/null +++ b/apps/webapp/app/utils/columnFormat.ts @@ -0,0 +1,70 @@ +import type { ColumnFormatType } from "@internal/clickhouse"; +import { formatDurationMilliseconds } from "@trigger.dev/core/v3"; +import { formatCurrencyAccurate } from "~/utils/numberFormatter"; + +/** + * Format a number as binary bytes (KiB, MiB, GiB, TiB) + */ +export function formatBytes(bytes: number): string { + if (bytes === 0) return "0 B"; + const units = ["B", "KiB", "MiB", "GiB", "TiB"]; + const i = Math.min( + Math.floor(Math.log(Math.abs(bytes)) / Math.log(1024)), + units.length - 1 + ); + return `${(bytes / Math.pow(1024, i)).toFixed(i === 0 ? 0 : 2)} ${units[i]}`; +} + +/** + * Format a number as decimal bytes (KB, MB, GB, TB) + */ +export function formatDecimalBytes(bytes: number): string { + if (bytes === 0) return "0 B"; + const units = ["B", "KB", "MB", "GB", "TB"]; + const i = Math.min( + Math.floor(Math.log(Math.abs(bytes)) / Math.log(1000)), + units.length - 1 + ); + return `${(bytes / Math.pow(1000, i)).toFixed(i === 0 ? 0 : 2)} ${units[i]}`; +} + +/** + * Format a large number with human-readable suffix (K, M, B) + */ +export function formatQuantity(value: number): string { + const abs = Math.abs(value); + if (abs >= 1_000_000_000) return `${(value / 1_000_000_000).toFixed(2)}B`; + if (abs >= 1_000_000) return `${(value / 1_000_000).toFixed(2)}M`; + if (abs >= 1_000) return `${(value / 1_000).toFixed(2)}K`; + return value.toLocaleString(); +} + +/** + * Creates a value formatter function for a given column format type. + * Used by chart tooltips, legend values, and big number cards. 
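+ *
+ * Returns undefined when no format (or an unrecognized one) is given, so callers
+ * can fall back to their default numeric formatting.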
+ */ +export function createValueFormatter( + format?: ColumnFormatType +): ((value: number) => string) | undefined { + if (!format) return undefined; + switch (format) { + case "bytes": + return (v) => formatBytes(v); + case "decimalBytes": + return (v) => formatDecimalBytes(v); + case "percent": + return (v) => `${v.toFixed(2)}%`; + case "quantity": + return (v) => formatQuantity(v); + case "duration": + return (v) => formatDurationMilliseconds(v, { style: "short" }); + case "durationSeconds": + return (v) => formatDurationMilliseconds(v * 1000, { style: "short" }); + case "costInDollars": + return (v) => formatCurrencyAccurate(v); + case "cost": + return (v) => formatCurrencyAccurate(v / 100); + default: + return undefined; + } +} diff --git a/apps/webapp/app/v3/services/aiQueryService.server.ts b/apps/webapp/app/v3/services/aiQueryService.server.ts index 5d75009baa..2f0174fbe6 100644 --- a/apps/webapp/app/v3/services/aiQueryService.server.ts +++ b/apps/webapp/app/v3/services/aiQueryService.server.ts @@ -455,10 +455,16 @@ Only use explicit \`toStartOfHour\`/\`toStartOfDay\` etc. if the user specifical - Filter by task: WHERE task_identifier = 'my-task' - Available metric names: process.cpu.utilization, process.cpu.time, process.memory.usage, system.memory.usage, system.memory.utilization, system.network.io, system.network.dropped, system.network.errors - Use max_value or last_value for gauges (CPU utilization, memory usage), sum_value for counters (CPU time, network IO) +- Use prettyFormat(expr, 'bytes') to tell the UI to format values as bytes (e.g., "1.50 GiB") — keeps values numeric for charts +- Use prettyFormat(expr, 'percent') for percentage values +- prettyFormat does NOT change the SQL — it only adds a display hint +- Available format types: bytes, decimalBytes, percent, quantity, duration, durationSeconds, costInDollars +- For memory metrics, always use prettyFormat with 'bytes' +- For CPU utilization, consider prettyFormat with 'percent' \`\`\`sql -- CPU utilization over time for a task -SELECT timeBucket(), task_identifier, avg(max_value) AS avg_cpu +SELECT timeBucket(), task_identifier, prettyFormat(avg(max_value), 'percent') AS avg_cpu FROM metrics WHERE metric_name = 'process.cpu.utilization' GROUP BY timeBucket, task_identifier @@ -468,11 +474,11 @@ LIMIT 1000 \`\`\`sql -- Peak memory usage per run -SELECT run_id, task_identifier, max(max_value) AS peak_memory_bytes +SELECT run_id, task_identifier, prettyFormat(max(max_value), 'bytes') AS peak_memory FROM metrics WHERE metric_name = 'process.memory.usage' GROUP BY run_id, task_identifier -ORDER BY peak_memory_bytes DESC +ORDER BY peak_memory DESC LIMIT 100 \`\`\` @@ -584,6 +590,8 @@ LIMIT 1000 - Filter by metric: WHERE metric_name = 'process.cpu.utilization' - Available metric names: process.cpu.utilization, process.cpu.time, process.memory.usage, system.memory.usage, system.memory.utilization, system.network.io, system.network.dropped, system.network.errors - Use max_value or last_value for gauges (CPU utilization, memory usage), sum_value for counters (CPU time, network IO) +- Use prettyFormat(expr, 'bytes') for memory metrics, prettyFormat(expr, 'percent') for CPU utilization +- prettyFormat does NOT change the SQL — it only adds a display hint for the UI ## Important Rules diff --git a/internal-packages/clickhouse/src/index.ts b/internal-packages/clickhouse/src/index.ts index 1f1b6e2ff0..aa41b207ff 100644 --- a/internal-packages/clickhouse/src/index.ts +++ b/internal-packages/clickhouse/src/index.ts @@ -58,7 +58,7 @@ 
export { type FieldMappings, type WhereClauseCondition, } from "./client/tsql.js"; -export type { OutputColumnMetadata } from "@internal/tsql"; +export type { ColumnFormatType, OutputColumnMetadata } from "@internal/tsql"; // Errors export { QueryError } from "./client/errors.js"; diff --git a/internal-packages/tsql/src/index.ts b/internal-packages/tsql/src/index.ts index f8a2c7e275..923d8b8829 100644 --- a/internal-packages/tsql/src/index.ts +++ b/internal-packages/tsql/src/index.ts @@ -109,6 +109,7 @@ export { type ClickHouseType, type ColumnSchema, type FieldMappings, + type ColumnFormatType, type OutputColumnMetadata, type RequiredFilter, type SchemaRegistry, diff --git a/internal-packages/tsql/src/query/printer.test.ts b/internal-packages/tsql/src/query/printer.test.ts index 33f6238c51..7349a7bd27 100644 --- a/internal-packages/tsql/src/query/printer.test.ts +++ b/internal-packages/tsql/src/query/printer.test.ts @@ -2341,16 +2341,19 @@ describe("Basic column metadata", () => { name: "status", type: "LowCardinality(String)", customRenderType: "runStatus", + format: "runStatus", }); expect(columns[1]).toEqual({ name: "usage_duration_ms", type: "UInt32", customRenderType: "duration", + format: "duration", }); expect(columns[2]).toEqual({ name: "cost_in_cents", type: "Float64", customRenderType: "cost", + format: "cost", }); }); @@ -2693,6 +2696,133 @@ describe("Basic column metadata", () => { expect(columns[2].name).toBe("avg"); }); }); + + describe("prettyFormat()", () => { + it("should strip prettyFormat from SQL and attach format to column metadata", () => { + const ctx = createMetadataTestContext(); + const { sql, columns } = printQuery( + "SELECT prettyFormat(usage_duration_ms, 'bytes') AS memory FROM runs", + ctx + ); + + // SQL should not contain prettyFormat + expect(sql).not.toContain("prettyFormat"); + expect(sql).toContain("usage_duration_ms"); + + expect(columns).toHaveLength(1); + expect(columns[0].name).toBe("memory"); + expect(columns[0].format).toBe("bytes"); + }); + + it("should work with aggregation wrapping", () => { + const ctx = createMetadataTestContext(); + const { sql, columns } = printQuery( + "SELECT prettyFormat(avg(usage_duration_ms), 'bytes') AS avg_memory FROM runs", + ctx + ); + + expect(sql).not.toContain("prettyFormat"); + expect(sql).toContain("avg(usage_duration_ms)"); + + expect(columns).toHaveLength(1); + expect(columns[0].name).toBe("avg_memory"); + expect(columns[0].format).toBe("bytes"); + expect(columns[0].type).toBe("Float64"); + }); + + it("should work without explicit alias", () => { + const ctx = createMetadataTestContext(); + const { sql, columns } = printQuery( + "SELECT prettyFormat(usage_duration_ms, 'percent') FROM runs", + ctx + ); + + expect(sql).not.toContain("prettyFormat"); + expect(columns).toHaveLength(1); + expect(columns[0].name).toBe("usage_duration_ms"); + expect(columns[0].format).toBe("percent"); + }); + + it("should throw for invalid format type", () => { + const ctx = createMetadataTestContext(); + expect(() => { + printQuery( + "SELECT prettyFormat(usage_duration_ms, 'invalid') FROM runs", + ctx + ); + }).toThrow(QueryError); + expect(() => { + printQuery( + "SELECT prettyFormat(usage_duration_ms, 'invalid') FROM runs", + ctx + ); + }).toThrow(/Unknown format type/); + }); + + it("should throw for wrong argument count", () => { + const ctx = createMetadataTestContext(); + expect(() => { + printQuery("SELECT prettyFormat(usage_duration_ms) FROM runs", ctx); + }).toThrow(QueryError); + expect(() => { + printQuery("SELECT 
prettyFormat(usage_duration_ms) FROM runs", ctx); + }).toThrow(/requires exactly 2 arguments/); + }); + + it("should throw when second argument is not a string literal", () => { + const ctx = createMetadataTestContext(); + expect(() => { + printQuery( + "SELECT prettyFormat(usage_duration_ms, 123) FROM runs", + ctx + ); + }).toThrow(QueryError); + expect(() => { + printQuery( + "SELECT prettyFormat(usage_duration_ms, 123) FROM runs", + ctx + ); + }).toThrow(/must be a string literal/); + }); + + it("should override schema-level customRenderType", () => { + const ctx = createMetadataTestContext(); + const { columns } = printQuery( + "SELECT prettyFormat(usage_duration_ms, 'bytes') AS mem FROM runs", + ctx + ); + + expect(columns).toHaveLength(1); + // prettyFormat's format should take precedence + expect(columns[0].format).toBe("bytes"); + // customRenderType from schema should NOT be set since prettyFormat overrides + // The source column had customRenderType: "duration" but prettyFormat replaces it + }); + + it("should auto-populate format from customRenderType when not explicitly set", () => { + const ctx = createMetadataTestContext(); + const { columns } = printQuery( + "SELECT usage_duration_ms, cost_in_cents FROM runs", + ctx + ); + + expect(columns).toHaveLength(2); + // customRenderType should auto-populate format + expect(columns[0].customRenderType).toBe("duration"); + expect(columns[0].format).toBe("duration"); + expect(columns[1].customRenderType).toBe("cost"); + expect(columns[1].format).toBe("cost"); + }); + + it("should not set format when column has no customRenderType", () => { + const ctx = createMetadataTestContext(); + const { columns } = printQuery("SELECT run_id FROM runs", ctx); + + expect(columns).toHaveLength(1); + expect(columns[0].format).toBeUndefined(); + expect(columns[0].customRenderType).toBeUndefined(); + }); + }); }); describe("Unknown column blocking", () => { diff --git a/internal-packages/tsql/src/query/printer.ts b/internal-packages/tsql/src/query/printer.ts index 8e5d27b56e..c5af9390c8 100644 --- a/internal-packages/tsql/src/query/printer.ts +++ b/internal-packages/tsql/src/query/printer.ts @@ -59,6 +59,7 @@ import { ClickHouseType, hasFieldMapping, getInternalValueFromMappingCaseInsensitive, + type ColumnFormatType, } from "./schema"; /** @@ -739,6 +740,14 @@ export class ClickHousePrinter { metadata.description = sourceColumn.description; } + // Set format hint from prettyFormat() or auto-populate from customRenderType + const sourceWithFormat = sourceColumn as (Partial & { format?: ColumnFormatType }) | null; + if (sourceWithFormat?.format) { + metadata.format = sourceWithFormat.format; + } else if (sourceColumn?.customRenderType) { + metadata.format = sourceColumn.customRenderType as ColumnFormatType; + } + this.outputColumns.push(metadata); } @@ -932,6 +941,53 @@ export class ClickHousePrinter { }; } + // Handle prettyFormat(expr, 'formatType') — metadata-only wrapper + if ((col as Call).expression_type === "call") { + const call = col as Call; + if (call.name.toLowerCase() === "prettyformat") { + if (call.args.length !== 2) { + throw new QueryError( + "prettyFormat() requires exactly 2 arguments: prettyFormat(expression, 'formatType')" + ); + } + const formatArg = call.args[1]; + if ( + (formatArg as Constant).expression_type !== "constant" || + typeof (formatArg as Constant).value !== "string" + ) { + throw new QueryError( + "prettyFormat() second argument must be a string literal format type" + ); + } + const formatType = (formatArg as 
Constant).value as string; + const validFormats = [ + "bytes", + "decimalBytes", + "quantity", + "percent", + "duration", + "durationSeconds", + "costInDollars", + "cost", + ]; + if (!validFormats.includes(formatType)) { + throw new QueryError( + `Unknown format type '${formatType}'. Valid types: ${validFormats.join(", ")}` + ); + } + const innerAnalysis = this.analyzeSelectColumn(call.args[0]); + return { + outputName: innerAnalysis.outputName, + sourceColumn: { + ...(innerAnalysis.sourceColumn ?? {}), + type: innerAnalysis.sourceColumn?.type ?? innerAnalysis.inferredType ?? undefined, + format: formatType as ColumnFormatType, + } as Partial & { format?: ColumnFormatType }, + inferredType: innerAnalysis.inferredType, + }; + } + } + // Handle Call (function/aggregation) - infer type from function if ((col as Call).expression_type === "call") { const call = col as Call; @@ -2797,6 +2853,14 @@ export class ClickHousePrinter { private visitCall(node: Call): string { const name = node.name; + // Handle prettyFormat() — strip wrapper, only emit the inner expression + if (name.toLowerCase() === "prettyformat") { + if (node.args.length !== 2) { + throw new QueryError("prettyFormat() requires exactly 2 arguments"); + } + return this.visit(node.args[0]); + } + // Handle timeBucket() - special TSQL function for automatic time bucketing if (name.toLowerCase() === "timebucket") { return this.visitTimeBucket(node); diff --git a/internal-packages/tsql/src/query/schema.ts b/internal-packages/tsql/src/query/schema.ts index 7886b48f69..16a607d8fc 100644 --- a/internal-packages/tsql/src/query/schema.ts +++ b/internal-packages/tsql/src/query/schema.ts @@ -270,6 +270,33 @@ export interface ColumnSchema { */ export type FieldMappings = Record>; +/** + * Display format types for column values. + * + * These tell the UI how to render values without changing the underlying data type. + * Includes both existing custom render types and new format hint types. + */ +export type ColumnFormatType = + // Existing custom render types + | "runId" + | "runStatus" + | "duration" + | "durationSeconds" + | "costInDollars" + | "cost" + | "machine" + | "environment" + | "environmentType" + | "project" + | "queue" + | "tags" + | "number" + // Format hint types (used by prettyFormat()) + | "bytes" + | "decimalBytes" + | "quantity" + | "percent"; + /** * Metadata for a column in query results. * @@ -291,6 +318,16 @@ export interface OutputColumnMetadata { * Only present for columns or virtual columns defined in the table schema. */ description?: string; + /** + * Display format hint — tells the UI how to render numeric values. + * + * Set by `prettyFormat(expr, 'formatType')` in TSQL queries. + * The underlying value remains numeric (for charts), but the UI uses this + * hint for axis labels, table cells, and tooltips. + * + * Also auto-populated from `customRenderType` when not explicitly set. 
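+   *
+   * e.g. prettyFormat(max(max_value), 'bytes') leaves the emitted SQL as
+   * max(max_value) but sets format: "bytes" on this column's metadata.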
+   */
+  format?: ColumnFormatType;
 }
 
 /**

From 2964cf69d3c9405eab557d631d0228c1ec709ec9 Mon Sep 17 00:00:00 2001
From: Eric Allam
Date: Sat, 14 Feb 2026 09:56:11 +0000
Subject: [PATCH 09/13] Keep exporting OTEL metrics between runs, make dev
 behave more like prod, and decouple metric flushing from collection intervals

---
 apps/webapp/app/env.server.ts                 |   2 +
 .../environmentVariablesRepository.server.ts  |  14 ++
 packages/cli-v3/src/dev/taskRunProcessPool.ts |  46 ++++++
 packages/core/src/v3/otel/tracingSDK.ts       |  17 +-
 packages/core/src/v3/taskContext/index.ts     |  12 +-
 .../core/src/v3/taskContext/otelProcessors.ts | 145 ++++++++++++++++--
 6 files changed, 217 insertions(+), 19 deletions(-)

diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts
index 25e0107078..efe68b81a9 100644
--- a/apps/webapp/app/env.server.ts
+++ b/apps/webapp/app/env.server.ts
@@ -384,6 +384,7 @@ const EnvironmentSchema = z
     DEV_OTEL_LOG_EXPORT_TIMEOUT_MILLIS: z.string().default("30000"),
     DEV_OTEL_LOG_MAX_QUEUE_SIZE: z.string().default("512"),
     DEV_OTEL_METRICS_EXPORT_INTERVAL_MILLIS: z.string().optional(),
+    DEV_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS: z.string().optional(),
 
     PROD_OTEL_BATCH_PROCESSING_ENABLED: z.string().default("0"),
     PROD_OTEL_SPAN_MAX_EXPORT_BATCH_SIZE: z.string().default("64"),
@@ -395,6 +396,7 @@ const EnvironmentSchema = z
     PROD_OTEL_LOG_EXPORT_TIMEOUT_MILLIS: z.string().default("30000"),
     PROD_OTEL_LOG_MAX_QUEUE_SIZE: z.string().default("512"),
     PROD_OTEL_METRICS_EXPORT_INTERVAL_MILLIS: z.string().optional(),
+    PROD_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS: z.string().optional(),
 
     TRIGGER_OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT: z.string().default("1024"),
     TRIGGER_OTEL_LOG_ATTRIBUTE_COUNT_LIMIT: z.string().default("1024"),
diff --git a/apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts b/apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts
index 43ad6e6415..7f1571fe09 100644
--- a/apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts
+++ b/apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts
@@ -976,6 +976,13 @@ async function resolveBuiltInDevVariables(runtimeEnvironment: RuntimeEnvironment
     );
   }
 
+  if (env.DEV_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS) {
+    result.push({
+      key: "TRIGGER_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS",
+      value: env.DEV_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS,
+    });
+  }
+
   if (env.DEV_OTEL_BATCH_PROCESSING_ENABLED === "1") {
     result = result.concat([
       {
@@ -1120,6 +1127,13 @@ async function resolveBuiltInProdVariables(
     );
   }
 
+  if (env.PROD_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS) {
+    result.push({
+      key: "TRIGGER_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS",
+      value: env.PROD_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS,
+    });
+  }
+
   if (env.PROD_OTEL_BATCH_PROCESSING_ENABLED === "1") {
     result = result.concat([
       {
diff --git a/packages/cli-v3/src/dev/taskRunProcessPool.ts b/packages/cli-v3/src/dev/taskRunProcessPool.ts
index a20755e1ce..810be7acb4 100644
--- a/packages/cli-v3/src/dev/taskRunProcessPool.ts
+++ b/packages/cli-v3/src/dev/taskRunProcessPool.ts
@@ -24,6 +24,8 @@ export class TaskRunProcessPool {
   private readonly maxExecutionsPerProcess: number;
   private readonly executionCountsPerProcess: Map<TaskRunProcess, number> = new Map();
   private readonly deprecatedVersions: Set<string> = new Set();
+  private readonly idleTimers: Map<TaskRunProcess, NodeJS.Timeout> = new Map();
+  private static readonly IDLE_TIMEOUT_MS = 30_000;
 
   constructor(options: TaskRunProcessPoolOptions) {
     this.options = options;
@@ -39,6 +41,7 @@ export class TaskRunProcessPool {
     const versionProcesses = this.availableProcessesByVersion.get(version) || [];
     const processesToKill = versionProcesses.filter((process) => !process.isExecuting());
 
+    processesToKill.forEach((process) => this.clearIdleTimer(process));
     Promise.all(processesToKill.map((process) => this.killProcess(process))).then(() => {
       this.availableProcessesByVersion.delete(version);
     });
@@ -72,6 +75,7 @@ export class TaskRunProcessPool {
       version,
       availableProcesses.filter((p) => p !== reusableProcess)
     );
+    this.clearIdleTimer(reusableProcess);
 
     if (!this.busyProcessesByVersion.has(version)) {
       this.busyProcessesByVersion.set(version, new Set());
@@ -156,6 +160,7 @@ export class TaskRunProcessPool {
         this.availableProcessesByVersion.set(version, []);
       }
       this.availableProcessesByVersion.get(version)!.push(process);
+      this.startIdleTimer(process, version);
     } catch (error) {
       logger.debug("[TaskRunProcessPool] Failed to cleanup process for reuse, killing it", {
         error,
@@ -215,7 +220,42 @@ export class TaskRunProcessPool {
     return process.isHealthy;
   }
 
+  private startIdleTimer(process: TaskRunProcess, version: string): void {
+    this.clearIdleTimer(process);
+
+    const timer = setTimeout(() => {
+      // Synchronously remove from available pool before async kill to prevent race with getProcess()
+      const available = this.availableProcessesByVersion.get(version);
+      if (available) {
+        const index = available.indexOf(process);
+        if (index !== -1) {
+          available.splice(index, 1);
+        }
+      }
+      this.idleTimers.delete(process);
+
+      logger.debug("[TaskRunProcessPool] Idle timeout reached, killing process", {
+        pid: process.pid,
+        version,
+      });
+
+      this.killProcess(process);
+    }, TaskRunProcessPool.IDLE_TIMEOUT_MS);
+
+    this.idleTimers.set(process, timer);
+  }
+
+  private clearIdleTimer(process: TaskRunProcess): void {
+    const timer = this.idleTimers.get(process);
+    if (timer) {
+      clearTimeout(timer);
+      this.idleTimers.delete(process);
+    }
+  }
+
   private async killProcess(process: TaskRunProcess): Promise<void> {
+    this.clearIdleTimer(process);
+
     if (!process.isHealthy) {
       logger.debug("[TaskRunProcessPool] Process is not healthy, skipping cleanup", {
         processId: process.pid,
@@ -247,6 +287,12 @@ export class TaskRunProcessPool {
       versions: Array.from(this.availableProcessesByVersion.keys()),
     });
 
+    // Clear all idle timers
+    for (const timer of this.idleTimers.values()) {
+      clearTimeout(timer);
+    }
+    this.idleTimers.clear();
+
     // Kill all available processes across all versions
     const allAvailableProcesses = Array.from(this.availableProcessesByVersion.values()).flat();
     await Promise.all(allAvailableProcesses.map((process) => this.killProcess(process)));
diff --git a/packages/core/src/v3/otel/tracingSDK.ts b/packages/core/src/v3/otel/tracingSDK.ts
index 1a9ca4c224..2f9d53a1f3 100644
--- a/packages/core/src/v3/otel/tracingSDK.ts
+++ b/packages/core/src/v3/otel/tracingSDK.ts
@@ -55,6 +55,7 @@ import {
 import { SemanticInternalAttributes } from "../semanticInternalAttributes.js";
 import { taskContext } from "../task-context-api.js";
 import {
+  BufferingMetricExporter,
   TaskContextLogProcessor,
   TaskContextMetricExporter,
   TaskContextSpanProcessor,
@@ -285,14 +286,22 @@ export class TracingSDK {
         url: metricsUrl,
         timeoutMillis: config.forceFlushTimeoutMillis,
       });
-      const metricExporter = new TaskContextMetricExporter(rawMetricExporter);
+
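+      // Worked example, assuming the defaults below: metrics are collected
+      // every 10s (collectionIntervalMs) but only flushed to OTLP every 30s
+      // (exportIntervalMs), so each outgoing request merges roughly three
+      // collection snapshots.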
"10000" + ); + const exportIntervalMs = parseInt( + getEnvVar("TRIGGER_OTEL_METRICS_EXPORT_INTERVAL_MILLIS") ?? "30000" + ); + + // Chain: PeriodicReader(10s) → TaskContextMetricExporter → BufferingMetricExporter(30s) → OTLP + const bufferingExporter = new BufferingMetricExporter(rawMetricExporter, exportIntervalMs); + const metricExporter = new TaskContextMetricExporter(bufferingExporter); const metricReaders: MetricReader[] = [ new PeriodicExportingMetricReader({ exporter: metricExporter, - exportIntervalMillis: parseInt( - getEnvVar("TRIGGER_OTEL_METRICS_EXPORT_INTERVAL_MILLIS") ?? "60000" - ), + exportIntervalMillis: collectionIntervalMs, exportTimeoutMillis: parseInt( getEnvVar("TRIGGER_OTEL_METRICS_EXPORT_TIMEOUT_MILLIS") ?? "30000" ), diff --git a/packages/core/src/v3/taskContext/index.ts b/packages/core/src/v3/taskContext/index.ts index b03b7ff4b6..f76671160a 100644 --- a/packages/core/src/v3/taskContext/index.ts +++ b/packages/core/src/v3/taskContext/index.ts @@ -1,13 +1,14 @@ import { Attributes } from "@opentelemetry/api"; import { ServerBackgroundWorker, TaskRunContext } from "../schemas/index.js"; import { SemanticInternalAttributes } from "../semanticInternalAttributes.js"; -import { getGlobal, registerGlobal, unregisterGlobal } from "../utils/globals.js"; +import { getGlobal, registerGlobal } from "../utils/globals.js"; import { TaskContext } from "./types.js"; const API_NAME = "task-context"; export class TaskContextAPI { private static _instance?: TaskContextAPI; + private _runDisabled = false; private constructor() {} @@ -23,6 +24,10 @@ export class TaskContextAPI { return this.#getTaskContext() !== undefined; } + get isRunDisabled(): boolean { + return this._runDisabled; + } + get ctx(): TaskRunContext | undefined { return this.#getTaskContext()?.ctx; } @@ -98,11 +103,12 @@ export class TaskContextAPI { } public disable() { - unregisterGlobal(API_NAME); + this._runDisabled = true; } public setGlobalTaskContext(taskContext: TaskContext): boolean { - return registerGlobal(API_NAME, taskContext); + this._runDisabled = false; + return registerGlobal(API_NAME, taskContext, true); } #getTaskContext(): TaskContext | undefined { diff --git a/packages/core/src/v3/taskContext/otelProcessors.ts b/packages/core/src/v3/taskContext/otelProcessors.ts index 9d051f615b..b08e759724 100644 --- a/packages/core/src/v3/taskContext/otelProcessors.ts +++ b/packages/core/src/v3/taskContext/otelProcessors.ts @@ -8,6 +8,7 @@ import type { MetricData, PushMetricExporter, ResourceMetrics, + ScopeMetrics, } from "@opentelemetry/sdk-metrics"; import { Span, SpanProcessor } from "@opentelemetry/sdk-trace-base"; import { SemanticInternalAttributes } from "../semanticInternalAttributes.js"; @@ -130,29 +131,44 @@ export class TaskContextMetricExporter implements PushMetricExporter { export(metrics: ResourceMetrics, resultCallback: (result: ExportResult) => void): void { if (!taskContext.ctx) { - // No active run — drop metrics (between-run noise) + // No context at all — drop metrics resultCallback({ code: ExportResultCode.SUCCESS }); return; } const ctx = taskContext.ctx; - const contextAttrs: Attributes = { - [SemanticInternalAttributes.RUN_ID]: ctx.run.id, - [SemanticInternalAttributes.TASK_SLUG]: ctx.task.id, - [SemanticInternalAttributes.ATTEMPT_NUMBER]: ctx.attempt.number, - [SemanticInternalAttributes.ENVIRONMENT_ID]: ctx.environment.id, - [SemanticInternalAttributes.ORGANIZATION_ID]: ctx.organization.id, - [SemanticInternalAttributes.PROJECT_ID]: ctx.project.id, - 
-      [SemanticInternalAttributes.ENVIRONMENT_TYPE]: ctx.environment.type,
-    };
+
+    let contextAttrs: Attributes;
+
+    if (taskContext.isRunDisabled) {
+      // Between runs: keep environment/project/org/machine attrs, strip run-specific ones
+      contextAttrs = {
+        [SemanticInternalAttributes.ENVIRONMENT_ID]: ctx.environment.id,
+        [SemanticInternalAttributes.ENVIRONMENT_TYPE]: ctx.environment.type,
+        [SemanticInternalAttributes.ORGANIZATION_ID]: ctx.organization.id,
+        [SemanticInternalAttributes.PROJECT_ID]: ctx.project.id,
+        [SemanticInternalAttributes.MACHINE_PRESET_NAME]: ctx.machine?.name,
+      };
+    } else {
+      // During a run: full context attrs
+      contextAttrs = {
+        [SemanticInternalAttributes.RUN_ID]: ctx.run.id,
+        [SemanticInternalAttributes.TASK_SLUG]: ctx.task.id,
+        [SemanticInternalAttributes.ATTEMPT_NUMBER]: ctx.attempt.number,
+        [SemanticInternalAttributes.ENVIRONMENT_ID]: ctx.environment.id,
+        [SemanticInternalAttributes.ORGANIZATION_ID]: ctx.organization.id,
+        [SemanticInternalAttributes.PROJECT_ID]: ctx.project.id,
+        [SemanticInternalAttributes.MACHINE_PRESET_NAME]: ctx.machine?.name,
+        [SemanticInternalAttributes.ENVIRONMENT_TYPE]: ctx.environment.type,
+      };
+    }
 
     if (taskContext.worker) {
       contextAttrs[SemanticInternalAttributes.WORKER_ID] = taskContext.worker.id;
       contextAttrs[SemanticInternalAttributes.WORKER_VERSION] = taskContext.worker.version;
     }
 
-    if (ctx.run.tags?.length) {
+    if (!taskContext.isRunDisabled && ctx.run.tags?.length) {
       contextAttrs[SemanticInternalAttributes.RUN_TAGS] = ctx.run.tags;
     }
 
@@ -184,3 +200,108 @@ export class TaskContextMetricExporter implements PushMetricExporter {
     return this._innerExporter.shutdown();
   }
 }
+
+export class BufferingMetricExporter implements PushMetricExporter {
+  selectAggregationTemporality?: (instrumentType: InstrumentType) => AggregationTemporality;
+  selectAggregation?: (instrumentType: InstrumentType) => AggregationOption;
+
+  private _buffer: ResourceMetrics[] = [];
+  private _lastFlushTime = Date.now();
+
+  constructor(
+    private _innerExporter: PushMetricExporter,
+    private _flushIntervalMs: number
+  ) {
+    if (_innerExporter.selectAggregationTemporality) {
+      this.selectAggregationTemporality =
+        _innerExporter.selectAggregationTemporality.bind(_innerExporter);
+    }
+    if (_innerExporter.selectAggregation) {
+      this.selectAggregation = _innerExporter.selectAggregation.bind(_innerExporter);
+    }
+  }
+
+  export(metrics: ResourceMetrics, resultCallback: (result: ExportResult) => void): void {
+    this._buffer.push(metrics);
+
+    const now = Date.now();
+    if (now - this._lastFlushTime >= this._flushIntervalMs) {
+      this._lastFlushTime = now;
+      const merged = this._mergeBuffer();
+      this._innerExporter.export(merged, resultCallback);
+    } else {
+      resultCallback({ code: ExportResultCode.SUCCESS });
+    }
+  }
+
+  forceFlush(): Promise<void> {
+    if (this._buffer.length > 0) {
+      this._lastFlushTime = Date.now();
+      const merged = this._mergeBuffer();
+      return new Promise<void>((resolve, reject) => {
+        this._innerExporter.export(merged, (result) => {
+          if (result.code === ExportResultCode.SUCCESS) {
+            resolve();
+          } else {
+            reject(result.error ?? new Error("Export failed"));
+          }
+        });
+      }).then(() => this._innerExporter.forceFlush());
+    }
+    return this._innerExporter.forceFlush();
+  }
+
+  shutdown(): Promise<void> {
+    return this.forceFlush().then(() => this._innerExporter.shutdown());
+  }
+
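+  // Merge semantics (sketch): data points are concatenated across buffered
+  // collections, so delta-temporality points stand alone, while cumulative
+  // series repeat one point per collection for the same attribute set.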
+  private _mergeBuffer(): ResourceMetrics {
+    const batch = this._buffer;
+    this._buffer = [];
+
+    if (batch.length === 1) {
+      return batch[0]!;
+    }
+
+    const base = batch[0]!;
+
+    // Merge all scopeMetrics by scope name, then metrics by descriptor name
+    const scopeMap = new Map<string, { scope: ScopeMetrics["scope"]; metricsMap: Map<string, MetricData> }>();
+
+    for (const rm of batch) {
+      for (const sm of rm.scopeMetrics) {
+        const scopeKey = sm.scope.name;
+        let scopeEntry = scopeMap.get(scopeKey);
+        if (!scopeEntry) {
+          scopeEntry = { scope: sm.scope, metricsMap: new Map() };
+          scopeMap.set(scopeKey, scopeEntry);
+        }
+
+        for (const metric of sm.metrics) {
+          const metricKey = metric.descriptor.name;
+          const existing = scopeEntry.metricsMap.get(metricKey);
+          if (existing) {
+            // Append data points from this collection to the existing metric
+            scopeEntry.metricsMap.set(metricKey, {
+              ...existing,
+              dataPoints: [...existing.dataPoints, ...metric.dataPoints],
+            } as MetricData);
+          } else {
+            scopeEntry.metricsMap.set(metricKey, {
+              ...metric,
+              dataPoints: [...metric.dataPoints],
+            } as MetricData);
+          }
+        }
+      }
+    }
+
+    return {
+      resource: base.resource,
+      scopeMetrics: Array.from(scopeMap.values()).map(({ scope, metricsMap }) => ({
+        scope,
+        metrics: Array.from(metricsMap.values()),
+      })),
+    };
+  }
+}

From 6b7b235841c07223ebac4de153da5d938e0e6ef0 Mon Sep 17 00:00:00 2001
From: Eric Allam
Date: Sat, 14 Feb 2026 10:10:42 +0000
Subject: [PATCH 10/13] Add custom metrics examples and provide otel.metrics

---
 packages/trigger-sdk/src/v3/index.ts          |  2 -
 packages/trigger-sdk/src/v3/otel.ts           |  2 +
 references/hello-world/src/trigger/metrics.ts | 83 ++++++++++++++++++-
 3 files changed, 84 insertions(+), 3 deletions(-)

diff --git a/packages/trigger-sdk/src/v3/index.ts b/packages/trigger-sdk/src/v3/index.ts
index 97a2271f36..b2d6247699 100644
--- a/packages/trigger-sdk/src/v3/index.ts
+++ b/packages/trigger-sdk/src/v3/index.ts
@@ -58,5 +58,3 @@ export * as queues from "./queues.js";
 export type { ImportEnvironmentVariablesParams } from "./envvars.js";
 
 export { configure, auth } from "./auth.js";
-
-export { metrics } from "@opentelemetry/api";
diff --git a/packages/trigger-sdk/src/v3/otel.ts b/packages/trigger-sdk/src/v3/otel.ts
index e80c77562e..d02220208e 100644
--- a/packages/trigger-sdk/src/v3/otel.ts
+++ b/packages/trigger-sdk/src/v3/otel.ts
@@ -1,7 +1,9 @@
+import { metrics } from "@opentelemetry/api";
 import { traceContext } from "@trigger.dev/core/v3";
 
 export const otel = {
   withExternalTrace: <T>(fn: () => T): T => {
     return traceContext.withExternalTrace(fn);
   },
+  metrics,
 };
diff --git a/references/hello-world/src/trigger/metrics.ts b/references/hello-world/src/trigger/metrics.ts
index 14b0785500..fa398186b0 100644
--- a/references/hello-world/src/trigger/metrics.ts
+++ b/references/hello-world/src/trigger/metrics.ts
@@ -1,7 +1,22 @@
-import { batch, logger, task } from "@trigger.dev/sdk";
+import { batch, logger, otel, task } from "@trigger.dev/sdk";
 import { createHash } from "node:crypto";
 import { setTimeout } from "node:timers/promises";
 
+// Custom metrics — instruments are created once at module level
+const meter = otel.metrics.getMeter("hello-world");
+const itemsProcessedCounter = meter.createCounter("items.processed", {
+  description: "Total number of items processed",
+  unit: "items",
+});
+const itemDurationHistogram = meter.createHistogram("item.duration", {
+  description: "Time spent processing each item",
+  unit: "ms",
+});
+const queueDepthGauge = meter.createUpDownCounter("queue.depth", {
+  description: "Current simulated queue depth",
+  unit: "items",
+});
+
 /**
  * Tight computational loop that produces sustained high CPU utilization.
  * Uses repeated SHA-256 hashing to keep the CPU busy.
@@ -243,3 +258,69 @@ export const concurrentLoad = task({
     return { childRunIds: results.runs.map((r) => r.id) };
   },
 });
+
+/**
+ * Demonstrates custom OTEL metrics: counter, histogram, and up-down counter.
+ * Simulates processing a queue of items with varying processing times.
+ */
+export const customMetrics = task({
+  id: "custom-metrics",
+  run: async (
+    {
+      itemCount = 20,
+      minProcessingMs = 50,
+      maxProcessingMs = 500,
+      batchSize = 5,
+    }: {
+      itemCount?: number;
+      minProcessingMs?: number;
+      maxProcessingMs?: number;
+      batchSize?: number;
+    },
+    { ctx }
+  ) => {
+    logger.info("Starting custom metrics demo", { itemCount, batchSize });
+
+    // Simulate items arriving in the queue
+    queueDepthGauge.add(itemCount);
+
+    let totalProcessed = 0;
+
+    for (let i = 0; i < itemCount; i += batchSize) {
+      const currentBatch = Math.min(batchSize, itemCount - i);
+
+      for (let j = 0; j < currentBatch; j++) {
+        const processingTime =
+          minProcessingMs + Math.random() * (maxProcessingMs - minProcessingMs);
+
+        // Simulate work
+        const start = performance.now();
+        let data = Buffer.from(`item-${i + j}`);
+        const deadline = Date.now() + processingTime;
+        while (Date.now() < deadline) {
+          data = createHash("sha256").update(data).digest();
+        }
+        const elapsed = performance.now() - start;
+
+        // Record metrics
+        itemsProcessedCounter.add(1, { "item.type": j % 2 === 0 ? "even" : "odd" });
+        itemDurationHistogram.record(elapsed, { "item.type": j % 2 === 0 ? "even" : "odd" });
+        queueDepthGauge.add(-1);
+
+        totalProcessed++;
+      }
+
+      logger.info(`Processed batch`, {
+        batchNumber: Math.floor(i / batchSize) + 1,
+        totalProcessed,
+        remaining: itemCount - totalProcessed,
+      });
+
+      // Brief pause between batches
+      await setTimeout(1000);
+    }
+
+    logger.info("Custom metrics demo complete", { totalProcessed });
+    return { totalProcessed };
+  },
+});

From 304fec6c8b806964e5bacb913f6fbb00db004216 Mon Sep 17 00:00:00 2001
From: Eric Allam
Date: Sat, 14 Feb 2026 15:47:01 +0000
Subject: [PATCH 11/13] Add Node.js runtime metrics (event loop, heap)

---
 .../cli-v3/src/entryPoints/dev-run-worker.ts  |  1 +
 .../src/entryPoints/managed-run-worker.ts     |  1 +
 .../core/src/v3/otel/nodejsRuntimeMetrics.ts  | 64 +++++++++++++++++++
 packages/core/src/v3/otel/tracingSDK.ts       |  7 ++
 4 files changed, 73 insertions(+)
 create mode 100644 packages/core/src/v3/otel/nodejsRuntimeMetrics.ts

diff --git a/packages/cli-v3/src/entryPoints/dev-run-worker.ts b/packages/cli-v3/src/entryPoints/dev-run-worker.ts
index a7f80b7a99..c1b453eed9 100644
--- a/packages/cli-v3/src/entryPoints/dev-run-worker.ts
+++ b/packages/cli-v3/src/entryPoints/dev-run-worker.ts
@@ -213,6 +213,7 @@ async function doBootstrap() {
     forceFlushTimeoutMillis: 30_000,
     resource: config.telemetry?.resource,
     hostMetrics: true,
+    nodejsRuntimeMetrics: true,
     // Drop all system metrics from dev metrics export
     droppedMetrics: ["system.*"],
   });
 
diff --git a/packages/cli-v3/src/entryPoints/managed-run-worker.ts b/packages/cli-v3/src/entryPoints/managed-run-worker.ts
index 7db9571cef..22eb34615e 100644
--- a/packages/cli-v3/src/entryPoints/managed-run-worker.ts
+++ b/packages/cli-v3/src/entryPoints/managed-run-worker.ts
@@ -192,6 +192,7 @@ async function doBootstrap() {
     metricReaders: config.telemetry?.metricReaders ?? [],
     resource: config.telemetry?.resource,
     hostMetrics: true,
+    nodejsRuntimeMetrics: true,
   });
 
   const otelTracer: Tracer = tracingSDK.getTracer("trigger-dev-worker", VERSION);
diff --git a/packages/core/src/v3/otel/nodejsRuntimeMetrics.ts b/packages/core/src/v3/otel/nodejsRuntimeMetrics.ts
new file mode 100644
index 0000000000..d12eb94301
--- /dev/null
+++ b/packages/core/src/v3/otel/nodejsRuntimeMetrics.ts
@@ -0,0 +1,64 @@
+import { type MeterProvider } from "@opentelemetry/sdk-metrics";
+import { performance, monitorEventLoopDelay } from "node:perf_hooks";
+
+export function startNodejsRuntimeMetrics(meterProvider: MeterProvider) {
+  const meter = meterProvider.getMeter("nodejs-runtime", "1.0.0");
+
+  // Event loop utilization (diff between collection intervals)
+  let lastElu = performance.eventLoopUtilization();
+
+  const eluGauge = meter.createObservableGauge("nodejs.event_loop.utilization", {
+    description: "Event loop utilization over the last collection interval",
+    unit: "1",
+  });
+
+  // Event loop delay histogram (from perf_hooks)
+  const eld = monitorEventLoopDelay({ resolution: 20 });
+  eld.enable();
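+  // monitorEventLoopDelay() reports nanoseconds; the batch callback below
+  // divides by 1e9 so the delay gauges are exported in seconds (unit: "s").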
"system.cpu.*") */ droppedMetrics?: string[]; }; @@ -326,6 +329,10 @@ export class TracingSDK { hostMetrics.start(); } + if (config.nodejsRuntimeMetrics) { + startNodejsRuntimeMetrics(meterProvider); + } + this.getLogger = loggerProvider.getLogger.bind(loggerProvider); this.getTracer = traceProvider.getTracer.bind(traceProvider); } From 82a6a23eb1651c49ea7c430c7ccedf0d01719bbe Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 14 Feb 2026 16:50:59 +0000 Subject: [PATCH 12/13] fix typecheck issues --- apps/webapp/app/v3/otlpExporter.server.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/webapp/app/v3/otlpExporter.server.ts b/apps/webapp/app/v3/otlpExporter.server.ts index 7093a494f5..cd56eaabce 100644 --- a/apps/webapp/app/v3/otlpExporter.server.ts +++ b/apps/webapp/app/v3/otlpExporter.server.ts @@ -489,8 +489,8 @@ function convertMetricsToClickhouseRows( // Process gauge data points if (metric.gauge) { for (const dp of metric.gauge.dataPoints) { - const value = - dp.asDouble !== 0 ? dp.asDouble : dp.asInt !== BigInt(0) ? Number(dp.asInt) : 0; + const value: number = + (dp.asDouble ?? 0) !== 0 ? dp.asDouble! : dp.asInt !== BigInt(0) ? Number(dp.asInt) : 0; const resolved = resolveDataPointContext(dp.attributes ?? [], resourceCtx); rows.push({ @@ -514,8 +514,8 @@ function convertMetricsToClickhouseRows( // Process sum data points if (metric.sum) { for (const dp of metric.sum.dataPoints) { - const value = - dp.asDouble !== 0 ? dp.asDouble : dp.asInt !== BigInt(0) ? Number(dp.asInt) : 0; + const value: number = + (dp.asDouble ?? 0) !== 0 ? dp.asDouble! : dp.asInt !== BigInt(0) ? Number(dp.asInt) : 0; const resolved = resolveDataPointContext(dp.attributes ?? [], resourceCtx); rows.push({ From aa450a6ecc2fe88001c12d49f001f0c55cf65a3c Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 14 Feb 2026 17:33:43 +0000 Subject: [PATCH 13/13] always ensure valid values for exportIntervalMillis and exportTimeoutMillis. --- packages/core/src/v3/otel/tracingSDK.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/core/src/v3/otel/tracingSDK.ts b/packages/core/src/v3/otel/tracingSDK.ts index 7ee8f88907..ab775e2144 100644 --- a/packages/core/src/v3/otel/tracingSDK.ts +++ b/packages/core/src/v3/otel/tracingSDK.ts @@ -301,13 +301,15 @@ export class TracingSDK { const bufferingExporter = new BufferingMetricExporter(rawMetricExporter, exportIntervalMs); const metricExporter = new TaskContextMetricExporter(bufferingExporter); + const exportTimeoutMillis = parseInt( + getEnvVar("TRIGGER_OTEL_METRICS_EXPORT_TIMEOUT_MILLIS") ?? "30000" + ); + const metricReaders: MetricReader[] = [ new PeriodicExportingMetricReader({ exporter: metricExporter, - exportIntervalMillis: collectionIntervalMs, - exportTimeoutMillis: parseInt( - getEnvVar("TRIGGER_OTEL_METRICS_EXPORT_TIMEOUT_MILLIS") ?? "30000" - ), + exportIntervalMillis: Math.max(collectionIntervalMs, exportTimeoutMillis), + exportTimeoutMillis, }), ...(config.metricReaders ?? []), ];