From c63c08abb4610e25614d700e6cb5db4576f3deb9 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Thu, 18 Dec 2025 17:37:04 +0100 Subject: [PATCH 1/2] tests(telemetry): flush metrics with heartbeats if the interval is small # Motivation Tests usually set the telemetry hearbeat interval to a small value to not have to wait to get data. For these tests also want to get telemetry metrics fast. This PR changes the metics flush interval to the heartbeat value if the hearbeat interval is set to a shorter duration than the default metrics flush interval. --- libdd-telemetry/src/worker/mod.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/libdd-telemetry/src/worker/mod.rs b/libdd-telemetry/src/worker/mod.rs index 6e021f15bf..2cfb832ecc 100644 --- a/libdd-telemetry/src/worker/mod.rs +++ b/libdd-telemetry/src/worker/mod.rs @@ -13,7 +13,6 @@ use crate::{ use libdd_common::Endpoint; use libdd_common::{hyper_migration, tag::Tag, worker::Worker}; -use std::fmt::Debug; use std::iter::Sum; use std::ops::Add; use std::{ @@ -26,6 +25,7 @@ use std::{ }, time, }; +use std::{fmt::Debug, time::Duration}; use crate::metrics::MetricBucketStats; use futures::{ @@ -135,6 +135,7 @@ pub struct TelemetryWorker { seq_id: AtomicU64, runtime_id: String, client: Box, + metrics_flush_interval: Duration, deadlines: scheduler::Scheduler, data: TelemetryWorkerData, } @@ -595,7 +596,7 @@ impl TelemetryWorker { }, common: context.common, _type: context.metric_type, - interval: MetricBuckets::METRICS_FLUSH_INTERVAL.as_secs(), + interval: self.metrics_flush_interval.as_secs(), }); } data::Distributions { series } @@ -619,7 +620,7 @@ impl TelemetryWorker { points, common: context.common, _type: context.metric_type, - interval: MetricBuckets::METRICS_FLUSH_INTERVAL.as_secs(), + interval: self.metrics_flush_interval.as_secs(), }); } @@ -1087,6 +1088,9 @@ impl TelemetryWorkerBuilder { let telemetry_heartbeat_interval = config.telemetry_heartbeat_interval; let client = http_client::from_config(&config); + let metrics_flush_interval = + telemetry_heartbeat_interval.min(MetricBuckets::METRICS_FLUSH_INTERVAL); + #[allow(clippy::unwrap_used)] let worker = TelemetryWorker { flavor: self.flavor, @@ -1108,11 +1112,9 @@ impl TelemetryWorkerBuilder { .runtime_id .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()), client, + metrics_flush_interval, deadlines: scheduler::Scheduler::new(vec![ - ( - MetricBuckets::METRICS_FLUSH_INTERVAL, - LifecycleAction::FlushMetricAggr, - ), + (metrics_flush_interval, LifecycleAction::FlushMetricAggr), (telemetry_heartbeat_interval, LifecycleAction::FlushData), ( time::Duration::from_secs(60 * 60 * 24), From ef9d964b302cd083065eac43dd0f36f9bbd5a2de Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Fri, 26 Dec 2025 17:00:42 +0100 Subject: [PATCH 2/2] fix: add field to debug print --- libdd-telemetry/src/worker/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/libdd-telemetry/src/worker/mod.rs b/libdd-telemetry/src/worker/mod.rs index 2cfb832ecc..8c3b3fad06 100644 --- a/libdd-telemetry/src/worker/mod.rs +++ b/libdd-telemetry/src/worker/mod.rs @@ -148,6 +148,7 @@ impl Debug for TelemetryWorker { .field("cancellation_token", &self.cancellation_token) .field("seq_id", &self.seq_id) .field("runtime_id", &self.runtime_id) + .field("metrics_flush_interval", &self.metrics_flush_interval) .field("deadlines", &self.deadlines) .field("data", &self.data) .finish()