Skip to content

Commit 48d51d7

Browse files
committed
fix: implement 25 bug fixes across request flow, AWS provider, and config
P0 critical: - Enforce state machine in Request.update_status (add PENDING→FAILED transition) - Fix model_copy result discarded in machine_handlers (machines never reached TERMINATED) - Zero-instance provisioning: fail fast when no resources created, stay IN_PROGRESS when fleet exists P1 high: - Fix double-failure leaving return request stuck in IN_PROGRESS (force-write FAILED) - Poll all provider groups on multi-provider return (was only polling first) - Add PARTIAL and TIMEOUT to _TERMINAL_STATUSES in both orchestrators - determine_status_from_machines returns safe default instead of (None, None) - Machines skipped due to missing resource_id now mark request PARTIAL not COMPLETED - Fix SpotFleet retry: inject retry_fn as constructor arg, no silent fallback - Fix LT in deleting state: catch InvalidLaunchTemplateId errors, fall through to create - Zero-fulfillment retry loop: break after 3 consecutive zero-fulfillment attempts - Collapse 3-UoW race window in return request creation into single atomic transaction LT strategy (Scenarios B, D, E): - Support creating new LT version when launchTemplateId + override fields both specified - Add on_update_failure config flag (fail/warn) for LT version creation failures - Make resource tagging non-fatal: _tag_resource_safe and _tag_resources_safe helpers - Separate tag application from LT create call Medium bugs: - Fix direct dict mutation on Request bypassing Pydantic (use set_provider_data/update_metadata) - Replace ValueError for missing provider_api with RequestValidationError (maps to 400) - Deduplicate find_active_requests to prevent double-processing - Normalise tz before loop in find_by_date_range (prevent TypeError) - CancelRequestHandler now uses UoW factory for atomicity - completed_at now set for PARTIAL and TIMEOUT terminal states - Always re-hoist provider_defaults after config dict merge - Warn when env var overrides explicit SDK config_dict value - Event publish retried 3x with backoff before logging ERROR - _persist_acquiring returns (Request, bool) so caller can log without aborting - Re-describe ASG after detach to get live DesiredCapacity for MinSize update Log analysis bugs: - Created tickets 2210-2213 for UnauthorizedOperation retry, debug log at INFO, jinja2 warning per-handler, and missing IAM permissions docs
1 parent 93f1d33 commit 48d51d7

File tree

21 files changed

+2757
-2338
lines changed

21 files changed

+2757
-2338
lines changed

src/orb/application/commands/machine_handlers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ async def execute_command(self, command: CleanupMachineResourcesCommand):
260260
if self.logger:
261261
self.logger.warning("Machine not found for cleanup: %s", machine_id)
262262
continue
263-
machine.model_copy(update={"status": MachineStatus.TERMINATED}) # type: ignore[attr-defined]
263+
machine = machine.model_copy(update={"status": MachineStatus.TERMINATED}) # type: ignore[attr-defined]
264264
self._machine_repository.save(machine)
265265

266266

@@ -324,6 +324,6 @@ async def execute_command(self, command: DeregisterMachineCommand):
324324
if self.logger:
325325
self.logger.warning("Machine not found for deregistration: %s", command.machine_id)
326326
return None
327-
machine.model_copy(update={"status": MachineStatus.TERMINATED}) # type: ignore[attr-defined]
327+
machine = machine.model_copy(update={"status": MachineStatus.TERMINATED}) # type: ignore[attr-defined]
328328
self._machine_repository.save(machine)
329329
return None

src/orb/application/commands/request_creation_handlers.py

Lines changed: 165 additions & 86 deletions
Large diffs are not rendered by default.

src/orb/application/commands/request_lifecycle_handlers.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,13 @@ class CancelRequestHandler(BaseCommandHandler[CancelRequestCommand, None]): # t
9494

9595
def __init__(
9696
self,
97-
request_repository: RequestRepository,
97+
uow_factory: UnitOfWorkFactory,
9898
logger: LoggingPort,
9999
event_publisher: EventPublisherPort,
100100
error_handler: ErrorHandlingPort,
101101
) -> None:
102102
super().__init__(logger, event_publisher, error_handler)
103-
self._request_repository = request_repository
103+
self.uow_factory = uow_factory
104104

