@@ -3,9 +3,11 @@ import {
33 type WorkflowEvent ,
44 type WorkflowStep ,
55} from "cloudflare:workers" ;
6+ import pAll from "p-all" ;
67import { createCloudflareApi } from "./api/cloudflare" ;
78import { createDatadogApi } from "./api/datadog" ;
89import { formatHealthMetrics , formatMetricsForContainer } from "./metrics" ;
10+ import { chunk } from "./utils" ;
911
1012/**
1113 * Calculate the metrics time window.
@@ -40,6 +42,11 @@ export class MetricsExporterWorkflow extends WorkflowEntrypoint<Env> {
4042 this . env . CLOUDFLARE_API_TOKEN ,
4143 ) ;
4244
45+ const datadog = createDatadogApi (
46+ this . env . DATADOG_API_KEY ,
47+ this . env . DATADOG_SITE ,
48+ ) ;
49+
4350 const { start, end } = getMetricsTimeWindow ( ) ;
4451 console . log ( "Workflow started" , {
4552 start : start . toISOString ( ) ,
@@ -57,10 +64,6 @@ export class MetricsExporterWorkflow extends WorkflowEntrypoint<Env> {
5764 this . env . CLOUDFLARE_ACCOUNT_ID ,
5865 result ,
5966 ) ;
60- const datadog = createDatadogApi (
61- this . env . DATADOG_API_KEY ,
62- this . env . DATADOG_SITE ,
63- ) ;
6467 await datadog . sendMetrics ( healthMetrics ) ;
6568
6669 return result . map ( ( c ) => ( { id : c . id , name : c . name } ) ) ;
@@ -86,18 +89,25 @@ export class MetricsExporterWorkflow extends WorkflowEntrypoint<Env> {
8689 metricsGroups ,
8790 ) ;
8891
89- if ( metrics . length > 0 ) {
90- const datadog = createDatadogApi (
91- this . env . DATADOG_API_KEY ,
92- this . env . DATADOG_SITE ,
93- ) ;
94- await datadog . sendMetrics ( metrics ) ;
95- }
92+ const batches = chunk ( metrics , 25000 ) ;
93+
94+ await pAll (
95+ batches . map (
96+ ( batch , i ) => ( ) =>
97+ step . do (
98+ `send batch ${ i + 1 } /${ batches . length } (${ batch . length } metrics)` ,
99+ STEP_CONFIG ,
100+ async ( ) => {
101+ await datadog . sendMetrics ( batch ) ;
102+ } ,
103+ ) ,
104+ ) ,
105+ { concurrency : 6 } ,
106+ ) ;
96107
97108 return metrics . length ;
98109 } ,
99110 ) ;
100-
101111 totalMetrics += count ;
102112 }
103113
0 commit comments