Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.spark.ExceptionFailure;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskFailedReason;
import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.scheduler.AccumulableInfo;
import org.apache.spark.scheduler.JobFailed;
import org.apache.spark.scheduler.SparkListener;
Expand All @@ -64,6 +65,7 @@
import org.apache.spark.sql.streaming.StateOperatorProgress;
import org.apache.spark.sql.streaming.StreamingQueryListener;
import org.apache.spark.sql.streaming.StreamingQueryProgress;
import org.apache.spark.util.AccumulatorV2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
Expand Down Expand Up @@ -127,8 +129,10 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
private final HashMap<String, SparkListenerExecutorAdded> liveExecutors = new HashMap<>();

// There is no easy way to know if an accumulator is not useful anymore (meaning it is not part of
// an active SQL query)
// so capping the size of the collection storing them
// an active SQL query) so capping the size of the collection storing them
// TODO (CY): Is this potentially the reason why some Spark Plans aren't showing up consistently?
// If we know we don't need the accumulator values, can we drop all associated data and just map
// stage ID -> accumulator ID? Put this behind some FF
private final Map<Long, SparkSQLUtils.AccumulatorWithStage> accumulators =
new RemoveEldestHashMap<>(MAX_ACCUMULATOR_SIZE);

Expand All @@ -151,6 +155,8 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sparkVersion) {
tracer = AgentTracer.get();

log.error("[CHARLES] HELLO WORLD");

this.sparkConf = sparkConf;
this.appId = appId;
this.sparkVersion = sparkVersion;
Expand Down Expand Up @@ -229,6 +235,12 @@ public void setupOpenLineage(DDTraceId traceId) {
/** Parent Ids of a Stage. Provide an implementation based on a specific scala version */
protected abstract int[] getStageParentIds(StageInfo info);

/**
 * All external accumulators associated with a given task. Provide an implementation based on a
 * specific Scala version.
 */
protected abstract List<AccumulatorV2> getExternalAccumulators(TaskMetrics metrics);

@Override
public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) {
this.applicationStart = applicationStart;
Expand Down Expand Up @@ -670,7 +682,8 @@ public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompl

SparkPlanInfo sqlPlan = sqlPlans.get(sqlExecutionId);
if (sqlPlan != null) {
SparkSQLUtils.addSQLPlanToStageSpan(span, sqlPlan, accumulators, stageId);
SparkSQLUtils.addSQLPlanToStageSpan(span, sqlPlan, accumulators, stageMetric, stageId);
log.info("[CHARLES]", span.getTag("_dd.spark.sql_plan"));
}

span.finish(completionTimeMs * 1000);
Expand All @@ -684,7 +697,9 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {

SparkAggregatedTaskMetrics stageMetric = stageMetrics.get(stageSpanKey);
if (stageMetric != null) {
stageMetric.addTaskMetrics(taskEnd);
// Not happy that we have to extract external accumulators here, but needed as we're dealing
// with Seq which varies across Scala versions
stageMetric.addTaskMetrics(taskEnd, getExternalAccumulators(taskEnd.taskMetrics()));
}

if (taskEnd.taskMetrics() != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package datadog.trace.instrumentation.spark;

import com.fasterxml.jackson.core.JsonGenerator;
import datadog.metrics.api.Histogram;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.TaskFailedReason;
import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.sql.execution.metric.SQLMetricInfo;
import org.apache.spark.util.AccumulatorV2;

class SparkAggregatedTaskMetrics {
private static final double HISTOGRAM_RELATIVE_ACCURACY = 1 / 32.0;
Expand Down Expand Up @@ -59,13 +66,17 @@ class SparkAggregatedTaskMetrics {
private Histogram shuffleWriteBytesHistogram;
private Histogram diskBytesSpilledHistogram;

// Used for Spark SQL Plan metrics ONLY, don't put in regular span for now
private Map<Long, Histogram> externalAccumulableHistograms;

public SparkAggregatedTaskMetrics() {}

public SparkAggregatedTaskMetrics(long availableExecutorTime) {
this.previousAvailableExecutorTime = availableExecutorTime;
}

public void addTaskMetrics(SparkListenerTaskEnd taskEnd) {
public void addTaskMetrics(
SparkListenerTaskEnd taskEnd, List<AccumulatorV2> externalAccumulators) {
taskCompletedCount += 1;

if (taskEnd.taskInfo().attemptNumber() > 0) {
Expand Down Expand Up @@ -127,6 +138,31 @@ public void addTaskMetrics(SparkListenerTaskEnd taskEnd) {
shuffleWriteBytesHistogram, taskMetrics.shuffleWriteMetrics().bytesWritten());
diskBytesSpilledHistogram =
lazyHistogramAccept(diskBytesSpilledHistogram, taskMetrics.diskBytesSpilled());

// TODO (CY): Should we also look at TaskInfo accumulable update values as a backup? Is that
// only needed for SHS?
if (externalAccumulators != null && !externalAccumulators.isEmpty()) {
if (externalAccumulableHistograms == null) {
externalAccumulableHistograms = new HashMap<>(externalAccumulators.size());
}

externalAccumulators.forEach(
acc -> {
Histogram hist = externalAccumulableHistograms.get(acc.id());
if (hist == null) {
hist =
Histogram.newHistogram(HISTOGRAM_RELATIVE_ACCURACY, HISTOGRAM_MAX_NUM_BINS);
}

try {
// As of Spark 3.5, all SQL metrics are Long; safeguard in case this changes in
// newer versions
hist.accept((Long) acc.value());
externalAccumulableHistograms.put(acc.id(), hist);
} catch (ClassCastException ignored) {
}
});
}
}
}
}
Expand Down Expand Up @@ -276,6 +312,21 @@ private Histogram lazyHistogramAccept(Histogram hist, double value) {
return hist;
}

// Used to put external accum metrics to JSON for Spark SQL plans
public void externalAccumToJson(JsonGenerator generator, SQLMetricInfo info) throws IOException {
if (externalAccumulableHistograms != null) {
Histogram hist = externalAccumulableHistograms.get(info.accumulatorId());
String name = info.name();

if (name != null && hist != null) {
generator.writeStartObject();
generator.writeStringField(name, histogramToBase64(hist));
generator.writeStringField("type", info.metricType());
generator.writeEndObject();
}
}
}

public static long computeTaskRunTime(TaskMetrics metrics) {
return metrics.executorDeserializeTime()
+ metrics.executorRunTime()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public static void addSQLPlanToStageSpan(
AgentSpan span,
SparkPlanInfo plan,
Map<Long, AccumulatorWithStage> accumulators,
SparkAggregatedTaskMetrics stageMetric,
int stageId) {
Set<Integer> parentStageIds = new HashSet<>();
SparkPlanInfoForStage planForStage =
Expand All @@ -32,7 +33,7 @@ public static void addSQLPlanToStageSpan(
span.setTag("_dd.spark.sql_parent_stage_ids", parentStageIds.toString());

if (planForStage != null) {
String json = planForStage.toJson(accumulators);
String json = planForStage.toJson(stageMetric);
span.setTag("_dd.spark.sql_plan", json);
}
}
Expand Down Expand Up @@ -143,15 +144,15 @@ public SparkPlanInfoForStage(SparkPlanInfo plan, List<SparkPlanInfoForStage> chi
this.children = children;
}

public String toJson(Map<Long, AccumulatorWithStage> accumulators) {
public String toJson(SparkAggregatedTaskMetrics stageMetric) {
// Using the jackson JSON lib used by spark
// https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.12/3.5.0
ObjectMapper mapper =
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (JsonGenerator generator = mapper.getFactory().createGenerator(baos)) {
this.toJson(generator, accumulators, mapper);
this.toJson(generator, mapper, stageMetric);
} catch (IOException e) {
return null;
}
Expand All @@ -160,7 +161,7 @@ public String toJson(Map<Long, AccumulatorWithStage> accumulators) {
}

private void toJson(
JsonGenerator generator, Map<Long, AccumulatorWithStage> accumulators, ObjectMapper mapper)
JsonGenerator generator, ObjectMapper mapper, SparkAggregatedTaskMetrics stageMetric)
throws IOException {
generator.writeStartObject();
generator.writeStringField("node", plan.nodeName());
Expand Down Expand Up @@ -199,11 +200,7 @@ private void toJson(
generator.writeFieldName("metrics");
generator.writeStartArray();
for (SQLMetricInfo metric : metrics) {
long accumulatorId = metric.accumulatorId();
AccumulatorWithStage acc = accumulators.get(accumulatorId);
if (acc != null) {
acc.toJson(generator, metric);
}
stageMetric.externalAccumToJson(generator, metric);
}
generator.writeEndArray();
}
Expand All @@ -213,7 +210,7 @@ private void toJson(
generator.writeFieldName("children");
generator.writeStartArray();
for (SparkPlanInfoForStage child : children) {
child.toJson(generator, accumulators, mapper);
child.toJson(generator, mapper, stageMetric);
}
generator.writeEndArray();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,12 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {

// Each metric is a dict { "metric_name": "metric_value", "type": "metric_type" }
expectedMetric.each { key, expectedValue ->
assert actualMetric.containsKey(key): prefix + "metric key \"$key\" not found in $actualMetric"
assert actualMetric.containsKey(key): prefix + "metric key \"$key\" not found in $actualMetric. actual: $actual.metrics, expected: $expected.metrics"

// Some metric values are durations that will vary between runs.
// In that case, setting the expected value to "any" skips the assertion
def actualValue = actualMetric[key]
assert expectedValue == "any" || actualValue == expectedValue: prefix + "value of metric key \"$key\" does not match \"$expectedValue\", got $actualValue"
assert expectedValue == "any" || actualValue == expectedValue: prefix + "value of metric key \"$key\" does not match \"$expectedValue\", got $actualValue. actual: $actual.metrics, expected: $expected.metrics"
}
}
}
Expand Down Expand Up @@ -299,6 +299,10 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
{
"peak memory total (min, med, max)": "any",
"type": "size"
},
{
"spill size total (min, med, max)": "any",
"type": "size"
}
],
"children": [
Expand All @@ -317,7 +321,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
},
"metrics": [
{
"number of output rows": 3,
"number of output rows": "CgkJCCGEEEII8T8SZBJgAAAAAAAA8D8AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAPA/GAAaAA==",
"type": "sum"
}
]
Expand Down Expand Up @@ -367,12 +371,16 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
"type": "average"
},
{
"number of output rows": 2,
"number of output rows": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YFhoAIQAAAAAAAPA/",
"type": "sum"
},
{
"peak memory total (min, med, max)": "any",
"type": "size"
},
{
"spill size total (min, med, max)": "any",
"type": "size"
}
],
"children": [
Expand Down Expand Up @@ -572,6 +580,10 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
{
"number of output rows": "any",
"type": "sum"
},
{
"spill size total (min, med, max)": "any",
"type": "size"
}
],
"children": [
Expand Down Expand Up @@ -731,7 +743,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
},
"metrics": [
{
"number of output rows": 1,
"number of output rows": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YABoA",
"type": "sum"
}
],
Expand Down
Loading