Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import datadog.trace.api.sampling.Sampler;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.DoubleFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -20,19 +18,15 @@ public class ProbeRateLimiter {
private static final Duration TEN_SECONDS_WINDOW = Duration.of(10, ChronoUnit.SECONDS);
private static final double DEFAULT_GLOBAL_SNAPSHOT_RATE = DEFAULT_SNAPSHOT_RATE * 100;
private static final double DEFAULT_GLOBAL_LOG_RATE = 5000.0;
private static final ConcurrentMap<String, RateLimitInfo> PROBE_SAMPLERS =
new ConcurrentHashMap<>();
private static Sampler GLOBAL_SNAPSHOT_SAMPLER = createSampler(DEFAULT_GLOBAL_SNAPSHOT_RATE);
private static Sampler GLOBAL_LOG_SAMPLER = createSampler(DEFAULT_GLOBAL_LOG_RATE);
private static DoubleFunction<Sampler> samplerSupplier = ProbeRateLimiter::createSampler;
private static Sampler GLOBAL_SNAPSHOT_SAMPLER =
defaultCreateSampler(DEFAULT_GLOBAL_SNAPSHOT_RATE);
private static Sampler GLOBAL_LOG_SAMPLER = defaultCreateSampler(DEFAULT_GLOBAL_LOG_RATE);
private static DoubleFunction<Sampler> samplerSupplier = ProbeRateLimiter::defaultCreateSampler;

public static boolean tryProbe(String probeId) {
RateLimitInfo rateLimitInfo =
PROBE_SAMPLERS.computeIfAbsent(probeId, ProbeRateLimiter::getDefaultRateLimitInfo);
Sampler globalSampler =
rateLimitInfo.isCaptureSnapshot ? GLOBAL_SNAPSHOT_SAMPLER : GLOBAL_LOG_SAMPLER;
public static boolean tryProbe(Sampler sampler, boolean useGlobalLowRate) {
Sampler globalSampler = useGlobalLowRate ? GLOBAL_SNAPSHOT_SAMPLER : GLOBAL_LOG_SAMPLER;
if (globalSampler.sample()) {
return rateLimitInfo.sampler.sample();
return sampler.sample();
}
return false;
}
Expand All @@ -42,8 +36,8 @@ private static RateLimitInfo getDefaultRateLimitInfo(String probeId) {
return new RateLimitInfo(samplerSupplier.apply(DEFAULT_SNAPSHOT_RATE), true);
}

public static void setRate(String probeId, double rate, boolean isCaptureSnapshot) {
PROBE_SAMPLERS.put(probeId, new RateLimitInfo(samplerSupplier.apply(rate), isCaptureSnapshot));
public static Sampler createSampler(double rate) {
return samplerSupplier.apply(rate);
}

public static void setGlobalSnapshotRate(double rate) {
Expand All @@ -54,25 +48,16 @@ public static void setGlobalLogRate(double rate) {
GLOBAL_LOG_SAMPLER = samplerSupplier.apply(rate);
}

public static void resetRate(String probeId) {
PROBE_SAMPLERS.remove(probeId);
}

public static void resetGlobalRate() {
setGlobalSnapshotRate(DEFAULT_GLOBAL_LOG_RATE);
}

public static void resetAll() {
PROBE_SAMPLERS.clear();
resetGlobalRate();
}

public static void setSamplerSupplier(DoubleFunction<Sampler> samplerSupplier) {
ProbeRateLimiter.samplerSupplier =
samplerSupplier != null ? samplerSupplier : ProbeRateLimiter::createSampler;
samplerSupplier != null ? samplerSupplier : ProbeRateLimiter::defaultCreateSampler;
}

private static Sampler createSampler(double rate) {
private static Sampler defaultCreateSampler(double rate) {
if (rate < 0) {
return new ConstantSampler(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.datadog.debugger.probe.LogProbe;
import com.datadog.debugger.probe.ProbeDefinition;
import com.datadog.debugger.probe.Sampled;
import com.datadog.debugger.probe.Sampling;
import com.datadog.debugger.sink.DebuggerSink;
import com.datadog.debugger.util.ExceptionHelper;
import datadog.trace.api.Config;
Expand Down Expand Up @@ -136,8 +135,8 @@ private void applyNewConfiguration(Configuration newConfiguration) {
new ConfigurationComparer(
originalConfiguration, newConfiguration, instrumentationResults);
if (changes.hasRateLimitRelatedChanged()) {
// apply rate limit config first to avoid racing with execution/instrumentation of log
// probes
// apply rate limit config first to avoid racing with execution/instrumentation
// of probes requiring samplers
applyRateLimiter(changes, newConfiguration.getSampling());
}
currentConfiguration = newConfiguration;
Expand Down Expand Up @@ -282,18 +281,7 @@ private static void applyRateLimiter(
for (ProbeDefinition added : changes.getAddedDefinitions()) {
if (added instanceof Sampled) {
Sampled probe = (Sampled) added;
Sampling sampling = probe.getSampling();
double rate = getDefaultRateLimitPerProbe(probe);
if (sampling != null && sampling.getEventsPerSecond() != 0) {
rate = sampling.getEventsPerSecond();
}
ProbeRateLimiter.setRate(probe.getId(), rate, probe.isCaptureSnapshot());
}
}
// remove rate for all removed probes
for (ProbeDefinition removedDefinition : changes.getRemovedDefinitions()) {
if (removedDefinition instanceof LogProbe) {
ProbeRateLimiter.resetRate(removedDefinition.getId());
probe.initSamplers();
}
}
// set global sampling
Expand All @@ -302,12 +290,6 @@ private static void applyRateLimiter(
}
}

private static double getDefaultRateLimitPerProbe(Sampled probe) {
return probe.isCaptureSnapshot()
? ProbeRateLimiter.DEFAULT_SNAPSHOT_RATE
: ProbeRateLimiter.DEFAULT_LOG_RATE;
}

private void removeCurrentTransformer() {
if (currentTransformer == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ private void createMethodProbe(MethodInfo methodInfo, List<ProbeDefinition> prob
.where(methodInfo.getClassNode().name, methodInfo.getMethodNode().name)
.captureSnapshot(false)
.build();
probe.initSamplers();
probes.add(probe);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public ExceptionProbe(
null);
this.exceptionProbeManager = exceptionProbeManager;
this.chainedExceptionIdx = chainedExceptionIdx;
initSamplers();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import datadog.trace.api.Config;
import datadog.trace.api.CorrelationIdentifier;
import datadog.trace.api.DDTraceId;
import datadog.trace.api.sampling.Sampler;
import datadog.trace.bootstrap.debugger.CapturedContext;
import datadog.trace.bootstrap.debugger.CapturedContextProbe;
import datadog.trace.bootstrap.debugger.DebuggerContext;
Expand Down Expand Up @@ -323,6 +324,7 @@ public String toString() {
private transient Consumer<Snapshot> snapshotProcessor;
protected transient Map<DDTraceId, AtomicInteger> budget =
Collections.synchronizedMap(new WeakIdentityHashMap<>());
protected transient Sampler sampler;

// no-arg constructor is required by Moshi to avoid creating instance with unsafe and by-passing
// constructors, including field initializers.
Expand Down Expand Up @@ -408,6 +410,7 @@ public LogProbe(LogProbe.Builder builder) {
builder.sampling,
builder.captureExpressions);
this.snapshotProcessor = builder.snapshotProcessor;
initSamplers();
}

public LogProbe copy() {
Expand Down Expand Up @@ -450,6 +453,16 @@ public Sampling getSampling() {
return sampling;
}

public void initSamplers() {
double rate =
sampling != null
? sampling.getEventsPerSecond()
: (isCaptureSnapshot()
? ProbeRateLimiter.DEFAULT_SNAPSHOT_RATE
: ProbeRateLimiter.DEFAULT_LOG_RATE);
sampler = ProbeRateLimiter.createSampler(rate);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to be called when we receive a new config version for this probe from RC right? Does it re-init the sampler on any update even to unrelated fields? Does that mess with the sampling in any undesired way?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be called on every new version of the probe. It re-inits the sampler. this was already the case before.
The sampling is done with time window that is already short (1s).


public List<CaptureExpression> getCaptureExpressions() {
return captureExpressions;
}
Expand Down Expand Up @@ -487,7 +500,7 @@ public InstrumentationResult.Status instrument(
public boolean isReadyToCapture() {
if (!hasCondition()) {
// we are sampling here to avoid creating CapturedContext when the sampling result is negative
return ProbeRateLimiter.tryProbe(id);
return ProbeRateLimiter.tryProbe(sampler, isCaptureSnapshot());
}
return true;
}
Expand Down Expand Up @@ -553,7 +566,8 @@ private void sample(LogStatus logStatus, MethodLocation methodLocation) {
return;
}
boolean sampled =
!logStatus.getDebugSessionStatus().isDisabled() && ProbeRateLimiter.tryProbe(id);
!logStatus.getDebugSessionStatus().isDisabled()
&& ProbeRateLimiter.tryProbe(sampler, isCaptureSnapshot());
logStatus.setSampled(sampled);
if (!sampled) {
DebuggerAgent.getSink()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package com.datadog.debugger.probe;

public interface Sampled {
Sampling getSampling();

String getId();

boolean isCaptureSnapshot();
void initSamplers();
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.datadog.debugger.instrumentation.MethodInfo;
import com.datadog.debugger.sink.Snapshot;
import datadog.trace.api.Pair;
import datadog.trace.api.sampling.Sampler;
import datadog.trace.bootstrap.debugger.CapturedContext;
import datadog.trace.bootstrap.debugger.CapturedContextProbe;
import datadog.trace.bootstrap.debugger.EvaluationError;
Expand All @@ -30,7 +31,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpanDecorationProbe extends ProbeDefinition implements CapturedContextProbe {
public class SpanDecorationProbe extends ProbeDefinition implements CapturedContextProbe, Sampled {
private static final Logger LOGGER = LoggerFactory.getLogger(SpanDecorationProbe.class);
private static final String PROBEID_DD_TAGS_FORMAT = "_dd.di.%s.probe_id";
private static final String EVALERROR_DD_TAGS_FORMAT = "_dd.di.%s.evaluation_error";
Expand Down Expand Up @@ -157,6 +158,7 @@ public int hashCode() {

private final TargetSpan targetSpan;
private final List<Decoration> decorations;
private transient Sampler errorSampler;

// no-arg constructor is required by Moshi to avoid creating instance with unsafe and by-passing
// constructors, including field initializers.
Expand Down Expand Up @@ -295,7 +297,7 @@ private void handleEvaluationErrors(SpanDecorationStatus status) {
if (status.getErrors().isEmpty()) {
return;
}
boolean sampled = ProbeRateLimiter.tryProbe(id);
boolean sampled = ProbeRateLimiter.tryProbe(errorSampler, true);
if (!sampled) {
return;
}
Expand All @@ -317,6 +319,11 @@ public List<Decoration> getDecorations() {
return decorations;
}

@Override
public void initSamplers() {
errorSampler = ProbeRateLimiter.createSampler(1.0);
}

@Generated
@Override
public int hashCode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.datadog.debugger.instrumentation.DiagnosticMessage;
import com.datadog.debugger.instrumentation.InstrumentationResult;
import com.datadog.debugger.instrumentation.MethodInfo;
import datadog.trace.api.sampling.Sampler;
import datadog.trace.bootstrap.debugger.CapturedContext;
import datadog.trace.bootstrap.debugger.CapturedContextProbe;
import datadog.trace.bootstrap.debugger.MethodLocation;
Expand All @@ -29,6 +30,7 @@ public class TriggerProbe extends ProbeDefinition implements Sampled, CapturedCo
private ProbeCondition probeCondition;
private Sampling sampling;
private String sessionId;
private transient Sampler sampler;

// no-arg constructor is required by Moshi to avoid creating instance with unsafe and by-passing
// constructors, including field initializers.
Expand Down Expand Up @@ -68,6 +70,12 @@ public Sampling getSampling() {
return sampling;
}

@Override
public void initSamplers() {
double rate = sampling != null ? sampling.getEventsPerSecond() : 1.0;
sampler = ProbeRateLimiter.createSampler(rate);
}

@Override
public boolean isCaptureSnapshot() {
return false;
Expand Down Expand Up @@ -104,7 +112,8 @@ public void evaluate(
if (sampling == null || !sampling.inCoolDown()) {
boolean sample = true;
if (!hasCondition()) {
sample = MethodLocation.isSame(location, evaluateAt) && ProbeRateLimiter.tryProbe(id);
sample =
MethodLocation.isSame(location, evaluateAt) && ProbeRateLimiter.tryProbe(sampler, true);
}
boolean value = evaluateCondition(context);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.datadog.debugger.instrumentation.InstrumentationResult;
import com.datadog.debugger.probe.LogProbe;
import com.datadog.debugger.probe.ProbeDefinition;
import com.datadog.debugger.probe.Sampled;
import com.datadog.debugger.sink.DebuggerSink;
import com.datadog.debugger.sink.ProbeStatusSink;
import com.datadog.debugger.util.MoshiHelper;
Expand Down Expand Up @@ -64,7 +65,7 @@ public void after() {
if (currentTransformer != null) {
instr.removeTransformer(currentTransformer);
}
ProbeRateLimiter.resetAll();
ProbeRateLimiter.resetGlobalRate();
Assertions.assertFalse(DebuggerContext.isInProbe());
Redaction.clearUserDefinedTypes();
}
Expand Down Expand Up @@ -354,10 +355,9 @@ protected TestSnapshotListener installProbes(Configuration configuration) {
DebuggerContext.initClassFilter(new DenyListHelper(null));
DebuggerContext.initValueSerializer(new JsonSnapshotSerializer());

for (LogProbe probe : configuration.getLogProbes()) {
if (probe.getSampling() != null) {
ProbeRateLimiter.setRate(
probe.getId(), probe.getSampling().getEventsPerSecond(), probe.isCaptureSnapshot());
for (ProbeDefinition probe : configuration.getDefinitions()) {
if (probe instanceof Sampled) {
((Sampled) probe).initSamplers();
}
}
if (configuration.getSampling() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import com.datadog.debugger.el.DSL;
import com.datadog.debugger.el.ProbeCondition;
import com.datadog.debugger.probe.LogProbe;
import com.datadog.debugger.probe.ProbeDefinition;
import com.datadog.debugger.probe.Sampled;
import com.datadog.debugger.sink.ProbeStatusSink;
import com.datadog.debugger.sink.Snapshot;
import com.datadog.debugger.util.TestSnapshotListener;
Expand Down Expand Up @@ -55,7 +57,7 @@ public void after() {
if (currentTransformer != null) {
instr.removeTransformer(currentTransformer);
}
ProbeRateLimiter.resetAll();
ProbeRateLimiter.resetGlobalRate();
}

@Test
Expand Down Expand Up @@ -564,6 +566,11 @@ private TestSnapshotListener installProbes(Configuration configuration) {
.thenReturn("http://localhost:8126/debugger/v1/input");
when(config.getFinalDebuggerSymDBUrl()).thenReturn("http://localhost:8126/symdb/v1/input");
when(config.getDynamicInstrumentationUploadBatchSize()).thenReturn(100);
for (ProbeDefinition probe : configuration.getDefinitions()) {
if (probe instanceof Sampled) {
((Sampled) probe).initSamplers();
}
}
ProbeMetadata probeMetadata = new ProbeMetadata();
currentTransformer = new DebuggerTransformer(config, probeMetadata, configuration);
instr.addTransformer(currentTransformer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.datadog.debugger.el.values.StringValue;
import com.datadog.debugger.probe.LogProbe;
import com.datadog.debugger.probe.ProbeDefinition;
import com.datadog.debugger.probe.Sampled;
import com.datadog.debugger.probe.SpanDecorationProbe;
import com.datadog.debugger.sink.DebuggerSink;
import com.datadog.debugger.sink.ProbeStatusSink;
Expand Down Expand Up @@ -79,7 +80,7 @@ public void setUp() {
public void after() {
super.after();
Redaction.clearUserDefinedTypes();
ProbeRateLimiter.resetAll();
ProbeRateLimiter.resetGlobalRate();
}

@Test
Expand Down Expand Up @@ -758,6 +759,11 @@ private void installSpanDecorationProbes(String expectedClassName, Configuration
.thenReturn("http://localhost:8126/debugger/v1/input");
when(config.getFinalDebuggerSymDBUrl()).thenReturn("http://localhost:8126/symdb/v1/input");
probeStatusSink = mock(ProbeStatusSink.class);
for (ProbeDefinition probe : configuration.getDefinitions()) {
if (probe instanceof Sampled) {
((Sampled) probe).initSamplers();
}
}
ProbeMetadata probeMetadata = new ProbeMetadata();
currentTransformer =
new DebuggerTransformer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void after() {
instr.removeTransformer(currentTransformer);
}
ProbeRateLimiter.setSamplerSupplier(null);
ProbeRateLimiter.resetAll();
ProbeRateLimiter.resetGlobalRate();
}

@Test
Expand Down
Loading
Loading