Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
protected final HashMap<Long, SparkPlanInfo> sqlPlans = new HashMap<>();
private final HashMap<String, SparkListenerExecutorAdded> liveExecutors = new HashMap<>();

// There is no easy way to know if an accumulator is not useful anymore (meaning it is not part of
// an active SQL query) so capping the size of the collection storing them
// TODO (CY): Is this potentially the reason why some Spark Plans aren't showing up consistently?
// If we know we don't need the accumulator values, can we drop all associated data and just map
// stage ID -> accumulator ID? Put this behind some FF
private final Map<Long, SparkSQLUtils.AccumulatorWithStage> accumulators =
new RemoveEldestHashMap<>(MAX_ACCUMULATOR_SIZE);
private final Map<Long, Integer> acc2stage = new HashMap<>();

private volatile boolean isStreamingJob = false;
private final boolean isRunningOnDatabricks;
Expand Down Expand Up @@ -648,7 +642,7 @@ public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompl

for (AccumulableInfo info :
JavaConverters.asJavaCollection(stageInfo.accumulables().values())) {
accumulators.put(info.id(), new SparkSQLUtils.AccumulatorWithStage(stageId, info));
acc2stage.put(info.id(), stageId);
}

Properties prop = stageProperties.remove(stageSpanKey);
Expand Down Expand Up @@ -680,7 +674,7 @@ public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompl

SparkPlanInfo sqlPlan = sqlPlans.get(sqlExecutionId);
if (sqlPlan != null) {
SparkSQLUtils.addSQLPlanToStageSpan(span, sqlPlan, accumulators, stageMetric, stageId);
SparkSQLUtils.addSQLPlanToStageSpan(span, sqlPlan, acc2stage, stageMetric, stageId);
}

span.finish(completionTimeMs * 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class SparkSQLUtils {
public static void addSQLPlanToStageSpan(
AgentSpan span,
SparkPlanInfo plan,
Map<Long, AccumulatorWithStage> accumulators,
Map<Long, Integer> accumulators,
SparkAggregatedTaskMetrics stageMetric,
int stageId) {
Set<Integer> parentStageIds = new HashSet<>();
Expand All @@ -40,7 +40,7 @@ public static void addSQLPlanToStageSpan(

public static SparkPlanInfoForStage computeStageInfoForStage(
SparkPlanInfo plan,
Map<Long, AccumulatorWithStage> accumulators,
Map<Long, Integer> accumulators,
int stageId,
Set<Integer> parentStageIds,
boolean foundStage) {
Expand Down Expand Up @@ -88,18 +88,17 @@ public static SparkPlanInfoForStage computeStageInfoForStage(
return null;
}

private static Set<Integer> stageIdsForPlan(
SparkPlanInfo info, Map<Long, AccumulatorWithStage> accumulators) {
private static Set<Integer> stageIdsForPlan(SparkPlanInfo info, Map<Long, Integer> accumulators) {
Set<Integer> stageIds = new HashSet<>();

Collection<SQLMetricInfo> metrics =
AbstractDatadogSparkListener.listener.getPlanInfoMetrics(info);
for (SQLMetricInfo metric : metrics) {
// Using the accumulators to associate a plan with its stage
AccumulatorWithStage acc = accumulators.get(metric.accumulatorId());
Integer stageId = accumulators.get(metric.accumulatorId());

if (acc != null) {
stageIds.add(acc.stageId);
if (stageId != null) {
stageIds.add(stageId);
}
}

Expand Down