105105
async def validate_command(self, command: CancelRequestCommand) -> None:
106106
"""Validate cancel request command."""
@@ -113,15 +113,16 @@ async def execute_command(self, command: CancelRequestCommand) -> None:
113113
self.logger.info("Canceling request: %s", command.request_id)
114114

115115
try:
116-
request = self._request_repository.find_by_id(command.request_id)
117-
if not request:
118-
raise EntityNotFoundError("Request", command.request_id)
116+
with self.uow_factory.create_unit_of_work() as uow:
117+
request = uow.requests.find_by_id(command.request_id)
118+
if not request:
119+
raise EntityNotFoundError("Request", command.request_id)
119120

120-
cancelled_request = request.cancel(reason=command.reason)
121+
cancelled_request = request.cancel(reason=command.reason)
121122

122-
events = self._request_repository.save(cancelled_request)
123-
for event in events or []:
124-
self.event_publisher.publish(event) # type: ignore[union-attr]
123+
events = uow.requests.save(cancelled_request)
124+
for event in events or []:
125+
self.event_publisher.publish(event) # type: ignore[union-attr]
125126

126127
self.logger.info("Request canceled: %s", command.request_id)
127128
command.cancelled = True

src/orb/application/services/machine_grouping_service.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,9 @@ def group_by_provider(self, machine_ids: list[str]) -> dict[tuple[str, str], lis
5858

5959
return dict(provider_groups)
6060

61-
def group_by_resource(self, machine_ids: list[str]) -> dict[tuple[str, str, str], list[Any]]:
61+
def group_by_resource(
62+
self, machine_ids: list[str]
63+
) -> tuple[dict[tuple[str, str, str], list[Any]], list[str]]:
6264
"""Group machines by (provider_name, provider_api, resource_id).
6365
6466
This grouping is used for parallel deprovisioning operations where
@@ -68,12 +70,15 @@ def group_by_resource(self, machine_ids: list[str]) -> dict[tuple[str, str, str]
6870
machine_ids: List of machine IDs to group
6971
7072
Returns:
71-
Dictionary mapping (provider_name, provider_api, resource_id) to list of machine objects
73+
Tuple of:
74+
- Dictionary mapping (provider_name, provider_api, resource_id) to list of machine objects
75+
- List of machine IDs that were skipped (missing provider_api or resource_id)
7276
7377
Raises:
7478
ValueError: If machine context cannot be determined
7579
"""
7680
resource_groups: dict[tuple[str, str, str], list[Any]] = defaultdict(list)
81+
skipped_ids: list[str] = []
7782

7883
for machine_id in machine_ids:
7984
try:
@@ -88,12 +93,14 @@ def group_by_resource(self, machine_ids: list[str]) -> dict[tuple[str, str, str]
8893
"Machine %s has no provider_api — skipping",
8994
machine_id,
9095
)
96+
skipped_ids.append(machine_id)
9197
continue
9298
if not machine.resource_id:
9399
self.logger.warning(
94100
"Machine %s has no resource_id — skipping",
95101
machine_id,
96102
)
103+
skipped_ids.append(machine_id)
97104
continue
98105
group_key = (
99106
machine.provider_name,
@@ -116,4 +123,4 @@ def group_by_resource(self, machine_ids: list[str]) -> dict[tuple[str, str, str]
116123
},
117124
)
118125

119-
return dict(resource_groups)
126+
return dict(resource_groups), skipped_ids

src/orb/application/services/orchestration/acquire_machines.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from orb.domain.base.exceptions import ApplicationError
1414
from orb.domain.base.ports.logging_port import LoggingPort
1515

16-
_TERMINAL_STATUSES = {"completed", "complete", "failed", "error", "cancelled", "canceled"}
16+
_TERMINAL_STATUSES = {"completed", "complete", "failed", "error", "cancelled", "canceled", "partial", "timeout"}
1717
_MAX_CONSECUTIVE_POLL_ERRORS = 3
1818

1919

src/orb/application/services/orchestration/return_machines.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@
1313
from orb.domain.base.exceptions import ApplicationError
1414
from orb.domain.base.ports.logging_port import LoggingPort
1515

16-
_TERMINAL_STATUSES = {"completed", "complete", "failed", "error", "cancelled", "canceled"}
16+
_TERMINAL_STATUSES = {"completed", "complete", "failed", "error", "cancelled", "canceled", "partial", "timeout"}
1717
_MAX_CONSECUTIVE_POLL_ERRORS = 3
1818

19+
_STATUS_RANK = {"failed": 3, "error": 3, "partial": 2, "timeout": 2, "completed": 1, "complete": 1}
20+
1921

2022
class ReturnMachinesOrchestrator(OrchestratorBase[ReturnMachinesInput, ReturnMachinesOutput]):
2123
"""Orchestrator for returning machines to the provider."""
@@ -68,14 +70,24 @@ async def execute(self, input: ReturnMachinesInput) -> ReturnMachinesOutput: #
6870
status="no_op",
6971
skipped_machines=skipped,
7072
)
71-
request_id = command.created_request_ids[0]
73+
74+
# Use the first request ID as the primary for the output, but poll ALL
75+
# provider-group requests so a failure in any group is not silently ignored.
76+
primary_request_id = command.created_request_ids[0]
7277
status = "pending"
7378

