feat(cdk): Add RecordExpander component for nested array extraction#896
feat(cdk): Add RecordExpander component for nested array extraction#896devin-ai-integration[bot] wants to merge 5 commits intomainfrom
Conversation
Co-Authored-By: sophie.cui@airbyte.io <sophie.cui@airbyte.io>
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. 💡 Show Tips and TricksTesting This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1770323608-record-expander#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1770323608-record-expanderPR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
Co-Authored-By: sophie.cui@airbyte.io <sophie.cui@airbyte.io>
Co-Authored-By: sophie.cui@airbyte.io <sophie.cui@airbyte.io>
|
/prerelease
|
Co-Authored-By: sophie.cui@airbyte.io <sophie.cui@airbyte.io>
There was a problem hiding this comment.
Pull request overview
This PR adds a new RecordExpander declarative component that extracts items from nested array fields within records and emits each item as a separate record. This is a reimplementation of PR #859 to support use cases like Stripe's invoice_line_items and subscription_items streams where API responses contain parent objects with nested arrays that need to be emitted as individual records.
Changes:
- Introduces
RecordExpanderandParentFieldMappingcomponents with support for wildcard paths, configurableon_no_recordsbehavior (skip/emit_parent), optionalremain_original_recordflag, and selective parent field copying - Integrates
RecordExpanderas an optional parameter inDpathExtractorfor seamless record expansion in the extraction pipeline - Adds comprehensive test coverage with 24 parametrized test cases covering basic expansion, wildcards, edge cases, and feature combinations
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
airbyte_cdk/sources/declarative/expanders/__init__.py |
New module initialization exporting RecordExpander and ParentFieldMapping |
airbyte_cdk/sources/declarative/expanders/record_expander.py |
Core implementation of record expansion logic with wildcard support and parent context preservation |
airbyte_cdk/sources/declarative/declarative_component_schema.yaml |
Schema definitions for RecordExpander, ParentFieldMapping, and OnNoRecords enum |
airbyte_cdk/sources/declarative/models/declarative_component_schema.py |
Auto-generated Pydantic models from YAML schema |
airbyte_cdk/sources/declarative/extractors/dpath_extractor.py |
Integration of optional record_expander parameter and expansion logic |
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py |
Factory methods for creating RecordExpander and ParentFieldMapping instances |
airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py |
Transformer mapping for DpathExtractor.record_expander |
unit_tests/sources/declarative/extractors/test_dpath_extractor.py |
24 new test cases covering expansion scenarios, wildcards, edge cases, and feature combinations |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| class ParentFieldMapping: | ||
| """Defines a mapping from a parent record field to a child record field.""" | ||
|
|
||
| source_field_path: list[str | InterpolatedString] |
There was a problem hiding this comment.
Type annotation uses Python 3.10+ style (list[str | InterpolatedString]) which is inconsistent with the rest of the codebase. The codebase consistently uses typing.List and typing.Union for type annotations. For consistency, this should be List[Union[str, InterpolatedString]] with appropriate imports from the typing module.
| config: The user-provided configuration as specified by the source's spec. | ||
| """ | ||
|
|
||
| expand_records_from_field: list[str | InterpolatedString] |
There was a problem hiding this comment.
Type annotation uses Python 3.10+ style (list[str | InterpolatedString]) which is inconsistent with the rest of the codebase. The codebase consistently uses typing.List and typing.Union for type annotations. For consistency, this should be List[Union[str, InterpolatedString]] with appropriate imports from the typing module.
| parameters: InitVar[Mapping[str, Any]] | ||
| remain_original_record: bool = False | ||
| on_no_records: str = "skip" | ||
| parent_fields_to_copy: list[ParentFieldMapping] = field(default_factory=list) |
There was a problem hiding this comment.
Type annotation uses Python 3.10+ style (list[ParentFieldMapping]) which is inconsistent with the rest of the codebase. The codebase consistently uses typing.List for type annotations. For consistency, this should be List[ParentFieldMapping] with appropriate import from the typing module.
| parent_fields_to_copy: list[ParentFieldMapping] = field(default_factory=list) | ||
|
|
||
| def __post_init__(self, parameters: Mapping[str, Any]) -> None: | ||
| self._expand_path: list[InterpolatedString] | None = [ |
There was a problem hiding this comment.
Type annotation uses Python 3.10+ union style (list[InterpolatedString] | None) which is inconsistent with the rest of the codebase. The codebase consistently uses typing.Optional and typing.List. For consistency, this should be Optional[List[InterpolatedString]] with appropriate imports from the typing module.
| config: Config, | ||
| **kwargs: Any, | ||
| ) -> RecordExpander: | ||
| parent_fields_to_copy: list[ParentFieldMapping] = [] |
There was a problem hiding this comment.
Type annotation uses Python 3.10+ style (list[ParentFieldMapping]) which is inconsistent with the rest of the codebase. The codebase consistently uses typing.List for type annotations. For consistency, this should be List[ParentFieldMapping] with appropriate import from the typing module.
| parent_fields_to_copy: list[ParentFieldMapping] = [] | |
| parent_fields_to_copy: List[ParentFieldMapping] = [] |
| # generated by datamodel-codegen: | ||
| # filename: declarative_component_schema.yaml | ||
|
|
There was a problem hiding this comment.
The copyright header "Copyright (c) 2025 Airbyte, Inc., all rights reserved." was removed from this auto-generated file. This change should be reviewed to ensure it's intentional, as the file previously had a copyright header and removing it might not align with the project's licensing requirements.
| for record in extracted: | ||
| if isinstance(record, list): | ||
| for item in record: |
There was a problem hiding this comment.
The loop variable record shadows the method parameter record (line 102). While this doesn't cause a bug because parent_record is used instead, it reduces code clarity and could lead to confusion. Consider renaming the loop variable to something more descriptive like extracted_value or matched_array.
| for record in extracted: | |
| if isinstance(record, list): | |
| for item in record: | |
| for extracted_value in extracted: | |
| if isinstance(extracted_value, list): | |
| for item in extracted_value: |
| config: Config | ||
| parameters: InitVar[Mapping[str, Any]] | ||
| remain_original_record: bool = False | ||
| on_no_records: str = "skip" |
There was a problem hiding this comment.
The on_no_records parameter is typed as str but should be more restrictive. Consider using Literal["skip", "emit_parent"] from the typing module to ensure type safety and prevent invalid values from being passed at runtime. This would make the type annotation match the schema definition which specifies only these two enum values.
|
These Copilot comments about type annotations ( Regarding the The copyright header removal on the auto-generated model file was done by The variable shadowing of |
Co-Authored-By: sophie.cui@airbyte.io <sophie.cui@airbyte.io>
feat(cdk): Add RecordExpander component for nested array extraction
Summary
Adds a new
RecordExpanderdeclarative component that extracts items from nested array fields within records and emits each item as a separate record. This is an optional parameter onDpathExtractor.New components:
RecordExpander— extracts items from a nested array path (supports wildcards like["sections", "*", "items"]), with configurableon_no_recordsbehavior (skiporemit_parent) andremain_original_recordflagParentFieldMapping— maps fields from the parent record onto each expanded child recordFiles changed (8):
expanders/__init__.pyandexpanders/record_expander.py— new module with core logicdeclarative_component_schema.yaml— schema definitions for both componentsmodels/declarative_component_schema.py— auto-generated Pydantic modelsextractors/dpath_extractor.py— optionalrecord_expanderparameter integrationparsers/model_to_component_factory.py— factory methods + model mappingsparsers/manifest_component_transformer.py— transformer mapping forDpathExtractor.record_expandertest_dpath_extractor.py— 24 new parametrized test casesThis duplicates the changes from PR #859. It was reimplemented (not cherry-picked), so logic should be compared carefully against the original.
Updates since last revision
record→extracted_valuein wildcard expansion path to avoid shadowing the method parameterexcept KeyErrorinParentFieldMapping.copy_fieldRecordExpander/ParentFieldMappingimports to top-level inmodel_to_component_factory.pyfor mypy compliancepoetry run poe buildremoved the copyright header from the auto-generateddeclarative_component_schema.py— this is a side effect of the code generation tool, not an intentional removalReview & Testing Checklist for Human
RecordExpander.expand_record()logic matches the original, especially wildcard handling viadpath.values()and the non-wildcarddpath.get()path. Subtle divergence is possible.original_recordstores a reference, not a deep copy — Whenremain_original_record=True,child_record["original_record"] = parent_recordstores a reference. If downstream consumers mutate the parent, all expanded records'original_recordfields are affected. Confirm this is the intended behavior from feat(cdk): Add RecordExpander component for nested array extraction #859.hasattrusage in factory —create_dpath_extractoruseshasattr(model, "record_expander")as a defensive pattern. The Pydantic model should always have this field with a default ofNone, sohasattrmay be unnecessary. Verify this aligns with factory conventions.models/declarative_component_schema.pylost itsCopyright (c) 2025 Airbyte, Inc.header afterpoetry run poe build. Verify whether the build tool should be configured to preserve it.remain_original_record,on_no_recordsbehavior,parent_fields_to_copy, wildcard paths, edge cases (empty arrays, missing paths, non-array values, mixed types). Confirm no scenarios from the original PR were missed.Suggested test plan: Configure a declarative connector with a
DpathExtractorusingrecord_expanderagainst a real API that returns nested arrays (e.g., Stripe invoice line items) and verify records are expanded correctly with bothon_no_records: skipandon_no_records: emit_parent.Notes