Skip to content

Commit 041632d

Browse files
committed
Update partition router with process_parent_record
1 parent 3723a3f commit 041632d

File tree

1 file changed

+44
-23
lines changed

1 file changed

+44
-23
lines changed

airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py

+44-23
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,12 @@
11
#
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
4+
5+
46
import copy
57
import logging
68
from dataclasses import InitVar, dataclass
7-
from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional, Union
9+
from typing import TYPE_CHECKING, Any, Iterable, Tuple, List, Mapping, MutableMapping, Optional, Union
810

911
import dpath
1012

@@ -131,6 +133,40 @@ def _get_request_option(
131133
parent_config.request_option.inject_into_request(params, value, self.config)
132134
return params
133135

136+
def process_parent_record(
    self,
    parent_record: Union[AirbyteMessage, Record, Mapping],
    parent_stream_name: str,
) -> Tuple[Optional[Mapping], Optional[Mapping]]:
    """
    Normalize a single parent record into a ``(data, partition)`` pair.

    The outcome depends on the runtime type of ``parent_record``:

    * ``AirbyteMessage``: a warning is logged for every such message. If the
      message type is ``MessageType.RECORD`` the pair is
      ``(parent_record.record.data, {})``; for any other message type the pair
      is ``(None, None)`` so that the caller can skip it.
    * ``Record``: the pair is ``(parent_record.data, partition)`` where
      ``partition`` is ``associated_slice.partition`` when the record has a
      truthy ``associated_slice`` and ``{}`` otherwise.
    * any other ``Mapping``: the pair is ``(parent_record, {})``.

    :param parent_record: The parent record to process.
    :param parent_stream_name: Name of the parent stream; only used in the warning message.
    :return: Tuple of extracted record data and partition, or ``(None, None)`` for a
        non-record ``AirbyteMessage``.
    :raises AirbyteTracedException: If ``parent_record`` is none of the supported types.
    """
    # NOTE: the order of the type checks is kept as-is on purpose; an object
    # matching more than one of these types must resolve exactly as before.
    if isinstance(parent_record, AirbyteMessage):
        self.logger.warning(
            f"Parent stream {parent_stream_name} returns records of type AirbyteMessage. "
            f"This SubstreamPartitionRouter is not able to checkpoint incremental parent state."
        )
        is_record_message = parent_record.type == MessageType.RECORD
        if not is_record_message:
            # Non-record messages carry no record data; signal "skip" to the caller.
            return None, None
        return parent_record.record.data, {}

    if isinstance(parent_record, Record):
        record_slice = parent_record.associated_slice
        partition = record_slice.partition if record_slice else {}
        return parent_record.data, partition

    if isinstance(parent_record, Mapping):
        # Plain mappings are passed through with an empty partition.
        return parent_record, {}

    raise AirbyteTracedException(
        message=f"Parent stream returned records as invalid type {type(parent_record)}"
    )
169+
134170
def stream_slices(self) -> Iterable[StreamSlice]:
135171
"""
136172
Iterate over each parent stream's record and create a StreamSlice for each record.
@@ -163,28 +199,13 @@ def stream_slices(self) -> Iterable[StreamSlice]:
163199
# read_stateless() assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
164200
# not support either substreams or RFR, but something that needs to be considered once we do
165201
for parent_record in parent_stream.read_only_records():
166-
parent_partition = None
167-
# Skip non-records (eg AirbyteLogMessage)
168-
if isinstance(parent_record, AirbyteMessage):
169-
self.logger.warning(
170-
f"Parent stream {parent_stream.name} returns records of type AirbyteMessage. This SubstreamPartitionRouter is not able to checkpoint incremental parent state."
171-
)
172-
if parent_record.type == MessageType.RECORD:
173-
parent_record = parent_record.record.data # type: ignore[union-attr, assignment] # record is always a Record
174-
else:
175-
continue
176-
elif isinstance(parent_record, Record):
177-
parent_partition = (
178-
parent_record.associated_slice.partition
179-
if parent_record.associated_slice
180-
else {}
181-
)
182-
parent_record = parent_record.data
183-
elif not isinstance(parent_record, Mapping):
184-
# The parent_record should only take the form of a Record, AirbyteMessage, or Mapping. Anything else is invalid
185-
raise AirbyteTracedException(
186-
message=f"Parent stream returned records as invalid type {type(parent_record)}"
187-
)
202+
# Process the parent record
203+
parent_record, parent_partition = self.process_parent_record(parent_record, parent_stream.name)
204+
205+
# Skip invalid or non-record data
206+
if parent_record is None:
207+
continue
208+
188209
try:
189210
partition_value = dpath.get(
190211
parent_record, # type: ignore [arg-type]

0 commit comments

Comments
 (0)