diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
index 5b4e1501826..5329fada70e 100644
--- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
+++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
@@ -40,6 +40,7 @@
 import org.apache.spark.ExceptionFailure;
 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskFailedReason;
+import org.apache.spark.executor.TaskMetrics;
 import org.apache.spark.scheduler.AccumulableInfo;
 import org.apache.spark.scheduler.JobFailed;
 import org.apache.spark.scheduler.SparkListener;
@@ -64,6 +65,7 @@
 import org.apache.spark.sql.streaming.StateOperatorProgress;
 import org.apache.spark.sql.streaming.StreamingQueryListener;
 import org.apache.spark.sql.streaming.StreamingQueryProgress;
+import org.apache.spark.util.AccumulatorV2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Tuple2;
@@ -127,8 +129,10 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
   private final HashMap liveExecutors = new HashMap<>();
 
   // There is no easy way to know if an accumulator is not useful anymore (meaning it is not part of
-  // an active SQL query)
-  // so capping the size of the collection storing them
+  // an active SQL query) so capping the size of the collection storing them
+  // TODO (CY): Is this potentially the reason why some Spark Plans aren't showing up consistently?
+  // If we know we don't need the accumulator values, can we drop all associated data and just map
+  // stage ID -> accumulator ID? Put this behind some FF
   private final Map accumulators =
       new RemoveEldestHashMap<>(MAX_ACCUMULATOR_SIZE);
 
@@ -151,6 +155,8 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
   public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sparkVersion) {
     tracer = AgentTracer.get();
 
+    log.error("[CHARLES] HELLO WORLD");
+
     this.sparkConf = sparkConf;
     this.appId = appId;
     this.sparkVersion = sparkVersion;
@@ -229,6 +235,12 @@ public void setupOpenLineage(DDTraceId traceId) {
   /** Parent Ids of a Stage. Provide an implementation based on a specific scala version */
   protected abstract int[] getStageParentIds(StageInfo info);
 
+  /**
+   * All External Accumulators associated with a given task. Provide an implementation based on a
+   * specific scala version
+   */
+  protected abstract List<AccumulatorV2<?, ?>> getExternalAccumulators(TaskMetrics metrics);
+
   @Override
   public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) {
     this.applicationStart = applicationStart;
@@ -670,7 +682,8 @@ public synchronized void onStageCompleted(SparkListenerStageCompl
     SparkPlanInfo sqlPlan = sqlPlans.get(sqlExecutionId);
 
     if (sqlPlan != null) {
-      SparkSQLUtils.addSQLPlanToStageSpan(span, sqlPlan, accumulators, stageId);
+      SparkSQLUtils.addSQLPlanToStageSpan(span, sqlPlan, accumulators, stageMetric, stageId);
+      log.info("[CHARLES] {}", span.getTag("_dd.spark.sql_plan"));
     }
 
     span.finish(completionTimeMs * 1000);
@@ -684,7 +697,9 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
     SparkAggregatedTaskMetrics stageMetric = stageMetrics.get(stageSpanKey);
 
     if (stageMetric != null) {
-      stageMetric.addTaskMetrics(taskEnd);
+      // Not happy that we have to extract external accumulators here, but needed as we're dealing
+      // with Seq which varies across Scala versions
+      stageMetric.addTaskMetrics(taskEnd, getExternalAccumulators(taskEnd.taskMetrics()));
     }
 
     if (taskEnd.taskMetrics() != null) {
diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java
index 757f20f75f5..5b0167a1da1 100644
--- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java
+++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java
@@ -1,13 +1,20 @@
 package datadog.trace.instrumentation.spark;
 
+import com.fasterxml.jackson.core.JsonGenerator;
 import datadog.metrics.api.Histogram;
 import datadog.trace.api.Config;
 import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.spark.TaskFailedReason;
 import org.apache.spark.executor.TaskMetrics;
 import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.sql.execution.metric.SQLMetricInfo;
+import org.apache.spark.util.AccumulatorV2;
 
 class SparkAggregatedTaskMetrics {
   private static final double HISTOGRAM_RELATIVE_ACCURACY = 1 / 32.0;
@@ -59,13 +66,17 @@ class SparkAggregatedTaskMetrics {
   private Histogram shuffleWriteBytesHistogram;
   private Histogram diskBytesSpilledHistogram;
 
+  // Used for Spark SQL Plan metrics ONLY, don't put in regular span for now
+  private Map<Long, Histogram> externalAccumulableHistograms;
+
   public SparkAggregatedTaskMetrics() {}
 
   public SparkAggregatedTaskMetrics(long availableExecutorTime) {
     this.previousAvailableExecutorTime = availableExecutorTime;
   }
 
-  public void addTaskMetrics(SparkListenerTaskEnd taskEnd) {
+  public void addTaskMetrics(
+      SparkListenerTaskEnd taskEnd, List<AccumulatorV2<?, ?>> externalAccumulators) {
     taskCompletedCount += 1;
 
     if (taskEnd.taskInfo().attemptNumber() > 0) {
@@ -127,6 +138,31 @@ public void addTaskMetrics(SparkListenerTaskEnd taskEnd) {
           shuffleWriteBytesHistogram, taskMetrics.shuffleWriteMetrics().bytesWritten());
       diskBytesSpilledHistogram =
           lazyHistogramAccept(diskBytesSpilledHistogram, taskMetrics.diskBytesSpilled());
+
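+      // Each completed task carries its own copies of the SQL accumulators, so acc.value() below
+      // is the per-task contribution that gets folded into a per-accumulator histogram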
+      // TODO (CY): Should we also look at TaskInfo accumulable update values as a backup? Is that
+      // only needed for SHS?
+      if (externalAccumulators != null && !externalAccumulators.isEmpty()) {
+        if (externalAccumulableHistograms == null) {
+          externalAccumulableHistograms = new HashMap<>(externalAccumulators.size());
+        }
+
+        externalAccumulators.forEach(
+            acc -> {
+              Histogram hist = externalAccumulableHistograms.get(acc.id());
+              if (hist == null) {
+                hist =
+                    Histogram.newHistogram(HISTOGRAM_RELATIVE_ACCURACY, HISTOGRAM_MAX_NUM_BINS);
+              }
+
+              try {
+                // As of spark 3.5, all SQL metrics are Long, safeguard if it changes in new
+                // versions
+                hist.accept((Long) acc.value());
+                externalAccumulableHistograms.put(acc.id(), hist);
+              } catch (ClassCastException ignored) {
+              }
+            });
+      }
     }
   }
 
@@ -276,6 +312,21 @@ private Histogram lazyHistogramAccept(Histogram hist, double value) {
     return hist;
   }
 
+  // Used to put external accum metrics to JSON for Spark SQL plans
+  public void externalAccumToJson(JsonGenerator generator, SQLMetricInfo info) throws IOException {
+    if (externalAccumulableHistograms != null) {
+      Histogram hist = externalAccumulableHistograms.get(info.accumulatorId());
+      String name = info.name();
+
+      if (name != null && hist != null) {
+        generator.writeStartObject();
+        generator.writeStringField(name, histogramToBase64(hist));
+        generator.writeStringField("type", info.metricType());
+        generator.writeEndObject();
+      }
+    }
+  }
+
   public static long computeTaskRunTime(TaskMetrics metrics) {
     return metrics.executorDeserializeTime()
         + metrics.executorRunTime()
diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java
index 33718a4b0dc..7eff461a00a 100644
--- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java
+++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java
@@ -24,6 +24,7 @@ public static void addSQLPlanToStageSpan(
       AgentSpan span,
       SparkPlanInfo plan,
       Map accumulators,
+      SparkAggregatedTaskMetrics stageMetric,
       int stageId) {
     Set parentStageIds = new HashSet<>();
     SparkPlanInfoForStage planForStage =
@@ -32,7 +33,7 @@ public static void addSQLPlanToStageSpan(
     span.setTag("_dd.spark.sql_parent_stage_ids", parentStageIds.toString());
 
     if (planForStage != null) {
-      String json = planForStage.toJson(accumulators);
+      String json = planForStage.toJson(stageMetric);
       span.setTag("_dd.spark.sql_plan", json);
     }
   }
@@ -143,7 +144,7 @@ public SparkPlanInfoForStage(SparkPlanInfo plan, List chi
       this.children = children;
     }
 
-    public String toJson(Map accumulators) {
+    public String toJson(SparkAggregatedTaskMetrics stageMetric) {
      // Using the jackson JSON lib used by spark
      // https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.12/3.5.0
      ObjectMapper mapper =
@@ -151,7 +152,7 @@ public String toJson(Map accumulators) {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      try (JsonGenerator generator = mapper.getFactory().createGenerator(baos)) {
-        this.toJson(generator, accumulators, mapper);
+        this.toJson(generator, mapper, stageMetric);
      } catch (IOException e) {
        return null;
      }
@@ -160,7 +161,7 @@ private void toJson(
-        JsonGenerator generator, Map accumulators, ObjectMapper mapper)
+        JsonGenerator generator, ObjectMapper mapper, SparkAggregatedTaskMetrics stageMetric)
         throws IOException {
      generator.writeStartObject();
      generator.writeStringField("node", plan.nodeName());
@@ -199,11 +200,7 @@ private void toJson(
        generator.writeFieldName("metrics");
        generator.writeStartArray();
        for (SQLMetricInfo metric : metrics) {
-          long accumulatorId = metric.accumulatorId();
-          AccumulatorWithStage acc = accumulators.get(accumulatorId);
-          if (acc != null) {
-            acc.toJson(generator, metric);
-          }
+          stageMetric.externalAccumToJson(generator, metric);
        }
        generator.writeEndArray();
      }
@@ -213,7 +210,7 @@ private void toJson(
        generator.writeFieldName("children");
        generator.writeStartArray();
        for (SparkPlanInfoForStage child : children) {
-          child.toJson(generator, accumulators, mapper);
+          child.toJson(generator, mapper, stageMetric);
        }
        generator.writeEndArray();
      }
diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy
index a34941d1be0..a8e03e3533d 100644
--- a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy
+++ b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy
@@ -159,12 +159,12 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
       // Each metric is a dict { "metric_name": "metric_value", "type": "metric_type" }
       expectedMetric.each { key, expectedValue ->
-        assert actualMetric.containsKey(key): prefix + "metric key \"$key\" not found in $actualMetric"
+        assert actualMetric.containsKey(key): prefix + "metric key \"$key\" not found in $actualMetric. actual: $actual.metrics, expected: $expected.metrics"
 
         // Some metric values are duration that will varies between runs
         // In the case, setting the expected value to "any" skips the assertion
         def actualValue = actualMetric[key]
-        assert expectedValue == "any" || actualValue == expectedValue: prefix + "value of metric key \"$key\" does not match \"$expectedValue\", got $actualValue"
+        assert expectedValue == "any" || actualValue == expectedValue: prefix + "value of metric key \"$key\" does not match \"$expectedValue\", got $actualValue. actual: $actual.metrics, expected: $expected.metrics"
       }
     }
   }
@@ -299,6 +299,10 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
            {
              "peak memory total (min, med, max)": "any",
              "type": "size"
+           },
+           {
+             "spill size total (min, med, max)": "any",
+             "type": "size"
            }
          ],
          "children": [
@@ -317,7 +321,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
          },
          "metrics": [
            {
-             "number of output rows": 3,
+             "number of output rows": "CgkJCCGEEEII8T8SZBJgAAAAAAAA8D8AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAPA/GAAaAA==",
              "type": "sum"
            }
          ]
@@ -367,12 +371,16 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
              "type": "average"
            },
            {
-             "number of output rows": 2,
+             "number of output rows": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YFhoAIQAAAAAAAPA/",
              "type": "sum"
            },
            {
              "peak memory total (min, med, max)": "any",
              "type": "size"
+           },
+           {
+             "spill size total (min, med, max)": "any",
+             "type": "size"
            }
          ],
          "children": [
@@ -572,6 +580,10 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
            {
              "number of output rows": "any",
              "type": "sum"
+           },
+           {
+             "spill size total (min, med, max)": "any",
+             "type": "size"
            }
          ],
          "children": [
@@ -731,7 +743,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
          },
          "metrics": [
            {
-             "number of output rows": 1,
+             "number of output rows": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YABoA",
              "type": "sum"
            }
          ],
diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy
index a80eb6ab1cf..e28202be62c 100644
--- a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy
+++ b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy
@@ -56,7 +56,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification {
              "type": "size"
            },
            {
-             "shuffle records written": 3,
+             "shuffle records written": "CgkJCCGEEEII8T8SZBJgAAAAAAAA8D8AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAPA/GAAaAA==",
              "type": "sum"
            },
            {
@@ -93,7 +93,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification {
          },
          "metrics": [
            {
-             "number of output rows": 3,
+             "number of output rows": "CgkJCCGEEEII8T8SZBJgAAAAAAAA8D8AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAPA/GAAaAA==",
              "type": "sum"
            },
            {
@@ -103,6 +103,10 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification {
            {
              "time in aggregation build": "any",
              "type": "timing"
+           },
+           {
+             "spill size": "any",
+             "type": "size"
            }
          ],
          "children": [
@@ -116,7 +120,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification {
          },
          "metrics": [
            {
-             "number of output rows": 3,
+             "number of output rows": "CgkJCCGEEEII8T8SZBJgAAAAAAAA8D8AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAPA/GAAaAA==",
              "type": "sum"
            }
          ]
@@ -171,6 +175,10 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification {
            {
"any", "type": "timing" + }, + { + "spill size": "any", + "type": "size", } ], "children": [ @@ -334,6 +342,10 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { { "time in aggregation build": "any", "type": "timing" + }, + { + "spill size": "any", + "type": "size", } ], "children": [ @@ -774,7 +786,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "type": "size" }, { - "shuffle records written": 1, + "shuffle records written": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YABoA", "type": "sum" }, { @@ -811,12 +823,16 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { }, "metrics": [ { - "number of output rows": 1, + "number of output rows": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YABoA", "type": "sum" }, { "time in aggregation build": "any", "type": "timing" + }, + { + "spill size": "any", + "type": "size", } ], "children": [ @@ -1145,6 +1161,10 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { { "time in aggregation build": "any", "type": "timing" + }, + { + "spill size": "any", + "type": "size", } ], "children": [ diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java index fdae211077e..fdd2a2fa076 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java +++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java @@ -1,14 +1,20 @@ package datadog.trace.instrumentation.spark; +import datadog.trace.util.MethodHandles; +import java.lang.invoke.MethodHandle; import java.util.ArrayList; import java.util.Collection; import java.util.List; import org.apache.spark.SparkConf; +import org.apache.spark.executor.TaskMetrics; import org.apache.spark.scheduler.SparkListenerJobStart; import org.apache.spark.scheduler.StageInfo; import org.apache.spark.sql.execution.SparkPlanInfo; import org.apache.spark.sql.execution.metric.SQLMetricInfo; +import org.apache.spark.util.AccumulatorV2; +import scala.Function1; import scala.collection.JavaConverters; +import scala.collection.mutable.ArrayBuffer; /** * DatadogSparkListener compiled for Scala 2.12 @@ -17,6 +23,13 @@ * compiled with the specific scala version */ public class DatadogSpark212Listener extends AbstractDatadogSparkListener { + private static final MethodHandles methodLoader = + new MethodHandles(ClassLoader.getSystemClassLoader()); + private static final MethodHandle externalAccums = + methodLoader.method(TaskMetrics.class, "externalAccums"); + private static final MethodHandle withExternalAccums = + methodLoader.method(TaskMetrics.class, "withExternalAccums", new Class[] {}); + public DatadogSpark212Listener(SparkConf sparkConf, String appId, String sparkVersion) { super(sparkConf, appId, sparkVersion); } @@ -62,4 +75,27 @@ protected int[] getStageParentIds(StageInfo info) { return parentIds; } + + @Override + protected List getExternalAccumulators(TaskMetrics metrics) { + if (metrics == null) { + return null; + } + + Function1 lambda = + (Function1, List>) + accumulators -> JavaConverters.seqAsJavaList(accumulators); + List res = methodLoader.invoke(withExternalAccums, metrics, lambda); + if (res != null) { + return res; + } + + // withExternalAccums didn't work, try the legacy method + 
+    ArrayBuffer<AccumulatorV2<?, ?>> accumulators = methodLoader.invoke(externalAccums, metrics);
+    if (accumulators != null && !accumulators.isEmpty()) {
+      return JavaConverters.seqAsJavaList(accumulators);
+    }
+
+    return null;
+  }
 }
diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java
index 115cdcbb9b0..8d1e3fa4747 100644
--- a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java
+++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java
@@ -1,13 +1,19 @@
 package datadog.trace.instrumentation.spark;
 
+import datadog.trace.util.MethodHandles;
+import java.lang.invoke.MethodHandle;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import org.apache.spark.SparkConf;
+import org.apache.spark.executor.TaskMetrics;
 import org.apache.spark.scheduler.SparkListenerJobStart;
 import org.apache.spark.scheduler.StageInfo;
 import org.apache.spark.sql.execution.SparkPlanInfo;
 import org.apache.spark.sql.execution.metric.SQLMetricInfo;
+import org.apache.spark.util.AccumulatorV2;
+import scala.Function1;
+import scala.collection.mutable.ArrayBuffer;
 import scala.jdk.javaapi.CollectionConverters;
 
 /**
  * DatadogSparkListener compiled for Scala 2.13
@@ -17,6 +23,13 @@
  * compiled with the specific scala version
  */
 public class DatadogSpark213Listener extends AbstractDatadogSparkListener {
+  private static final MethodHandles methodLoader =
+      new MethodHandles(ClassLoader.getSystemClassLoader());
+  private static final MethodHandle externalAccums =
+      methodLoader.method(TaskMetrics.class, "externalAccums");
+  private static final MethodHandle withExternalAccums =
+      methodLoader.method(TaskMetrics.class, "withExternalAccums", new Class[] {});
+
   public DatadogSpark213Listener(SparkConf sparkConf, String appId, String sparkVersion) {
     super(sparkConf, appId, sparkVersion);
   }
@@ -62,4 +75,27 @@ protected int[] getStageParentIds(StageInfo info) {
     return parentIds;
   }
+
+  @Override
+  protected List<AccumulatorV2<?, ?>> getExternalAccumulators(TaskMetrics metrics) {
+    if (metrics == null) {
+      return null;
+    }
+
+    Function1 lambda =
+        (Function1<ArrayBuffer<AccumulatorV2<?, ?>>, List<AccumulatorV2<?, ?>>>)
+            accumulators -> CollectionConverters.asJava(accumulators);
+    List<AccumulatorV2<?, ?>> res = methodLoader.invoke(withExternalAccums, metrics, lambda);
+    if (res != null) {
+      return res;
+    }
+
+    // withExternalAccums didn't work, try the legacy method
+    ArrayBuffer<AccumulatorV2<?, ?>> accumulators = methodLoader.invoke(externalAccums, metrics);
+    if (accumulators != null && !accumulators.isEmpty()) {
+      return CollectionConverters.asJava(accumulators);
+    }
+
+    return null;
+  }
 }