Skip to content

Commit 837913f

Browse files
maxi297octavia-squidington-iii
and
octavia-squidington-iii
authored
fix: ensure most_recent_cursor_value is deserialized (#436)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent fc7c98c commit 837913f

File tree

2 files changed

+62
-4
lines changed

2 files changed

+62
-4
lines changed

airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py

+4
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ def deserialize(self, state: MutableMapping[str, Any]) -> MutableMapping[str, An
7171
for stream_slice in state.get("slices", []):
7272
stream_slice[self.START_KEY] = self._from_state_message(stream_slice[self.START_KEY])
7373
stream_slice[self.END_KEY] = self._from_state_message(stream_slice[self.END_KEY])
74+
if self.MOST_RECENT_RECORD_KEY in stream_slice:
75+
stream_slice[self.MOST_RECENT_RECORD_KEY] = self._from_state_message(
76+
stream_slice[self.MOST_RECENT_RECORD_KEY]
77+
)
7478
return state
7579

7680
def serialize(

unit_tests/sources/streams/concurrent/test_cursor.py

+58-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
4-
4+
from copy import deepcopy
55
from datetime import datetime, timedelta, timezone
66
from functools import partial
77
from typing import Any, Mapping, Optional
@@ -84,7 +84,7 @@ def _cursor_with_slice_boundary_fields(
8484
return ConcurrentCursor(
8585
_A_STREAM_NAME,
8686
_A_STREAM_NAMESPACE,
87-
{},
87+
deepcopy(_NO_STATE),
8888
self._message_repository,
8989
self._state_manager,
9090
EpochValueConcurrentStreamStateConverter(is_sequential_state),
@@ -99,7 +99,7 @@ def _cursor_without_slice_boundary_fields(self) -> ConcurrentCursor:
9999
return ConcurrentCursor(
100100
_A_STREAM_NAME,
101101
_A_STREAM_NAMESPACE,
102-
{},
102+
deepcopy(_NO_STATE),
103103
self._message_repository,
104104
self._state_manager,
105105
EpochValueConcurrentStreamStateConverter(is_sequential_state=True),
@@ -265,7 +265,7 @@ def test_given_no_state_when_generate_slices_then_create_slice_from_start_to_end
265265
cursor = ConcurrentCursor(
266266
_A_STREAM_NAME,
267267
_A_STREAM_NAMESPACE,
268-
_NO_STATE,
268+
deepcopy(_NO_STATE),
269269
self._message_repository,
270270
self._state_manager,
271271
EpochValueConcurrentStreamStateConverter(is_sequential_state=False),
@@ -950,6 +950,60 @@ def test_given_initial_state_is_sequential_and_start_provided_when_generate_slic
950950
}, # State message is updated to the legacy format before being emitted
951951
)
952952

953+
@freezegun.freeze_time(time_to_freeze=datetime.fromtimestamp(50, timezone.utc))
954+
def test_given_most_recent_cursor_value_in_input_state_when_emit_state_then_serialize_state_properly(
955+
self,
956+
) -> None:
957+
cursor = ConcurrentCursor(
958+
_A_STREAM_NAME,
959+
_A_STREAM_NAMESPACE,
960+
{
961+
"state_type": ConcurrencyCompatibleStateType.date_range.value,
962+
"slices": [
963+
{
964+
EpochValueConcurrentStreamStateConverter.START_KEY: 0,
965+
EpochValueConcurrentStreamStateConverter.END_KEY: 20,
966+
EpochValueConcurrentStreamStateConverter.MOST_RECENT_RECORD_KEY: 15,
967+
},
968+
],
969+
},
970+
self._message_repository,
971+
self._state_manager,
972+
EpochValueConcurrentStreamStateConverter(is_sequential_state=False),
973+
CursorField(_A_CURSOR_FIELD_KEY),
974+
_SLICE_BOUNDARY_FIELDS,
975+
datetime.fromtimestamp(0, timezone.utc),
976+
EpochValueConcurrentStreamStateConverter.get_end_provider(),
977+
_NO_LOOKBACK_WINDOW,
978+
)
979+
980+
cursor.close_partition(
981+
_partition(
982+
StreamSlice(
983+
partition={},
984+
cursor_slice={
985+
_LOWER_SLICE_BOUNDARY_FIELD: 20,
986+
_UPPER_SLICE_BOUNDARY_FIELD: 50,
987+
},
988+
),
989+
_stream_name=_A_STREAM_NAME,
990+
)
991+
)
992+
993+
expected_state = {
994+
"state_type": ConcurrencyCompatibleStateType.date_range.value,
995+
"slices": [
996+
{
997+
EpochValueConcurrentStreamStateConverter.START_KEY: 0,
998+
EpochValueConcurrentStreamStateConverter.END_KEY: 50,
999+
EpochValueConcurrentStreamStateConverter.MOST_RECENT_RECORD_KEY: 15,
1000+
},
1001+
],
1002+
}
1003+
self._state_manager.update_state_for_stream.assert_called_once_with(
1004+
_A_STREAM_NAME, _A_STREAM_NAMESPACE, expected_state
1005+
)
1006+
9531007

9541008
class ClampingIntegrationTest(TestCase):
9551009
def setUp(self) -> None:

0 commit comments

Comments
 (0)