[SPARK-55471] Add optimizer support for SequentialStreamingUnion #54236
+209
−22
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
This PR adds optimizer support for
SequentialStreamingUnionto enable the same query optimizations that regularUnionnodes receive. Specifically:Validation refactoring: Moved nesting validation from analysis phase to post-optimization phase
validateNoNestingfromValidateSequentialStreamingUnion(analysis rule)ValidateSequentialStreamingUnionNestingrule that runs after the optimizer's "Union" batchOptimizer enhancements:
PushProjectionThroughUnionto supportSequentialStreamingUnionby:pushProjectionThroughUnionLike()method that works with anyUnionBasesubtypeSequentialStreamingUnionto push projections down to childrenCombineUnionsto supportSequentialStreamingUnionby:flattenSequentialStreamingUnion()method (mirrorsflattenUnion())Distinct,Deduplicate, andDeduplicateWithinWatermarkWhy are the changes needed?
SequentialStreamingUnionis a specialized union operator for streaming queries that maintains source ordering and has specific constraints (no stateful operations, streaming sources only). However, it was missing optimizer support that regularUnionnodes have.Without these optimizations:
SequentialStreamingUnionnodes created through multiplefollowedBy()calls would be rejected at analysis time, even though they could be flattenedSequentialStreamingUnionwouldn't be pushed down, leading to unnecessary data processingThis PR enables better query optimization for streaming workloads using
SequentialStreamingUnion.Does this PR introduce any user-facing change?
No. The changes are internal optimizer improvements that maintain the same semantics. Users may see:
The validation change is not user-facing because the nesting validation still rejects nested unions, just at a later phase (after optimization attempts flattening).
How was this patch tested?
Updated existing tests: Modified
SequentialStreamingUnionAnalysisSuiteto reflect that nesting validation now runs post-optimization rather than during analysisAdded new optimizer tests in
SetOperationSuite:SequentialStreamingUnion: combine nested unions into one- verifies flattening of nested unionsSequentialStreamingUnion: flatten through Distinct- verifies flattening through Distinct operatorSequentialStreamingUnion: flatten through Deduplicate- verifies flattening through Deduplicate operatorSequentialStreamingUnion: project to each side- verifies projection pushdown to all childrenSequentialStreamingUnion: expressions in project list are pushed down- verifies expression pushdownAll tests pass with the new optimizer rules in place.
Was this patch authored or co-authored using generative AI tooling?
No.