Skip to content

Comments

[SPARK-55619][SQL][3.5] Fix custom metrics in case of coalesced partitions#54409

Open
viirya wants to merge 1 commit intoapache:branch-3.5from
viirya:SPARK-55619-branch-3.5
Open

[SPARK-55619][SQL][3.5] Fix custom metrics in case of coalesced partitions#54409
viirya wants to merge 1 commit intoapache:branch-3.5from
viirya:SPARK-55619-branch-3.5

Conversation

@viirya
Copy link
Member

@viirya viirya commented Feb 20, 2026

What changes were proposed in this pull request?

Replace PartitionMetricCallback with a ConcurrentHashMap keyed by task attempt ID to correctly track reader state across multiple compute() calls when DataSourceRDD is coalesced. The completion listener is registered only once per task attempt, and metrics are flushed and carried forward between readers as partitions are advanced.

Why are the changes needed?

When DataSourceRDD is coalesced (e.g., via .coalesce(1)), compute() gets called multiple times per task, which causes the custom metrics incorrect.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit test

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6

@viirya
Copy link
Member Author

viirya commented Feb 20, 2026

cc @peter-toth @dongjoon-hyun

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM (Pending CIs)

@viirya viirya force-pushed the SPARK-55619-branch-3.5 branch from 7692b66 to ca6db66 Compare February 21, 2026 02:41
Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you fix the compilation error?

KeyGroupedPartitioningSuite.scala:1250: not found: value itemsColumns

…tions

Replace PartitionMetricCallback with a ConcurrentHashMap keyed by task
attempt ID to correctly track reader state across multiple compute()
calls when DataSourceRDD is coalesced. The completion listener is
registered only once per task attempt, and metrics are flushed and
carried forward between readers as partitions are advanced.

Co-Authored-By: Peter Toth <peter.toth@gmail.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@viirya viirya force-pushed the SPARK-55619-branch-3.5 branch from ca6db66 to 2124044 Compare February 21, 2026 06:56
@viirya
Copy link
Member Author

viirya commented Feb 21, 2026

Could you fix the compilation error?

KeyGroupedPartitioningSuite.scala:1250: not found: value itemsColumns

I thought I fixed it but didn't commit the fix. Fixed it now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants