Commit 2b1f325

fix: (CDK) (AsyncRetriever) - fix the regression. The TIMEOUT Job Status should Retry on server status (#432)
1 parent c4d0f91 commit 2b1f325

File tree

4 files changed: +34 -21 lines changed

airbyte_cdk/sources/declarative/async_job/job.py
airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
unit_tests/sources/declarative/async_job/test_job_orchestrator.py

airbyte_cdk/sources/declarative/async_job/job.py

+6
@@ -34,6 +34,12 @@ def api_job_id(self) -> str:
 
     def status(self) -> AsyncJobStatus:
         if self._timer.has_timed_out():
+            # TODO: we should account for the fact that
+            # certain APIs could send the `Timeout` status themselves;
+            # in that case we should not return `Timeout` here,
+            # but act based on the scenario.
+
+            # The default behavior is to return the `Timeout` status and retry.
             return AsyncJobStatus.TIMED_OUT
         return self._status
 

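In plain terms: once a job's local timer expires, `status()` reports `TIMED_OUT` regardless of the last server-reported status, and the orchestrator changes below make that status retryable instead of fatal. A minimal, self-contained sketch of that behavior (the `Timer` and `Job` classes here are simplified stand-ins for illustration, not the CDK implementations):

from enum import Enum
import time


class AsyncJobStatus(Enum):
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    TIMED_OUT = "timed_out"


class Timer:
    # Simplified stand-in: the real CDK timer tracks a configurable job timeout.
    def __init__(self, timeout_seconds: float) -> None:
        self._timeout_seconds = timeout_seconds
        self._started_at = time.monotonic()

    def has_timed_out(self) -> bool:
        return time.monotonic() - self._started_at > self._timeout_seconds


class Job:
    def __init__(self, timer: Timer) -> None:
        self._timer = timer
        self._status = AsyncJobStatus.RUNNING

    def update_status(self, status: AsyncJobStatus) -> None:
        self._status = status

    def status(self) -> AsyncJobStatus:
        # Mirrors the patched method: an expired local timer wins over the
        # last server-reported status; the orchestrator decides whether to retry.
        if self._timer.has_timed_out():
            return AsyncJobStatus.TIMED_OUT
        return self._status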
airbyte_cdk/sources/declarative/async_job/job_orchestrator.py

+18 -18
@@ -44,16 +44,21 @@ class AsyncPartition:
     This bucket of api_jobs is a bit useless for this iteration but should become interesting when we will be able to split jobs
     """
 
-    _MAX_NUMBER_OF_ATTEMPTS = 3
+    _DEFAULT_MAX_JOB_RETRY = 3
 
-    def __init__(self, jobs: List[AsyncJob], stream_slice: StreamSlice) -> None:
+    def __init__(
+        self, jobs: List[AsyncJob], stream_slice: StreamSlice, job_max_retry: Optional[int] = None
+    ) -> None:
         self._attempts_per_job = {job: 1 for job in jobs}
         self._stream_slice = stream_slice
+        self._job_max_retry = (
+            job_max_retry if job_max_retry is not None else self._DEFAULT_MAX_JOB_RETRY
+        )
 
     def has_reached_max_attempt(self) -> bool:
         return any(
             map(
-                lambda attempt_count: attempt_count >= self._MAX_NUMBER_OF_ATTEMPTS,
+                lambda attempt_count: attempt_count >= self._job_max_retry,
                 self._attempts_per_job.values(),
             )
         )

@@ -62,7 +67,7 @@ def replace_job(self, job_to_replace: AsyncJob, new_jobs: List[AsyncJob]) -> None:
         current_attempt_count = self._attempts_per_job.pop(job_to_replace, None)
         if current_attempt_count is None:
             raise ValueError("Could not find job to replace")
-        elif current_attempt_count >= self._MAX_NUMBER_OF_ATTEMPTS:
+        elif current_attempt_count >= self._job_max_retry:
             raise ValueError(f"Max attempt reached for job in partition {self._stream_slice}")
 
         new_attempt_count = current_attempt_count + 1

@@ -155,6 +160,7 @@ def __init__(
         message_repository: MessageRepository,
         exceptions_to_break_on: Iterable[Type[Exception]] = tuple(),
         has_bulk_parent: bool = False,
+        job_max_retry: Optional[int] = None,
     ) -> None:
         """
         If the stream slices provided as a parameters relies on a async job streams that relies on the same JobTracker, `has_bulk_parent`

@@ -175,11 +181,12 @@ def __init__(
         self._message_repository = message_repository
         self._exceptions_to_break_on: Tuple[Type[Exception], ...] = tuple(exceptions_to_break_on)
         self._has_bulk_parent = has_bulk_parent
+        self._job_max_retry = job_max_retry
 
         self._non_breaking_exceptions: List[Exception] = []
 
     def _replace_failed_jobs(self, partition: AsyncPartition) -> None:
-        failed_status_jobs = (AsyncJobStatus.FAILED,)
+        failed_status_jobs = (AsyncJobStatus.FAILED, AsyncJobStatus.TIMED_OUT)
         jobs_to_replace = [job for job in partition.jobs if job.status() in failed_status_jobs]
         for job in jobs_to_replace:
             new_job = self._start_job(job.job_parameters(), job.api_job_id())

@@ -214,7 +221,7 @@ def _start_jobs(self) -> None:
             for _slice in self._slice_iterator:
                 at_least_one_slice_consumed_from_slice_iterator_during_current_iteration = True
                 job = self._start_job(_slice)
-                self._running_partitions.append(AsyncPartition([job], _slice))
+                self._running_partitions.append(AsyncPartition([job], _slice, self._job_max_retry))
                 if self._has_bulk_parent and self._slice_iterator.has_next():
                     break
         except ConcurrentJobLimitReached:

@@ -363,7 +370,7 @@ def _process_running_partitions_and_yield_completed_ones(
                self._reallocate_partition(current_running_partitions, partition)
 
            # We only remove completed / timeout jobs as we want failed jobs to be re-allocated in priority
-           self._remove_completed_or_timed_out_jobs(partition)
+           self._remove_completed_jobs(partition)
 
        # update the referenced list with running partitions
        self._running_partitions = current_running_partitions

@@ -378,11 +385,7 @@ def _stop_partition(self, partition: AsyncPartition) -> None:
     def _stop_timed_out_jobs(self, partition: AsyncPartition) -> None:
         for job in partition.jobs:
             if job.status() == AsyncJobStatus.TIMED_OUT:
-                self._abort_job(job, free_job_allocation=True)
-                raise AirbyteTracedException(
-                    internal_message=f"Job {job.api_job_id()} has timed out. Try increasing the `polling job timeout`.",
-                    failure_type=FailureType.config_error,
-                )
+                self._abort_job(job, free_job_allocation=False)
 
     def _abort_job(self, job: AsyncJob, free_job_allocation: bool = True) -> None:
         try:

@@ -392,15 +395,15 @@ def _abort_job(self, job: AsyncJob, free_job_allocation: bool = True) -> None:
         except Exception as exception:
             LOGGER.warning(f"Could not free budget for job {job.api_job_id()}: {exception}")
 
-    def _remove_completed_or_timed_out_jobs(self, partition: AsyncPartition) -> None:
+    def _remove_completed_jobs(self, partition: AsyncPartition) -> None:
         """
         Remove completed or timed out jobs from the partition.
 
         Args:
             partition (AsyncPartition): The partition to process.
         """
         for job in partition.jobs:
-            if job.status() in [AsyncJobStatus.COMPLETED, AsyncJobStatus.TIMED_OUT]:
+            if job.status() == AsyncJobStatus.COMPLETED:
                 self._job_tracker.remove_job(job.api_job_id())
 
     def _reallocate_partition(

@@ -415,10 +418,7 @@ def _reallocate_partition(
             current_running_partitions (list): The list of currently running partitions.
             partition (AsyncPartition): The partition to reallocate.
         """
-        for job in partition.jobs:
-            if job.status() != AsyncJobStatus.TIMED_OUT:
-                # allow the FAILED jobs to be re-allocated for partition
-                current_running_partitions.insert(0, partition)
+        current_running_partitions.insert(0, partition)
 
     def _process_partitions_with_errors(self, partition: AsyncPartition) -> None:
         """

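Taken together, these orchestrator changes make a TIMED_OUT job behave like a FAILED one: the orchestrator aborts it without freeing its allocation and re-starts it until the partition's retry budget is spent, instead of raising a config error on the first timeout. A rough sketch of that retry policy follows; `run_with_retries` and the string statuses are illustrative assumptions, not the CDK API.

from typing import Callable, Optional

# TIMED_OUT is now retryable, just like FAILED.
RETRYABLE_STATUSES = {"FAILED", "TIMED_OUT"}
_DEFAULT_MAX_JOB_RETRY = 3


def run_with_retries(start_job: Callable[[], str], job_max_retry: Optional[int] = None) -> str:
    """Start a job and re-start it while it terminates as FAILED or TIMED_OUT."""
    max_attempts = job_max_retry if job_max_retry is not None else _DEFAULT_MAX_JOB_RETRY
    attempt = 1
    while True:
        status = start_job()  # in this sketch, returns a terminal status string
        if status not in RETRYABLE_STATUSES:
            return status
        if attempt >= max_attempts:
            raise RuntimeError(f"Max attempt reached after {attempt} tries (last status: {status})")
        attempt += 1

With the default budget of 3, a job that keeps timing out is started three times before the partition is reported as failed; with `job_max_retry=1` (used by the Connector Builder, see the next file) the first timeout is surfaced immediately.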
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

+4 -1
@@ -3073,8 +3073,11 @@ def _get_job_timeout() -> datetime.timedelta:
                 stream_slices,
                 self._job_tracker,
                 self._message_repository,
-                has_bulk_parent=False,
                 # FIXME work would need to be done here in order to detect if a stream as a parent stream that is bulk
+                has_bulk_parent=False,
+                # set the `job_max_retry` to 1 for the `Connector Builder` use-case.
+                # `None` == the default retry of 3 attempts, under the hood.
+                job_max_retry=1 if self._emit_connector_builder_messages else None,
             ),
             stream_slicer=stream_slicer,
             config=config,

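A small sketch of how the Optional[int] knob resolves, assuming the `AsyncPartition.__init__` logic shown earlier; the helper name `resolve_max_retry` is invented here for illustration.

from typing import Optional

_DEFAULT_MAX_JOB_RETRY = 3


def resolve_max_retry(job_max_retry: Optional[int]) -> int:
    # Mirrors AsyncPartition.__init__: an explicit value wins, None falls back to the default.
    return job_max_retry if job_max_retry is not None else _DEFAULT_MAX_JOB_RETRY


emit_connector_builder_messages = True  # assumption: a Connector Builder test read
job_max_retry = 1 if emit_connector_builder_messages else None

assert resolve_max_retry(job_max_retry) == 1  # Builder: give up after a single attempt
assert resolve_max_retry(None) == 3           # regular syncs: default of 3 attempts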
unit_tests/sources/declarative/async_job/test_job_orchestrator.py

+6 -2
@@ -144,10 +144,14 @@ def test_given_timeout_when_create_and_get_completed_partitions_then_free_budget
         )
         orchestrator = self._orchestrator([_A_STREAM_SLICE], job_tracker)
 
-        with pytest.raises(AirbyteTracedException) as error:
+        with pytest.raises(AirbyteTracedException):
             list(orchestrator.create_and_get_completed_partitions())
 
-        assert "Job an api job id has timed out" in str(error.value)
+        assert job_tracker.try_to_get_intent()
+        assert (
+            self._job_repository.start.call_args_list
+            == [call(_A_STREAM_SLICE)] * _MAX_NUMBER_OF_ATTEMPTS
+        )
 
     @mock.patch(sleep_mock_target)
     def test_given_failure_when_create_and_get_completed_partitions_then_raise_exception(

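The reworked test asserts on `call_args_list` to check how many times the job was started. A minimal, standalone illustration of that assertion pattern (the mock and the payload are arbitrary examples):

from unittest.mock import MagicMock, call

repository = MagicMock()
for _ in range(3):
    repository.start({"start": "2024-01-01"})

# One `call(...)` entry per invocation, in order.
assert repository.start.call_args_list == [call({"start": "2024-01-01"})] * 3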