diff --git a/libdd-telemetry/src/worker/mod.rs b/libdd-telemetry/src/worker/mod.rs index 6e021f15bf..8c3b3fad06 100644 --- a/libdd-telemetry/src/worker/mod.rs +++ b/libdd-telemetry/src/worker/mod.rs @@ -13,7 +13,6 @@ use crate::{ use libdd_common::Endpoint; use libdd_common::{hyper_migration, tag::Tag, worker::Worker}; -use std::fmt::Debug; use std::iter::Sum; use std::ops::Add; use std::{ @@ -26,6 +25,7 @@ use std::{ }, time, }; +use std::{fmt::Debug, time::Duration}; use crate::metrics::MetricBucketStats; use futures::{ @@ -135,6 +135,7 @@ pub struct TelemetryWorker { seq_id: AtomicU64, runtime_id: String, client: Box, + metrics_flush_interval: Duration, deadlines: scheduler::Scheduler, data: TelemetryWorkerData, } @@ -147,6 +148,7 @@ impl Debug for TelemetryWorker { .field("cancellation_token", &self.cancellation_token) .field("seq_id", &self.seq_id) .field("runtime_id", &self.runtime_id) + .field("metrics_flush_interval", &self.metrics_flush_interval) .field("deadlines", &self.deadlines) .field("data", &self.data) .finish() @@ -595,7 +597,7 @@ impl TelemetryWorker { }, common: context.common, _type: context.metric_type, - interval: MetricBuckets::METRICS_FLUSH_INTERVAL.as_secs(), + interval: self.metrics_flush_interval.as_secs(), }); } data::Distributions { series } @@ -619,7 +621,7 @@ impl TelemetryWorker { points, common: context.common, _type: context.metric_type, - interval: MetricBuckets::METRICS_FLUSH_INTERVAL.as_secs(), + interval: self.metrics_flush_interval.as_secs(), }); } @@ -1087,6 +1089,9 @@ impl TelemetryWorkerBuilder { let telemetry_heartbeat_interval = config.telemetry_heartbeat_interval; let client = http_client::from_config(&config); + let metrics_flush_interval = + telemetry_heartbeat_interval.min(MetricBuckets::METRICS_FLUSH_INTERVAL); + #[allow(clippy::unwrap_used)] let worker = TelemetryWorker { flavor: self.flavor, @@ -1108,11 +1113,9 @@ impl TelemetryWorkerBuilder { .runtime_id .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()), client, + metrics_flush_interval, deadlines: scheduler::Scheduler::new(vec![ - ( - MetricBuckets::METRICS_FLUSH_INTERVAL, - LifecycleAction::FlushMetricAggr, - ), + (metrics_flush_interval, LifecycleAction::FlushMetricAggr), (telemetry_heartbeat_interval, LifecycleAction::FlushData), ( time::Duration::from_secs(60 * 60 * 24),