From b304ba5148155b146aa4aba9272fbbc4e135f8e9 Mon Sep 17 00:00:00 2001 From: Igor Kravchenko Date: Tue, 17 Feb 2026 11:40:35 -0600 Subject: [PATCH 1/4] Fixed kafka produce extractor --- .../KafkaProducerInstrumentation.java | 2 +- .../test/groovy/KafkaClientTestBase.groovy | 149 +++++++++++++++--- 2 files changed, 130 insertions(+), 21 deletions(-) diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index c889983e338..c1539109686 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -211,7 +211,7 @@ record = .trackTransaction( span, DataStreamsTransactionExtractor.Type.KAFKA_PRODUCE_HEADERS, - record, + record.headers(), DSM_TRANSACTION_SOURCE_READER); return activateSpan(span); } diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy index bd9df3a4101..bda2dd2c88b 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy @@ -1,4 +1,5 @@ import datadog.trace.api.datastreams.DataStreamsTags +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor import datadog.trace.instrumentation.kafka_common.ClusterIdHolder import static datadog.trace.agent.test.utils.TraceUtils.basicSpan @@ -1047,6 +1048,114 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { producer?.close() } + def "test producer DSM transaction tracking extracts transaction id from headers"() { + setup: + // Configure a DSM transaction extractor for KAFKA_PRODUCE_HEADERS + def extractorsByTypeField = TEST_DATA_STREAMS_MONITORING.getClass().getDeclaredField("extractorsByType") + extractorsByTypeField.setAccessible(true) + def oldExtractorsByType = extractorsByTypeField.get(TEST_DATA_STREAMS_MONITORING) + + def extractor = new DataStreamsTransactionExtractor() { + String getName() { + return "kafka-produce-test" + } + DataStreamsTransactionExtractor.Type getType() { + return DataStreamsTransactionExtractor.Type.KAFKA_PRODUCE_HEADERS + } + String getValue() { + return "x-transaction-id" + } + } + def extractorsByType = new EnumMap<>(DataStreamsTransactionExtractor.Type) + extractorsByType.put(DataStreamsTransactionExtractor.Type.KAFKA_PRODUCE_HEADERS, [extractor]) + extractorsByTypeField.set(TEST_DATA_STREAMS_MONITORING, extractorsByType) + + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + def producer = new KafkaProducer<>(senderProps, new StringSerializer(), new StringSerializer()) + + def headers = new RecordHeaders() + headers.add(new RecordHeader("x-transaction-id", "txn-123".getBytes(StandardCharsets.UTF_8))) + + when: + def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-dsm-transaction", headers) + producer.send(record).get() + + then: + TEST_WRITER.waitForTraces(1) + def producedSpan = TEST_WRITER[0][0] + producedSpan.getTag(Tags.DSM_TRANSACTION_ID) == "txn-123" + producedSpan.getTag(Tags.DSM_TRANSACTION_CHECKPOINT) == "kafka-produce-test" + + cleanup: + extractorsByTypeField.set(TEST_DATA_STREAMS_MONITORING, oldExtractorsByType) + producer?.close() + } + + def "test consumer DSM transaction tracking extracts transaction id from headers"() { + setup: + // Configure a DSM transaction extractor for KAFKA_CONSUME_HEADERS + def extractorsByTypeField = TEST_DATA_STREAMS_MONITORING.getClass().getDeclaredField("extractorsByType") + extractorsByTypeField.setAccessible(true) + def oldExtractorsByType = extractorsByTypeField.get(TEST_DATA_STREAMS_MONITORING) + + def extractor = new DataStreamsTransactionExtractor() { + String getName() { + return "kafka-consume-test" + } + DataStreamsTransactionExtractor.Type getType() { + return DataStreamsTransactionExtractor.Type.KAFKA_CONSUME_HEADERS + } + String getValue() { + return "x-transaction-id" + } + } + def extractorsByType = new EnumMap<>(DataStreamsTransactionExtractor.Type) + extractorsByType.put(DataStreamsTransactionExtractor.Type.KAFKA_CONSUME_HEADERS, [extractor]) + extractorsByTypeField.set(TEST_DATA_STREAMS_MONITORING, extractorsByType) + + def kafkaPartition = 0 + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + def consumer = new KafkaConsumer(consumerProperties) + + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + def producer = new KafkaProducer<>(senderProps, new StringSerializer(), new StringSerializer()) + + consumer.assign(Arrays.asList(new TopicPartition(SHARED_TOPIC, kafkaPartition))) + + def headers = new RecordHeaders() + headers.add(new RecordHeader("x-transaction-id", "txn-456".getBytes(StandardCharsets.UTF_8))) + + when: + def record = new ProducerRecord(SHARED_TOPIC, kafkaPartition, null, "test-dsm-consume-transaction", headers) + producer.send(record).get() + + then: + TEST_WRITER.waitForTraces(1) + def pollResult = KafkaTestUtils.getRecords(consumer) + def recs = pollResult.records(new TopicPartition(SHARED_TOPIC, kafkaPartition)).iterator() + recs.hasNext() + recs.next().value() == "test-dsm-consume-transaction" + + // The consume span is created by TracingIterator when iterating over records + // Find the consumer span with the DSM transaction tags + TEST_WRITER.waitForTraces(2) + def allTraces = TEST_WRITER.toArray() as List> + def consumerSpan = allTraces.collectMany { + it + }.find { + it.getTag(Tags.DSM_TRANSACTION_ID) == "txn-456" + } + consumerSpan != null + consumerSpan.getTag(Tags.DSM_TRANSACTION_ID) == "txn-456" + consumerSpan.getTag(Tags.DSM_TRANSACTION_CHECKPOINT) == "kafka-consume-test" + + cleanup: + extractorsByTypeField.set(TEST_DATA_STREAMS_MONITORING, oldExtractorsByType) + consumer?.close() + producer?.close() + } + def containerProperties() { try { // Different class names for test and latestDepTest. @@ -1057,12 +1166,12 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { } def producerSpan( - TraceAssert trace, - Map config, - DDSpan parentSpan = null, - boolean partitioned = true, - boolean tombstone = false, - String schema = null + TraceAssert trace, + Map config, + DDSpan parentSpan = null, + boolean partitioned = true, + boolean tombstone = false, + String schema = null ) { trace.span { serviceName service() @@ -1104,8 +1213,8 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { } def queueSpan( - TraceAssert trace, - DDSpan parentSpan = null + TraceAssert trace, + DDSpan parentSpan = null ) { trace.span { serviceName splitByDestination() ? "$SHARED_TOPIC" : serviceForTimeInQueue() @@ -1128,12 +1237,12 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { } def consumerSpan( - TraceAssert trace, - Map config, - DDSpan parentSpan = null, - Range offset = 0..0, - boolean tombstone = false, - boolean distributedRootSpan = !hasQueueSpan() + TraceAssert trace, + Map config, + DDSpan parentSpan = null, + Range offset = 0..0, + boolean tombstone = false, + boolean distributedRootSpan = !hasQueueSpan() ) { trace.span { serviceName service() @@ -1169,12 +1278,12 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { } def pollSpan( - TraceAssert trace, - int recordCount = 1, - DDSpan parentSpan = null, - Range offset = 0..0, - boolean tombstone = false, - boolean distributedRootSpan = !hasQueueSpan() + TraceAssert trace, + int recordCount = 1, + DDSpan parentSpan = null, + Range offset = 0..0, + boolean tombstone = false, + boolean distributedRootSpan = !hasQueueSpan() ) { trace.span { serviceName Config.get().getServiceName() From b823f07cfb5fabcb1efb5a80c5f4b8f89b480f6a Mon Sep 17 00:00:00 2001 From: Igor Kravchenko Date: Tue, 17 Feb 2026 12:59:14 -0600 Subject: [PATCH 2/4] Fixed DSM test --- .../src/test/groovy/KafkaClientTestBase.groovy | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy index bda2dd2c88b..2163d1a281c 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy @@ -1050,6 +1050,12 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { def "test producer DSM transaction tracking extracts transaction id from headers"() { setup: + if (!isDataStreamsEnabled()) { + return + } + + injectEnvConfig("DD_DATA_STREAMS_ENABLED", "true") + // Configure a DSM transaction extractor for KAFKA_PRODUCE_HEADERS def extractorsByTypeField = TEST_DATA_STREAMS_MONITORING.getClass().getDeclaredField("extractorsByType") extractorsByTypeField.setAccessible(true) @@ -1087,12 +1093,18 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { producedSpan.getTag(Tags.DSM_TRANSACTION_CHECKPOINT) == "kafka-produce-test" cleanup: - extractorsByTypeField.set(TEST_DATA_STREAMS_MONITORING, oldExtractorsByType) + extractorsByTypeField?.set(TEST_DATA_STREAMS_MONITORING, oldExtractorsByType) producer?.close() } def "test consumer DSM transaction tracking extracts transaction id from headers"() { setup: + if (!isDataStreamsEnabled()) { + return + } + + injectEnvConfig("DD_DATA_STREAMS_ENABLED", "true") + // Configure a DSM transaction extractor for KAFKA_CONSUME_HEADERS def extractorsByTypeField = TEST_DATA_STREAMS_MONITORING.getClass().getDeclaredField("extractorsByType") extractorsByTypeField.setAccessible(true) @@ -1151,7 +1163,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { consumerSpan.getTag(Tags.DSM_TRANSACTION_CHECKPOINT) == "kafka-consume-test" cleanup: - extractorsByTypeField.set(TEST_DATA_STREAMS_MONITORING, oldExtractorsByType) + extractorsByTypeField?.set(TEST_DATA_STREAMS_MONITORING, oldExtractorsByType) consumer?.close() producer?.close() } From 74e942caa8b0e060ba9e31fb91f01509b1d559c3 Mon Sep 17 00:00:00 2001 From: Igor Kravchenko Date: Tue, 17 Feb 2026 15:16:44 -0600 Subject: [PATCH 3/4] Added try/except for source readers --- .../instrumentation/decorator/HttpClientDecorator.java | 8 +++++++- .../instrumentation/decorator/HttpServerDecorator.java | 8 +++++++- .../datadog/trace/instrumentation/kafka_common/Utils.java | 8 +++++++- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpClientDecorator.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpClientDecorator.java index c57a7fc8e76..a081a97ddd0 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpClientDecorator.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpClientDecorator.java @@ -72,7 +72,13 @@ protected boolean shouldSetResourceName() { private final DataStreamsTransactionTracker.TransactionSourceReader DSM_TRANSACTION_SOURCE_READER = - (source, headerName) -> getRequestHeader((REQUEST) source, headerName); + (source, headerName) -> { + try { + return getRequestHeader((REQUEST) source, headerName); + } catch (Exception e) { + return null; + } + }; public AgentSpan onRequest(final AgentSpan span, final REQUEST request) { if (request != null) { diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java index 97be766817e..723b1d603e1 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java @@ -185,7 +185,13 @@ protected AgentSpanContext startInferredProxySpan(Context context, AgentSpanCont private final DataStreamsTransactionTracker.TransactionSourceReader DSM_TRANSACTION_SOURCE_READER = - (source, headerName) -> getRequestHeader((REQUEST) source, headerName); + (source, headerName) -> { + try { + return getRequestHeader((REQUEST) source, headerName); + } catch (Exception e) { + return null; + } + }; public AgentSpan onRequest( final AgentSpan span, diff --git a/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/Utils.java b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/Utils.java index cb035ce2c50..8b463e0feac 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/Utils.java +++ b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/Utils.java @@ -11,7 +11,13 @@ private Utils() {} // prevent instantiation public static DataStreamsTransactionTracker.TransactionSourceReader DSM_TRANSACTION_SOURCE_READER = - (source, headerName) -> new String(((Headers) source).lastHeader(headerName).value()); + (source, headerName) -> { + try { + return new String(((Headers) source).lastHeader(headerName).value()); + } catch (Exception e) { + return null; + } + }; // this method is used in kafka-clients and kafka-streams instrumentations public static long computePayloadSizeBytes(ConsumerRecord val) { From 791045757ced4ea64dfba081c8c3ec95f6c29d84 Mon Sep 17 00:00:00 2001 From: Igor Kravchenko Date: Tue, 17 Feb 2026 16:03:41 -0600 Subject: [PATCH 4/4] Use Throwable --- .../instrumentation/decorator/HttpClientDecorator.java | 2 +- .../instrumentation/decorator/HttpServerDecorator.java | 2 +- .../java/datadog/trace/instrumentation/kafka_common/Utils.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpClientDecorator.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpClientDecorator.java index a081a97ddd0..7fa3fd26a2b 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpClientDecorator.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpClientDecorator.java @@ -75,7 +75,7 @@ protected boolean shouldSetResourceName() { (source, headerName) -> { try { return getRequestHeader((REQUEST) source, headerName); - } catch (Exception e) { + } catch (Throwable ignored) { return null; } }; diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java index 723b1d603e1..1d80ab7ffc7 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java @@ -188,7 +188,7 @@ protected AgentSpanContext startInferredProxySpan(Context context, AgentSpanCont (source, headerName) -> { try { return getRequestHeader((REQUEST) source, headerName); - } catch (Exception e) { + } catch (Throwable ignored) { return null; } }; diff --git a/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/Utils.java b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/Utils.java index 8b463e0feac..d0f7eb4fcea 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/Utils.java +++ b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/Utils.java @@ -14,7 +14,7 @@ private Utils() {} // prevent instantiation (source, headerName) -> { try { return new String(((Headers) source).lastHeader(headerName).value()); - } catch (Exception e) { + } catch (Throwable ignored) { return null; } };