7479
if input.wait:
75-
status = await self._poll_until_terminal(request_id, input.timeout_seconds)
80+
statuses = await asyncio.gather(
81+
*[
82+
self._poll_until_terminal(rid, input.timeout_seconds)
83+
for rid in command.created_request_ids
84+
]
85+
)
86+
# Return the worst status: failed > partial/timeout > completed
87+
status = max(statuses, key=lambda s: _STATUS_RANK.get(s.lower(), 0))
7688

7789
return ReturnMachinesOutput(
78-
request_id=request_id,
90+
request_id=primary_request_id,
7991
status=status,
8092
)
8193

src/orb/application/services/provisioning_orchestration_service.py

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ async def execute_provisioning(
6969
started_at = datetime.now(timezone.utc)
7070
remaining = request.requested_count
7171
attempt_number = 0
72+
consecutive_zero_fulfillments = 0
7273

7374
accumulated_resource_ids: list[str] = []
7475
accumulated_machine_ids: list[str] = []
@@ -127,6 +128,17 @@ async def execute_provisioning(
127128
fulfilled_this_attempt = last_result.fulfilled_count
128129
remaining -= fulfilled_this_attempt
129130

131+
if fulfilled_this_attempt == 0 and last_result.success:
132+
consecutive_zero_fulfillments += 1
133+
if consecutive_zero_fulfillments >= 3:
134+
self._logger.warning(
135+
"Breaking retry loop after %d consecutive zero-fulfillment attempts",
136+
consecutive_zero_fulfillments,
137+
)
138+
break
139+
else:
140+
consecutive_zero_fulfillments = 0
141+
130142
# Append to fulfillment_attempts audit trail
131143
attempt_record = {
132144
"attempt": attempt_number,
@@ -156,7 +168,14 @@ async def execute_provisioning(
156168
request.requested_count,
157169
remaining,
158170
)
159-
request = self._persist_acquiring(request)
171+
request, persist_ok = self._persist_acquiring(request)
172+
if not persist_ok:
173+
self._logger.warning(
174+
"ACQUIRING persist failed for request %s on attempt %d — "
175+
"continuing retry loop with in-memory state",
176+
request.request_id,
177+
attempt_number,
178+
)
160179
elif last_result.is_final:
161180
# No point retrying
162181
break
@@ -175,8 +194,14 @@ async def execute_provisioning(
175194
is_final=last_result.is_final if last_result else True,
176195
)
177196

178-
def _persist_acquiring(self, request: Request) -> Request:
179-
"""Persist request with ACQUIRING status between retry attempts."""
197+
def _persist_acquiring(self, request: Request) -> tuple[Request, bool]:
198+
"""Persist request with ACQUIRING status between retry attempts.
199+
200+
Returns:
201+
(updated_request, success) — success is False when the DB write
202+
failed. The caller should log a warning but continue the retry loop
203+
because the in-memory request is still valid.
204+
"""
180205
from orb.domain.base import UnitOfWorkFactory
181206

182207
try:
@@ -186,10 +211,10 @@ def _persist_acquiring(self, request: Request) -> Request:
186211
uow_factory = self._container.get(UnitOfWorkFactory)
187212
with uow_factory.create_unit_of_work() as uow:
188213
uow.requests.save(updated)
189-
return updated
214+
return updated, True
190215
except Exception as e:
191216
self._logger.warning("Failed to persist ACQUIRING status: %s", e)
192-
return request
217+
return request, False
193218

194219
def _record_provider_success(self, provider_name: str) -> None:
195220
"""Reset circuit breaker failure count after a successful dispatch."""

src/orb/application/services/request_creation_service.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from orb.domain.base.ports import LoggingPort
55
from orb.domain.base.results import ProviderSelectionResult
66
from orb.domain.request.aggregate import Request
7+
from orb.domain.request.exceptions import RequestValidationError
78
from orb.domain.request.value_objects import RequestType
89
from orb.domain.template.template_aggregate import Template
910

@@ -56,7 +57,9 @@ def create_machine_request(
5657

5758
# Store provider API in domain field
5859
if not template.provider_api:
59-
raise ValueError(f"Template {template.template_id} has no provider_api configured")
60+
raise RequestValidationError(
61+
f"Template {template.template_id} has no provider_api configured"
62+
)
6063
request.provider_api = template.provider_api
6164

6265
self._logger.info(

src/orb/application/services/request_status_management_service.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ async def update_request_from_provisioning(
5151

5252
# Store provider-specific data
5353
if provider_data:
54-
request.provider_data.update(provider_data)
54+
request = request.set_provider_data({**request.provider_data, **provider_data})
5555

5656
# Handle provider errors for partial success
5757
provider_errors = (
@@ -60,7 +60,7 @@ async def update_request_from_provisioning(
6060
has_api_errors = bool(provider_errors)
6161

6262
if has_api_errors and not request.metadata.get("fleet_errors"):
63-
request.metadata["fleet_errors"] = provider_errors
63+
request = request.update_metadata({"fleet_errors": provider_errors})
6464

6565
# Create and save machine aggregates
6666
if instances:
@@ -91,8 +91,9 @@ def _handle_provisioning_failure(self, request: Any, provisioning_result: Any) -
9191
RequestStatus.FAILED, f"Provisioning failed: {error_message}"
9292
)
9393

94-
request.metadata["error_message"] = error_message
95-
request.metadata["error_type"] = "ProvisioningFailure"
94+
request = request.update_metadata(
95+
{"error_message": error_message, "error_type": "ProvisioningFailure"}
96+
)
9697

9798
return request
9899

@@ -139,11 +140,16 @@ def _update_request_status(
139140
RequestStatus.PARTIAL,
140141
f"Partially fulfilled: {instance_count}/{requested_count} instances",
141142
)
142-
else:
143+
elif request.resource_ids:
143144
request = request.update_status(
144145
RequestStatus.IN_PROGRESS,
145146
"Resources created, instances pending",
146147
)
148+
else:
149+
request = request.update_status(
150+
RequestStatus.FAILED,
151+
"No instances provisioned and no cloud resources created",
152+
)
147153

148154
return request
149155

src/orb/application/services/request_status_service.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def determine_status_from_machines(
7979
machines_to_check = provider_machines if provider_machines else db_machines
8080

8181
if not machines_to_check:
82-
return None, None
82+
return RequestStatus.IN_PROGRESS.value, "Status determination failed — will retry"
8383

8484
running_count = sum(1 for m in machines_to_check if m.status.value == "running")
8585
pending_count = sum(
@@ -147,7 +147,7 @@ def determine_status_from_machines(
147147

148148
except Exception as e:
149149
self.logger.error(f"Failed to determine status from machines: {e}")
150-
return None, None
150+
return RequestStatus.IN_PROGRESS.value, "Status determination failed — will retry"
151151

152152
async def update_request_status(self, request: Request, status: str, message: str) -> Request:
153153
"""Update request status."""

0 commit comments

Comments (0)