Skip to content

Conversation

@ericm-db
Copy link
Contributor

@ericm-db ericm-db commented Feb 9, 2026

What changes were proposed in this pull request?

This PR adds optimizer support for SequentialStreamingUnion to enable the same query optimizations that regular Union nodes receive. Specifically:

  1. Validation refactoring: Moved nesting validation from analysis phase to post-optimization phase

    • Removed validateNoNesting from ValidateSequentialStreamingUnion (analysis rule)
    • Added new ValidateSequentialStreamingUnionNesting rule that runs after the optimizer's "Union" batch
    • This allows the optimizer to flatten nested unions before validating them
  2. Optimizer enhancements:

    • Extended PushProjectionThroughUnion to support SequentialStreamingUnion by:
      • Creating a generic pushProjectionThroughUnionLike() method that works with any UnionBase subtype
      • Adding pattern matching for SequentialStreamingUnion to push projections down to children
    • Extended CombineUnions to support SequentialStreamingUnion by:
      • Adding flattenSequentialStreamingUnion() method (mirrors flattenUnion())
      • Supporting flattening through Distinct, Deduplicate, and DeduplicateWithinWatermark
      • Enabling nested union flattening and projection pushdown through wrapping operators

Why are the changes needed?

SequentialStreamingUnion is a specialized union operator for streaming queries that maintains source ordering and has specific constraints (no stateful operations, streaming sources only). However, it was missing optimizer support that regular Union nodes have.

Without these optimizations:

  • Nested SequentialStreamingUnion nodes created through multiple followedBy() calls would be rejected at analysis time, even though they could be flattened
  • Projections above SequentialStreamingUnion wouldn't be pushed down, leading to unnecessary data processing
  • Users couldn't compose complex streaming queries with the same flexibility as batch queries

This PR enables better query optimization for streaming workloads using SequentialStreamingUnion.

Does this PR introduce any user-facing change?

No. The changes are internal optimizer improvements that maintain the same semantics. Users may see:

  • Better performance due to projection pushdown and union flattening
  • More flexible query patterns (e.g., projections over nested followedBy calls) that now optimize correctly

The validation change is not user-facing because the nesting validation still rejects nested unions, just at a later phase (after optimization attempts flattening).

How was this patch tested?

  1. Updated existing tests: Modified SequentialStreamingUnionAnalysisSuite to reflect that nesting validation now runs post-optimization rather than during analysis

  2. Added new optimizer tests in SetOperationSuite:

    • SequentialStreamingUnion: combine nested unions into one - verifies flattening of nested unions
    • SequentialStreamingUnion: flatten through Distinct - verifies flattening through Distinct operator
    • SequentialStreamingUnion: flatten through Deduplicate - verifies flattening through Deduplicate operator
    • SequentialStreamingUnion: project to each side - verifies projection pushdown to all children
    • SequentialStreamingUnion: expressions in project list are pushed down - verifies expression pushdown

All tests pass with the new optimizer rules in place.

Was this patch authored or co-authored using generative AI tooling?

No.

This commit adds comprehensive optimizer support for SequentialStreamingUnion:

1. CombineUnions rule now flattens nested SequentialStreamingUnion nodes
   - Created generic flattenUnionLike method to handle both Union and
     SequentialStreamingUnion without code duplication
   - Supports flattening through Distinct and Deduplicate operators
   - Handles projection pushdown and nested projection collapsing

2. PushProjectionThroughUnion rule now supports SequentialStreamingUnion
   - Projects deterministic expressions down to each sequential source
   - Reuses generic pushProjectionThroughUnionLike helper method

3. Split validation into analyzer and post-optimizer phases
   - Analyzer validates: streaming sources, no stateful operations
   - Post-optimizer validates: no nesting (after CombineUnions flattens)
   - Allows optimizer to flatten complex patterns like
     df1.followedBy(df2.followedBy(df3).select(...))

4. Added comprehensive test coverage in SetOperationSuite
   - Tests for nested union flattening
   - Tests for flattening through Distinct and Deduplicate
   - Tests for projection pushdown

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
@dongjoon-hyun dongjoon-hyun marked this pull request as draft February 10, 2026 06:42
@ericm-db ericm-db changed the title [WIP] Add optimizer support for SequentialStreamingUnion [SPARK-55471] Add optimizer support for SequentialStreamingUnion Feb 10, 2026
This PR adds optimizer support for `SequentialStreamingUnion` to enable the same query optimizations that regular `Union` nodes receive. Specifically:

1. **Validation refactoring**: Moved nesting validation from analysis phase to post-optimization phase
   - Removed `validateNoNesting` from `ValidateSequentialStreamingUnion` (analysis rule)
   - Added new `ValidateSequentialStreamingUnionNesting` rule that runs after the optimizer's "Union" batch
   - This allows the optimizer to flatten nested unions before validating them

2. **Optimizer enhancements**:
   - Extended `PushProjectionThroughUnion` to support `SequentialStreamingUnion` by:
     - Creating a generic `pushProjectionThroughUnionLike()` method that works with any `UnionBase` subtype
     - Adding pattern matching for `SequentialStreamingUnion` to push projections down to children
   - Extended `CombineUnions` to support `SequentialStreamingUnion` by:
     - Adding `flattenSequentialStreamingUnion()` method (mirrors `flattenUnion()`)
     - Supporting flattening through `Distinct`, `Deduplicate`, and `DeduplicateWithinWatermark`
     - Enabling nested union flattening and projection pushdown through wrapping operators

### Why are the changes needed?

`SequentialStreamingUnion` is a specialized union operator for streaming queries (added in SPARK-49339) that maintains source ordering and has specific constraints (no stateful operations, streaming sources only). However, it was missing optimizer support that regular `Union` nodes have.

Without these optimizations:
- Nested `SequentialStreamingUnion` nodes created through multiple `followedBy()` calls would be rejected at analysis time, even though they could be flattened
- Projections above `SequentialStreamingUnion` wouldn't be pushed down, leading to unnecessary data processing
- Users couldn't compose complex streaming queries with the same flexibility as batch queries

This PR enables better query optimization for streaming workloads using `SequentialStreamingUnion`.

### Does this PR introduce _any_ user-facing change?

No. The changes are internal optimizer improvements that maintain the same semantics. Users may see:
- Better performance due to projection pushdown and union flattening
- More flexible query patterns (e.g., projections over nested followedBy calls) that now optimize correctly

The validation change is not user-facing because the nesting validation still rejects nested unions, just at a later phase (after optimization attempts flattening).

### How was this patch tested?

1. **Updated existing tests**: Modified `SequentialStreamingUnionAnalysisSuite` to reflect that nesting validation now runs post-optimization rather than during analysis

2. **Added new optimizer tests** in `SetOperationSuite`:
   - `SequentialStreamingUnion: combine nested unions into one` - verifies flattening of nested unions
   - `SequentialStreamingUnion: flatten through Distinct` - verifies flattening through Distinct operator
   - `SequentialStreamingUnion: flatten through Deduplicate` - verifies flattening through Deduplicate operator
   - `SequentialStreamingUnion: project to each side` - verifies projection pushdown to all children
   - `SequentialStreamingUnion: expressions in project list are pushed down` - verifies expression pushdown

All tests pass with the new optimizer rules in place.

### Was this patch authored or co-authored using generative AI tooling?

No.
@ericm-db ericm-db force-pushed the sequential-union-optimizer-support branch from b7ef60b to f65e410 Compare February 10, 2026 18:40
@ericm-db ericm-db marked this pull request as ready for review February 10, 2026 18:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant