diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
index b51b8c787a6..3adc61c3d73 100644
--- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
+++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
@@ -128,13 +128,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
   protected final HashMap<Long, SparkPlanInfo> sqlPlans = new HashMap<>();
   private final HashMap<String, SparkListenerExecutorAdded> liveExecutors = new HashMap<>();
 
-  // There is no easy way to know if an accumulator is not useful anymore (meaning it is not part of
-  // an active SQL query) so capping the size of the collection storing them
-  // TODO (CY): Is this potentially the reason why some Spark Plans aren't showing up consistently?
-  // If we know we don't need the accumulator values, can we drop all associated data and just map
-  // stage ID -> accumulator ID? Put this behind some FF
-  private final Map<Long, SparkSQLUtils.AccumulatorWithStage> accumulators =
-      new RemoveEldestHashMap<>(MAX_ACCUMULATOR_SIZE);
+  private final Map<Long, Integer> acc2stage = new HashMap<>();
 
   private volatile boolean isStreamingJob = false;
   private final boolean isRunningOnDatabricks;
@@ -648,7 +642,7 @@ public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompl
 
     for (AccumulableInfo info :
         JavaConverters.asJavaCollection(stageInfo.accumulables().values())) {
-      accumulators.put(info.id(), new SparkSQLUtils.AccumulatorWithStage(stageId, info));
+      acc2stage.put(info.id(), stageId);
     }
 
     Properties prop = stageProperties.remove(stageSpanKey);
@@ -680,7 +674,7 @@ public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompl
      SparkPlanInfo sqlPlan = sqlPlans.get(sqlExecutionId);
 
      if (sqlPlan != null) {
-       SparkSQLUtils.addSQLPlanToStageSpan(span, sqlPlan, accumulators, stageMetric, stageId);
+       SparkSQLUtils.addSQLPlanToStageSpan(span, sqlPlan, acc2stage, stageMetric, stageId);
      }
 
    span.finish(completionTimeMs * 1000);
diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java
index 7eff461a00a..42be9cf0233 100644
--- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java
+++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java
@@ -23,7 +23,7 @@ public class SparkSQLUtils {
   public static void addSQLPlanToStageSpan(
       AgentSpan span,
       SparkPlanInfo plan,
-      Map<Long, AccumulatorWithStage> accumulators,
+      Map<Long, Integer> accumulators,
       SparkAggregatedTaskMetrics stageMetric,
       int stageId) {
     Set<Integer> parentStageIds = new HashSet<>();
@@ -40,7 +40,7 @@ public static void addSQLPlanToStageSpan(
 
   public static SparkPlanInfoForStage computeStageInfoForStage(
       SparkPlanInfo plan,
-      Map<Long, AccumulatorWithStage> accumulators,
+      Map<Long, Integer> accumulators,
       int stageId,
       Set<Integer> parentStageIds,
       boolean foundStage) {
@@ -88,18 +88,17 @@ public static SparkPlanInfoForStage computeStageInfoForStage(
     return null;
   }
 
-  private static Set<Integer> stageIdsForPlan(
-      SparkPlanInfo info, Map<Long, AccumulatorWithStage> accumulators) {
+  private static Set<Integer> stageIdsForPlan(SparkPlanInfo info, Map<Long, Integer> accumulators) {
     Set<Integer> stageIds = new HashSet<>();
 
     Collection<SQLMetricInfo> metrics =
         AbstractDatadogSparkListener.listener.getPlanInfoMetrics(info);
 
     for (SQLMetricInfo metric : metrics) {
       // Using the accumulators to associate a plan with its stage
-      AccumulatorWithStage acc = accumulators.get(metric.accumulatorId());
+      Integer stageId = accumulators.get(metric.accumulatorId());
 
-      if (acc != null) {
-        stageIds.add(acc.stageId);
+      if (stageId != null) {
+        stageIds.add(stageId);
       }
     }
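
Note on the removed size cap: the old accumulators map was bounded by RemoveEldestHashMap, which, judging by its name and its size-taking constructor, is presumably a LinkedHashMap that overrides removeEldestEntry to evict the oldest entry once MAX_ACCUMULATOR_SIZE is exceeded. A minimal sketch of such a bounded map follows; the class name BoundedHashMap and the details are assumptions for illustration, not the actual dd-trace-java implementation.

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Sketch of a size-capped map in the style of RemoveEldestHashMap (assumed behavior):
    // a LinkedHashMap that drops its oldest (insertion-order) entry once the cap is hit.
    public class BoundedHashMap<K, V> extends LinkedHashMap<K, V> {
      private final int maxSize;

      public BoundedHashMap(int maxSize) {
        this.maxSize = maxSize;
      }

      @Override
      protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Returning true tells LinkedHashMap to evict the eldest entry after an insertion.
        return size() > maxSize;
      }
    }

Under that eviction scheme, an entry belonging to a still-active SQL query could be dropped once enough newer accumulators were registered, which is what the removed TODO suspects as a cause of Spark plans showing up inconsistently. Since acc2stage keeps only a boxed Integer stage ID per accumulator ID instead of a full AccumulatorWithStage (stage ID plus AccumulableInfo), each entry is small enough to retain without a cap, though the new map is unbounded.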