-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathsimple_retriever.py
759 lines (670 loc) · 29.2 KB
/
simple_retriever.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import json
from dataclasses import InitVar, dataclass, field
from functools import partial
from itertools import islice
from typing import Any, Callable, Iterable, List, Mapping, Optional, Set, Tuple, Union
import dpath
import requests
from airbyte_cdk.models import AirbyteMessage
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.incremental import ResumableFullRefreshCursor
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import (
SinglePartitionRouter,
)
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
SubstreamPartitionRouter,
)
from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
from airbyte_cdk.sources.declarative.requesters.request_options import (
DefaultRequestOptionsProvider,
RequestOptionsProvider,
)
from airbyte_cdk.sources.declarative.requesters.requester import Requester
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.http_logger import format_http_message
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.utils.mapping_helpers import combine_mappings
FULL_REFRESH_SYNC_COMPLETE_KEY = "__ab_full_refresh_sync_complete"
@dataclass
class SimpleRetriever(Retriever):
    """
    Retrieves records by synchronously sending requests to fetch records.

    The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer.

    For each stream slice, submit requests until there are no more pages of records to fetch.

    This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery.
    As a result, some of the parameters passed to some methods are unused.
    The two will be decoupled in a future release.

    Attributes:
        stream_name (str): The stream's name
        stream_primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key
        requester (Requester): The HTTP requester
        record_selector (HttpSelector): The record selector
        paginator (Optional[Paginator]): The paginator
        stream_slicer (Optional[StreamSlicer]): The stream slicer
        cursor (Optional[cursor]): The cursor
        parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
    """

    requester: Requester
    record_selector: HttpSelector
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    # `name` and `primary_key` are declared as dataclass fields but are shadowed
    # by the property/setter pairs defined below: the generated __init__ routes
    # constructor values through those setters into the underscored backing
    # fields (`_name`, `_primary_key`), which are excluded from __init__.
    name: str
    _name: Union[InterpolatedString, str] = field(init=False, repr=False, default="")
    primary_key: Optional[Union[str, List[str], List[List[str]]]]
    _primary_key: str = field(init=False, repr=False, default="")
    paginator: Optional[Paginator] = None
    stream_slicer: StreamSlicer = field(
        default_factory=lambda: SinglePartitionRouter(parameters={})
    )
    request_option_provider: RequestOptionsProvider = field(
        default_factory=lambda: DefaultRequestOptionsProvider(parameters={})
    )
    cursor: Optional[DeclarativeCursor] = None
    # When True, the stream slicer's request options are dropped on requests
    # that carry a next_page_token (useful when the token already encodes them).
    ignore_stream_slicer_parameters_on_paginated_requests: bool = False

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Fall back to a no-op paginator so the rest of the class can call
        # paginator methods unconditionally.
        self._paginator = self.paginator or NoPagination(parameters=parameters)
        self._parameters = parameters
        # The `name` setter stores the raw constructor value on `_name`; wrap
        # plain strings so the stream name can be interpolated against config.
        self._name = (
            InterpolatedString(self._name, parameters=parameters)
            if isinstance(self._name, str)
            else self._name
        )

    @property # type: ignore
    def name(self) -> str:
        """
        :return: Stream name
        """
        return (
            str(self._name.eval(self.config))
            if isinstance(self._name, InterpolatedString)
            else self._name
        )

    @name.setter
    def name(self, value: str) -> None:
        # When the dataclass machinery assigns the class-level default, the
        # value is the property object itself; ignore that and only store
        # genuine values.
        if not isinstance(value, property):
            self._name = value

    def _get_mapping(
        self, method: Callable[..., Optional[Union[Mapping[str, Any], str]]], **kwargs: Any
    ) -> Tuple[Union[Mapping[str, Any], str], Set[str]]:
        """
        Get mapping from the provided method, and get the keys of the mapping.
        If the method returns a string, it will return the string and an empty set.
        If the method returns a dict, it will return the dict and its keys.
        """
        mapping = method(**kwargs) or {}
        keys = set(mapping.keys()) if not isinstance(mapping, str) else set()
        return mapping, keys

    def _get_request_options(
        self,
        stream_state: Optional[StreamData],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
        paginator_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
        stream_slicer_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
    ) -> Union[Mapping[str, Any], str]:
        """
        Get the request_option from the paginator and the stream slicer.
        Raise a ValueError if there's a key collision
        Returned merged mapping otherwise
        """
        # FIXME we should eventually remove the usage of stream_state as part of the interpolation
        # JSON bodies may legitimately produce the same key/value pair from both
        # providers, so allow same-value merges only for get_request_body_json.
        is_body_json = paginator_method.__name__ == "get_request_body_json"

        mappings = [
            paginator_method(
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
        ]
        # Include the slicer's options unless this is a paginated request and
        # the retriever is configured to ignore them in that case.
        if not next_page_token or not self.ignore_stream_slicer_parameters_on_paginated_requests:
            mappings.append(
                stream_slicer_method(
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                )
            )
        return combine_mappings(mappings, allow_same_value_merge=is_body_json)

    def _request_headers(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies request headers.
        Authentication headers will overwrite any overlapping headers returned from this method.
        """
        headers = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_headers,
            self.request_option_provider.get_request_headers,
        )
        if isinstance(headers, str):
            raise ValueError("Request headers cannot be a string")
        # HTTP headers must be strings on both sides.
        return {str(k): str(v) for k, v in headers.items()}

    def _request_params(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

        E.g: you might want to define query parameters for paging if next_page_token is not None.
        """
        params = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_params,
            self.request_option_provider.get_request_params,
        )
        if isinstance(params, str):
            raise ValueError("Request params cannot be a string")
        return params

    def _request_body_data(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """
        Specifies how to populate the body of the request with a non-JSON payload.

        If returns a ready text that it will be sent as is.
        If returns a dict that it will be converted to a urlencoded form.
        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
        """
        return self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_body_data,
            self.request_option_provider.get_request_body_data,
        )

    def _request_body_json(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Mapping[str, Any]]:
        """
        Specifies how to populate the body of the request with a JSON payload.

        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
        """
        body_json = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_body_json,
            self.request_option_provider.get_request_body_json,
        )
        if isinstance(body_json, str):
            raise ValueError("Request body json cannot be a string")
        return body_json

    def _paginator_path(
        self,
        next_page_token: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Optional[str]:
        """
        If the paginator points to a path, follow it, else return nothing so the requester is used.
        :param next_page_token:
        :return:
        """
        return self._paginator.path(
            next_page_token=next_page_token,
            stream_state=stream_state,
            stream_slice=stream_slice,
        )

    def _parse_response(
        self,
        response: Optional[requests.Response],
        stream_state: StreamState,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Record]:
        # A missing response (e.g. an ignored error) yields no records rather
        # than raising.
        if not response:
            yield from []
        else:
            yield from self.record_selector.select_records(
                response=response,
                stream_state=stream_state,
                records_schema=records_schema,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            )

    @property # type: ignore
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """The stream's primary key"""
        return self._primary_key

    @primary_key.setter
    def primary_key(self, value: str) -> None:
        # Same dataclass/property interplay as the `name` setter above.
        if not isinstance(value, property):
            self._primary_key = value

    def _next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any],
    ) -> Optional[Mapping[str, Any]]:
        """
        Specifies a pagination strategy.

        The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.

        :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
        """
        return self._paginator.next_page_token(
            response=response,
            last_page_size=last_page_size,
            last_record=last_record,
            last_page_token_value=last_page_token_value,
        )

    def _fetch_next_page(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[requests.Response]:
        # Assemble path, headers, params and bodies from the paginator and the
        # request-options provider, then delegate the HTTP exchange to the
        # requester.
        return self.requester.send_request(
            path=self._paginator_path(
                next_page_token=next_page_token,
                stream_state=stream_state,
                stream_slice=stream_slice,
            ),
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
            request_headers=self._request_headers(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_params=self._request_params(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_data=self._request_body_data(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_json=self._request_body_json(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
        )

    # This logic is similar to _read_pages in the HttpStream class. When making changes here, consider making changes there as well.
    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        """Fetch pages for one slice until the paginator signals completion, yielding records as they are parsed."""
        pagination_complete = False
        initial_token = self._paginator.get_initial_token()
        # NOTE(review): a falsy initial token (0, "") is treated the same as
        # None here — confirm no paginator relies on a falsy first token.
        next_page_token: Optional[Mapping[str, Any]] = (
            {"next_page_token": initial_token} if initial_token else None
        )
        while not pagination_complete:
            response = self._fetch_next_page(stream_state, stream_slice, next_page_token)

            # Track page size and last record so the paginator can derive the
            # next token from them.
            last_page_size = 0
            last_record: Optional[Record] = None
            for record in records_generator_fn(response):
                last_page_size += 1
                last_record = record
                yield record

            if not response:
                pagination_complete = True
            else:
                last_page_token_value = (
                    next_page_token.get("next_page_token") if next_page_token else None
                )
                next_page_token = self._next_page_token(
                    response=response,
                    last_page_size=last_page_size,
                    last_record=last_record,
                    last_page_token_value=last_page_token_value,
                )
                if not next_page_token:
                    pagination_complete = True

        # Always return an empty generator just in case no records were ever yielded
        yield from []

    def _read_single_page(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[StreamData]:
        """Fetch exactly one page (resumable full refresh), persisting the next token — or a completion sentinel — on the cursor."""
        # Resume from the token checkpointed in state, if any; otherwise start
        # from the paginator's initial token.
        initial_token = stream_state.get("next_page_token")
        if initial_token is None:
            initial_token = self._paginator.get_initial_token()
        next_page_token: Optional[Mapping[str, Any]] = (
            {"next_page_token": initial_token} if initial_token else None
        )

        response = self._fetch_next_page(stream_state, stream_slice, next_page_token)

        last_page_size = 0
        last_record: Optional[Record] = None
        for record in records_generator_fn(response):
            last_page_size += 1
            last_record = record
            yield record

        if not response:
            # No response means nothing further to read: mark the sync complete.
            next_page_token = {FULL_REFRESH_SYNC_COMPLETE_KEY: True}
        else:
            last_page_token_value = (
                next_page_token.get("next_page_token") if next_page_token else None
            )
            # When the paginator is exhausted, checkpoint the completion
            # sentinel instead of a token.
            next_page_token = self._next_page_token(
                response=response,
                last_page_size=last_page_size,
                last_record=last_record,
                last_page_token_value=last_page_token_value,
            ) or {FULL_REFRESH_SYNC_COMPLETE_KEY: True}

        if self.cursor:
            self.cursor.close_slice(
                StreamSlice(cursor_slice=next_page_token, partition=stream_slice.partition)
            )

        # Always return an empty generator just in case no records were ever yielded
        yield from []

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: json schema to describe record
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """
        _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check

        most_recent_record_from_slice = None
        record_generator = partial(
            self._parse_records,
            stream_state=self.state or {},
            stream_slice=_slice,
            records_schema=records_schema,
        )

        # Resumable full refresh streams read one page per invocation and
        # checkpoint the page token; all other streams page through each slice.
        if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
            stream_state = self.state

            # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
            # fetch more records. The platform deletes stream state for full refresh streams before starting a
            # new job, so we don't need to worry about this value existing for the initial attempt
            if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
                return

            yield from self._read_single_page(record_generator, stream_state, _slice)
        else:
            for stream_data in self._read_pages(record_generator, self.state, _slice):
                current_record = self._extract_record(stream_data, _slice)
                if self.cursor and current_record:
                    self.cursor.observe(_slice, current_record)

                # Latest record read, not necessarily within slice boundaries.
                # TODO Remove once all custom components implement `observe` method.
                # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
                most_recent_record_from_slice = self._get_most_recent_record(
                    most_recent_record_from_slice, current_record, _slice
                )
                yield stream_data

            if self.cursor:
                self.cursor.close_slice(_slice, most_recent_record_from_slice)
        return

    def _get_most_recent_record(
        self,
        current_most_recent: Optional[Record],
        current_record: Optional[Record],
        stream_slice: StreamSlice,
    ) -> Optional[Record]:
        # Keep whichever record the cursor considers most recent; only
        # meaningful when a cursor is configured and a record was extracted.
        if self.cursor and current_record:
            if not current_most_recent:
                return current_record
            else:
                return (
                    current_most_recent
                    if self.cursor.is_greater_than_or_equal(current_most_recent, current_record)
                    else current_record
                )
        else:
            return None

    def _extract_record(
        self, stream_data: StreamData, stream_slice: StreamSlice
    ) -> Optional[Record]:
        """
        As we allow the output of _read_pages to be StreamData, it can be multiple things. Therefore, we need to filter out and normalize
        to data to streamline the rest of the process.
        """
        if isinstance(stream_data, Record):
            # Record is not part of `StreamData` but is the most common implementation of `Mapping[str, Any]` which is part of `StreamData`
            return stream_data
        elif isinstance(stream_data, (dict, Mapping)):
            return Record(
                data=dict(stream_data), associated_slice=stream_slice, stream_name=self.name
            )
        elif isinstance(stream_data, AirbyteMessage) and stream_data.record:
            return Record(
                data=stream_data.record.data, # type:ignore # AirbyteMessage always has record.data
                associated_slice=stream_slice,
                stream_name=self.name,
            )
        # Anything else (e.g. log messages) carries no record data.
        return None

    # stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]: # type: ignore
        """
        Specifies the slices for this stream. See the stream slicing section of the docs for more information.

        :param sync_mode:
        :param cursor_field:
        :param stream_state:
        :return:
        """
        return self.stream_slicer.stream_slices()

    @property
    def state(self) -> Mapping[str, Any]:
        # State lives entirely on the cursor; a cursor-less retriever is stateless.
        return self.cursor.get_stream_state() if self.cursor else {}

    @state.setter
    def state(self, value: StreamState) -> None:
        """State setter, accept state serialized by state getter."""
        if self.cursor:
            self.cursor.set_initial_state(value)

    def _parse_records(
        self,
        response: Optional[requests.Response],
        stream_state: Mapping[str, Any],
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice],
    ) -> Iterable[Record]:
        # Thin wrapper so read_records can bind everything but the response
        # with functools.partial.
        yield from self._parse_response(
            response,
            stream_slice=stream_slice,
            stream_state=stream_state,
            records_schema=records_schema,
        )

    def must_deduplicate_query_params(self) -> bool:
        return True

    @staticmethod
    def _to_partition_key(to_serialize: Any) -> str:
        # separators have changed in Python 3.4. To avoid being impacted by further change, we explicitly specify our own value
        return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)
@dataclass
class SimpleRetrieverTestReadDecorator(SimpleRetriever):
    """
    In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of
    slices that are queried throughout a read command.
    """

    # Hard cap on how many stream slices a test read will process.
    maximum_number_of_slices: int = 5

    def __post_init__(self, options: Mapping[str, Any]) -> None:
        super().__post_init__(options)
        # Reject any non-positive limit. The previous guard
        # (`self.maximum_number_of_slices and self.maximum_number_of_slices < 1`)
        # let 0 slip through because 0 is falsy, which silently produced a
        # test read with no slices at all instead of failing fast.
        if self.maximum_number_of_slices < 1:
            raise ValueError(
                f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}"
            )

    # stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]: # type: ignore
        """Truncate the parent's slices to at most `maximum_number_of_slices`."""
        return islice(super().stream_slices(), self.maximum_number_of_slices)

    def _fetch_next_page(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[requests.Response]:
        """
        Same request assembly as the parent implementation, but attaches a
        log_formatter so each HTTP exchange is surfaced as a formatted log
        message during test reads.
        """
        return self.requester.send_request(
            path=self._paginator_path(
                next_page_token=next_page_token,
                stream_state=stream_state,
                stream_slice=stream_slice,
            ),
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
            request_headers=self._request_headers(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_params=self._request_params(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_data=self._request_body_data(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_json=self._request_body_json(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            log_formatter=lambda response: format_http_message(
                response,
                f"Stream '{self.name}' request",
                f"Request performed in order to extract records for stream '{self.name}'",
                self.name,
            ),
        )
class SafeResponse(requests.Response):
    """
    A requests.Response that can be built in-memory (no real HTTP exchange)
    without tripping over attributes that requests normally populates itself.
    """

    def __getattr__(self, name):
        # Only called for attributes missing on the instance: fall back to the
        # class attribute, or None instead of raising AttributeError.
        return getattr(requests.Response, name, None)

    @property
    def content(self):
        return super().content

    @content.setter
    def content(self, value):
        # requests.Response.content is read-only; allow assignment here and
        # store bytes, encoding str input first as requests expects.
        self._content = value.encode() if isinstance(value, str) else value
@dataclass
class LazySimpleRetriever(SimpleRetriever):
    """
    A retriever that supports lazy loading from parent streams.
    """

    # NOTE(review): typed non-Optional but defaults to None — presumably the
    # model factory always injects a SubstreamPartitionRouter; confirm.
    partition_router: SubstreamPartitionRouter = field(init=True, repr=False, default=None)
    # Interpolated path(s) into each parent record where child records are embedded.
    lazy_read_pointer: Optional[List[InterpolatedString]] = None

    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        """
        Instead of requesting child records directly, read the last-configured
        parent stream and extract the embedded child records from each parent
        record, falling back to paginated HTTP requests for any further pages.
        """
        parent_stream_config = self.partition_router.parent_stream_configs[-1]
        parent_stream = parent_stream_config.stream

        for parent_record in parent_stream.read_only_records():
            # Normalize the parent record; records filtered out by the router
            # come back as None and are skipped.
            parent_record, parent_partition = self.partition_router.process_parent_record(
                parent_record, parent_stream.name
            )
            if parent_record is None:
                continue

            childs = self._extract_child_records(parent_record)
            # Wrap the embedded child payload in a synthetic HTTP response so
            # the regular record-selection pipeline can process it.
            response = self._create_response(childs)

            yield from self._yield_records_with_pagination(
                response,
                records_generator_fn,
                stream_state,
                stream_slice,
                parent_record,
                parent_stream_config,
            )

        yield from []

    def _extract_child_records(self, parent_record: Mapping) -> Union[Mapping, List[Any]]:
        """Extract child records from a parent record based on lazy pointers."""
        # Without pointers the parent record itself is the child payload.
        if not self.lazy_read_pointer:
            return parent_record

        path = [path.eval(self.config) for path in self.lazy_read_pointer]
        # Wildcard paths collect every match; exact paths return the single
        # value (or an empty list when absent).
        return (
            dpath.values(parent_record, path)
            if "*" in path
            else dpath.get(parent_record, path, default=[])
        )

    def _create_response(self, data: Union[Mapping, List[Any]]) -> SafeResponse:
        """Create a SafeResponse with the given data."""
        response = SafeResponse()
        response.content = json.dumps(data).encode("utf-8")
        response.status_code = 200
        return response

    def _yield_records_with_pagination(
        self,
        response: requests.Response,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
        parent_record: Record,
        parent_stream_config: Any,
    ) -> Iterable[Record]:
        """Yield records, handling pagination if needed."""
        last_page_size, last_record = 0, None
        # First page comes from the synthetic (embedded) response.
        for record in records_generator_fn(response):
            last_page_size += 1
            last_record = record
            yield record

        # The embedded payload carries no prior token, hence None.
        next_page_token = self._next_page_token(response, last_page_size, last_record, None)
        if next_page_token:
            yield from self._paginate(
                next_page_token,
                records_generator_fn,
                stream_state,
                stream_slice,
                parent_record,
                parent_stream_config,
            )

    def _paginate(
        self,
        next_page_token: Any,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
        parent_record: Record,
        parent_stream_config: Any,
    ) -> Iterable[Record]:
        """Handle pagination by fetching subsequent pages."""
        # Rebuild the slice partition from the parent record so subsequent
        # HTTP requests target this parent's child endpoint.
        partition_field = parent_stream_config.partition_field.eval(self.config)
        partition_value = dpath.get(
            parent_record, parent_stream_config.parent_key.eval(self.config)
        )
        stream_slice = StreamSlice(
            partition={partition_field: partition_value, "parent_slice": {}},
            cursor_slice=stream_slice.cursor_slice,
        )

        while next_page_token:
            response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
            last_page_size, last_record = 0, None

            for record in records_generator_fn(response):
                last_page_size += 1
                last_record = record
                yield record

            last_page_token_value = (
                next_page_token.get("next_page_token") if next_page_token else None
            )
            next_page_token = self._next_page_token(
                response, last_page_size, last_record, last_page_token_value
            )