feat(cdk): Add RecordExpander component for nested array extraction #896

Open
devin-ai-integration[bot] wants to merge 5 commits into main from devin/1770323608-record-expander

Contributor

@devin-ai-integration devin-ai-integration bot commented Feb 5, 2026

feat(cdk): Add RecordExpander component for nested array extraction

Summary

Adds a new RecordExpander declarative component that extracts items from nested array fields within records and emits each item as a separate record. It is wired in as the optional record_expander parameter on DpathExtractor.

New components:

  • RecordExpander — extracts items from a nested array path (supports wildcards like ["sections", "*", "items"]), with configurable on_no_records behavior (skip or emit_parent) and remain_original_record flag
  • ParentFieldMapping — maps fields from the parent record onto each expanded child record
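
To make the behavior described above concrete, here is a minimal sketch in plain Python. It is not the CDK implementation: the names expand_record_sketch and _get_path, the use of plain dicts, and the choice to ignore non-dict items are assumptions made for illustration, and wildcards and ParentFieldMapping are left out. Only the option names (on_no_records, remain_original_record) and the original_record key come from this PR.

```python
from typing import Any, Iterator


def _get_path(record: dict[str, Any], path: list[str]) -> Any:
    """Walk a list of keys into nested dicts; return None if any key is missing."""
    current: Any = record
    for key in path:
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def expand_record_sketch(
    record: dict[str, Any],
    path: list[str],
    on_no_records: str = "skip",
    remain_original_record: bool = False,
) -> Iterator[dict[str, Any]]:
    """Yield one child record per dict item found in the nested array at `path`."""
    nested = _get_path(record, path)
    children: list[dict[str, Any]] = []
    if isinstance(nested, list):
        # Assumption: only dict items inside the array count as child records.
        children = [item for item in nested if isinstance(item, dict)]
    if not children:
        # Nothing to expand: either drop the parent or pass it through unchanged.
        if on_no_records == "emit_parent":
            yield record
        return
    for item in children:
        child = dict(item)  # shallow copy so the nested item itself is not mutated
        if remain_original_record:
            child["original_record"] = record  # stores a reference, not a copy
        yield child


invoice = {"id": "in_1", "lines": {"data": [{"id": "li_1"}, {"id": "li_2"}]}}
expanded = list(expand_record_sketch(invoice, ["lines", "data"], remain_original_record=True))
empty = list(expand_record_sketch({"id": "in_2"}, ["lines", "data"], on_no_records="emit_parent"))
```

With on_no_records left at "skip", the second call would yield nothing instead of passing the parent through.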

Files changed (8):

  • expanders/__init__.py and expanders/record_expander.py — new module with core logic
  • declarative_component_schema.yaml — schema definitions for both components
  • models/declarative_component_schema.py — auto-generated Pydantic models
  • extractors/dpath_extractor.py — optional record_expander parameter integration
  • parsers/model_to_component_factory.py — factory methods + model mappings
  • parsers/manifest_component_transformer.py — transformer mapping for DpathExtractor.record_expander
  • test_dpath_extractor.py — 24 new parametrized test cases

This duplicates the changes from PR #859. It was reimplemented (not cherry-picked), so logic should be compared carefully against the original.

Updates since last revision

  • Renamed loop variable record to extracted_value in wildcard expansion path to avoid shadowing the method parameter
  • Added explanatory comment for empty except KeyError in ParentFieldMapping.copy_field
  • Moved RecordExpander/ParentFieldMapping imports to top-level in model_to_component_factory.py for mypy compliance
  • Note: poetry run poe build removed the copyright header from the auto-generated declarative_component_schema.py — this is a side effect of the code generation tool, not an intentional removal

Review & Testing Checklist for Human

  • Compare against original PR #859: this was reimplemented, not cherry-picked. Verify that the RecordExpander.expand_record() logic matches the original, especially wildcard handling via dpath.values() and the non-wildcard dpath.get() path. Subtle divergence is possible.
  • Verify original_record stores a reference, not a deep copy: when remain_original_record=True, child_record["original_record"] = parent_record stores a reference. If downstream consumers mutate the parent, the original_record field of every expanded record reflects the change. Confirm this is the intended behavior from #859.
  • Check hasattr usage in factory: create_dpath_extractor uses hasattr(model, "record_expander") as a defensive pattern. The Pydantic model should always have this field with a default of None, so hasattr may be unnecessary. Verify this aligns with factory conventions.
  • Copyright header removal on auto-generated file: models/declarative_component_schema.py lost its Copyright (c) 2025 Airbyte, Inc. header after poetry run poe build. Verify whether the build tool should be configured to preserve it.
  • Verify test coverage matches #859: 24 parametrized test cases cover basic expansion, remain_original_record, on_no_records behavior, parent_fields_to_copy, wildcard paths, and edge cases (empty arrays, missing paths, non-array values, mixed types). Confirm no scenarios from the original PR were missed.
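
The reference-versus-copy point in the checklist can be illustrated in isolation. This is a sketch with made-up record contents, not code from the PR; it only shows how a stored reference and a deep copy diverge once the parent is mutated.

```python
import copy

parent = {"id": "in_1", "status": "open", "lines": [{"id": "li_1"}]}

# Storing the parent directly: the child points at the same dict object.
child_by_reference = {"id": "li_1", "original_record": parent}
# Storing a deep copy: the child keeps a snapshot taken at expansion time.
child_by_copy = {"id": "li_1", "original_record": copy.deepcopy(parent)}

# A downstream consumer mutates the parent after expansion.
parent["status"] = "paid"

print(child_by_reference["original_record"]["status"])  # paid
print(child_by_copy["original_record"]["status"])       # open
```

A reference avoids copying a potentially large parent once per child; a deep copy trades that cost for isolation.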

Suggested test plan: Configure a declarative connector with a DpathExtractor using record_expander against a real API that returns nested arrays (e.g., Stripe invoice line items) and verify records are expanded correctly with both on_no_records: skip and on_no_records: emit_parent.

Notes

Co-Authored-By: sophie.cui@airbyte.io <sophie.cui@airbyte.io>
@devin-ai-integration
Contributor Author

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

github-actions bot commented Feb 5, 2026

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1770323608-record-expander#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1770323608-record-expander

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment

sophiecuiy commented Feb 5, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/21728080762


github-actions bot commented Feb 5, 2026

PyTest Results (Fast)

3 884 tests (+29): 3 872 ✅ passed (+29), 12 💤 skipped (±0), 0 ❌ failed (±0)
1 suite (±0), 1 file (±0), duration 6m 49s ⏱️ (+22s)

Results for commit 87808fd. ± Comparison against base commit 15542de.

♻️ This comment has been updated with latest results.

github-actions bot commented Feb 5, 2026

PyTest Results (Full)

3 887 tests (+29): 3 875 ✅ passed (+29), 12 💤 skipped (±0), 0 ❌ failed (±0)
1 suite (±0), 1 file (±0), duration 11m 14s ⏱️ (+13s)

Results for commit 87808fd. ± Comparison against base commit 15542de.

♻️ This comment has been updated with latest results.

Contributor

Copilot AI left a comment

Pull request overview

This PR adds a new RecordExpander declarative component that extracts items from nested array fields within records and emits each item as a separate record. This is a reimplementation of PR #859 to support use cases like Stripe's invoice_line_items and subscription_items streams where API responses contain parent objects with nested arrays that need to be emitted as individual records.

Changes:

  • Introduces RecordExpander and ParentFieldMapping components with support for wildcard paths, configurable on_no_records behavior (skip/emit_parent), optional remain_original_record flag, and selective parent field copying
  • Integrates RecordExpander as an optional parameter in DpathExtractor for seamless record expansion in the extraction pipeline
  • Adds comprehensive test coverage with 24 parametrized test cases covering basic expansion, wildcards, edge cases, and feature combinations

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 8 comments.

Summary per file:

  • airbyte_cdk/sources/declarative/expanders/__init__.py: New module initialization exporting RecordExpander and ParentFieldMapping
  • airbyte_cdk/sources/declarative/expanders/record_expander.py: Core implementation of record expansion logic with wildcard support and parent context preservation
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml: Schema definitions for RecordExpander, ParentFieldMapping, and OnNoRecords enum
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py: Auto-generated Pydantic models from YAML schema
  • airbyte_cdk/sources/declarative/extractors/dpath_extractor.py: Integration of optional record_expander parameter and expansion logic
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py: Factory methods for creating RecordExpander and ParentFieldMapping instances
  • airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py: Transformer mapping for DpathExtractor.record_expander
  • unit_tests/sources/declarative/extractors/test_dpath_extractor.py: 24 new test cases covering expansion scenarios, wildcards, edge cases, and feature combinations


class ParentFieldMapping:
    """Defines a mapping from a parent record field to a child record field."""

    source_field_path: list[str | InterpolatedString]

Copilot AI Feb 13, 2026

Type annotation uses Python 3.10+ style (list[str | InterpolatedString]) which is inconsistent with the rest of the codebase. The codebase consistently uses typing.List and typing.Union for type annotations. For consistency, this should be List[Union[str, InterpolatedString]] with appropriate imports from the typing module.

config: The user-provided configuration as specified by the source's spec.
"""

expand_records_from_field: list[str | InterpolatedString]

Copilot AI Feb 13, 2026

Type annotation uses Python 3.10+ style (list[str | InterpolatedString]) which is inconsistent with the rest of the codebase. The codebase consistently uses typing.List and typing.Union for type annotations. For consistency, this should be List[Union[str, InterpolatedString]] with appropriate imports from the typing module.

parameters: InitVar[Mapping[str, Any]]
remain_original_record: bool = False
on_no_records: str = "skip"
parent_fields_to_copy: list[ParentFieldMapping] = field(default_factory=list)

Copilot AI Feb 13, 2026

Type annotation uses Python 3.10+ style (list[ParentFieldMapping]) which is inconsistent with the rest of the codebase. The codebase consistently uses typing.List for type annotations. For consistency, this should be List[ParentFieldMapping] with appropriate import from the typing module.

    parent_fields_to_copy: list[ParentFieldMapping] = field(default_factory=list)

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._expand_path: list[InterpolatedString] | None = [

Copilot AI Feb 13, 2026

Type annotation uses Python 3.10+ union style (list[InterpolatedString] | None) which is inconsistent with the rest of the codebase. The codebase consistently uses typing.Optional and typing.List. For consistency, this should be Optional[List[InterpolatedString]] with appropriate imports from the typing module.

        config: Config,
        **kwargs: Any,
    ) -> RecordExpander:
        parent_fields_to_copy: list[ParentFieldMapping] = []

Copilot AI Feb 13, 2026

Type annotation uses Python 3.10+ style (list[ParentFieldMapping]) which is inconsistent with the rest of the codebase. The codebase consistently uses typing.List for type annotations. For consistency, this should be List[ParentFieldMapping] with appropriate import from the typing module.

Suggested change
- parent_fields_to_copy: list[ParentFieldMapping] = []
+ parent_fields_to_copy: List[ParentFieldMapping] = []

Comment on lines 1 to 3
# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml


Copilot AI Feb 13, 2026

The copyright header "Copyright (c) 2025 Airbyte, Inc., all rights reserved." was removed from this auto-generated file. This change should be reviewed to ensure it's intentional, as the file previously had a copyright header and removing it might not align with the project's licensing requirements.

Comment on lines 114 to 116
for record in extracted:
    if isinstance(record, list):
        for item in record:

Copilot AI Feb 13, 2026

The loop variable record shadows the method parameter record (line 102). While this doesn't cause a bug because parent_record is used instead, it reduces code clarity and could lead to confusion. Consider renaming the loop variable to something more descriptive like extracted_value or matched_array.

Suggested change
- for record in extracted:
-     if isinstance(record, list):
-         for item in record:
+ for extracted_value in extracted:
+     if isinstance(extracted_value, list):
+         for item in extracted_value:

config: Config
parameters: InitVar[Mapping[str, Any]]
remain_original_record: bool = False
on_no_records: str = "skip"

Copilot AI Feb 13, 2026

The on_no_records parameter is typed as str but should be more restrictive. Consider using Literal["skip", "emit_parent"] from the typing module to ensure type safety and prevent invalid values from being passed at runtime. This would make the type annotation match the schema definition which specifies only these two enum values.

@devin-ai-integration
Contributor Author

These Copilot comments about type annotations (list[str | InterpolatedString] vs List[Union[str, InterpolatedString]]) are incorrect for this codebase. The project's coding standards explicitly require modern Python 3.10+ type hint syntax: list[str] instead of List[str], | None instead of Optional[...]. No changes needed here.

Regarding the on_no_records: str vs Literal["skip", "emit_parent"] suggestion — this matches the approach in the original PR #859. The Pydantic model already validates the enum values at the schema level.
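
The trade-off can be sketched as follows. A plain str annotation on a dataclass is not enforced at runtime, so an invalid value is only rejected if something validates it earlier. The sketch uses a standard-library Enum as a stand-in for the schema-level check; the OnNoRecords member names, the ExpanderOptions class, and parse_on_no_records are assumptions for illustration, not the CDK's code.

```python
from dataclasses import dataclass
from enum import Enum


class OnNoRecords(str, Enum):
    """Stand-in for the schema enum; the member names are assumed."""

    skip = "skip"
    emit_parent = "emit_parent"


@dataclass
class ExpanderOptions:
    """Hypothetical runtime holder; the `str` annotation is not enforced."""

    on_no_records: str = "skip"


def parse_on_no_records(raw: str) -> str:
    """Validate at the boundary, then hand a plain string to the runtime object."""
    return OnNoRecords(raw).value  # raises ValueError for any other value


unchecked = ExpanderOptions(on_no_records="bogus")  # accepted: dataclasses do not validate
checked = ExpanderOptions(on_no_records=parse_on_no_records("emit_parent"))

try:
    parse_on_no_records("bogus")
    rejected = False
except ValueError:
    rejected = True
```

A Literal["skip", "emit_parent"] annotation would help static checkers such as mypy, but it would not add a runtime check either.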

The copyright header removal on the auto-generated model file was done by poetry run poe build — not a manual change.

The variable shadowing of record in the wildcard loop (comment ID 2806195326) is a valid readability concern — I'll fix that by renaming the loop variable.
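
A rough sketch of the renamed loop, for readers without the diff at hand. The PR uses dpath.values() for wildcard matching; the match_values helper below is a pure-Python stand-in written for this example, and expand_wildcard is a hypothetical name rather than the method in record_expander.py.

```python
from typing import Any, Iterator


def match_values(node: Any, path: list[str]) -> Iterator[Any]:
    """Yield every value reachable via `path`, where "*" matches any key or index."""
    if not path:
        yield node
        return
    head, rest = path[0], path[1:]
    if head == "*":
        if isinstance(node, dict):
            branches = list(node.values())
        elif isinstance(node, list):
            branches = node
        else:
            branches = []
        for branch in branches:
            yield from match_values(branch, rest)
    elif isinstance(node, dict) and head in node:
        yield from match_values(node[head], rest)


def expand_wildcard(record: dict[str, Any], path: list[str]) -> list[Any]:
    """Collect the items of every array matched by a wildcard path."""
    items: list[Any] = []
    # `extracted_value` keeps the `record` parameter visible inside the loop.
    for extracted_value in match_values(record, path):
        if isinstance(extracted_value, list):
            items.extend(extracted_value)
    return items


page = {
    "sections": [
        {"items": [{"id": 1}, {"id": 2}]},
        {"items": [{"id": 3}]},
        {"title": "no items here"},
    ]
}
flattened = expand_wildcard(page, ["sections", "*", "items"])
```

Because the loop variable no longer reuses the name record, the parent stays reachable under its original name when child records are built.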
