Skip to content

Commit a0f7250

Browse files
authored
Merge pull request #149 from awslabs/fix/aws-resource-cleanup-leaks
fix: resolve AWS resource and launch template cleanup leaks
2 parents f2be2c1 + 3764016 commit a0f7250

52 files changed

Lines changed: 8377 additions & 2700 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

config/default_config.json

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,17 @@
101101
"spot_fleet_request_expiry": 30,
102102
"percent_on_demand": 0,
103103
"root_device_volume_size": 20
104+
},
105+
"cleanup": {
106+
"enabled": true,
107+
"delete_launch_template": true,
108+
"dry_run": false,
109+
"resources": {
110+
"asg": true,
111+
"ec2_fleet": true,
112+
"spot_fleet": true,
113+
"run_instances": true
114+
}
104115
}
105116
}
106117
}
@@ -393,16 +404,5 @@
393404
"asg": "",
394405
"tag": ""
395406
}
396-
},
397-
"cleanup": {
398-
"enabled": true,
399-
"delete_launch_template": true,
400-
"dry_run": false,
401-
"resources": {
402-
"asg": true,
403-
"ec2_fleet": true,
404-
"spot_fleet": true,
405-
"run_instances": true
406-
}
407407
}
408408
}

pyproject.toml

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,8 @@ markers = [
304304
"provider_contract: part of the provider contract test suite",
305305
"simulator_limitation: assertion weakened due to known simulator gap",
306306
"requires_real_provider: skipped unless --run-real-provider flag passed",
307+
"moto: marks tests that use moto-mocked AWS",
308+
"cli: marks CLI integration tests",
307309
]
308310
env = [
309311
"AWS_DEFAULT_REGION=us-east-1",
@@ -416,8 +418,16 @@ combine-as-imports = true
416418
[tool.pyright]
417419
pythonVersion = "3.12"
418420
pythonPlatform = "Linux"
419-
include = ["src/orb"] # keep in sync with .project.yml build.package_root
420-
exclude = [".venv", "build", "dist"]
421+
include = ["src/orb", "tests/shared", "tests/onmoto"] # keep in sync with .project.yml build.package_root
422+
exclude = [
423+
".venv", "build", "dist",
424+
# onmoto files with pre-existing type errors, excluded until resolved
425+
"tests/onmoto/test_config_driven_provision.py",
426+
"tests/onmoto/test_cqrs_control_loop.py",
427+
"tests/onmoto/test_hf_contract.py",
428+
"tests/onmoto/test_provision_lifecycle.py",
429+
]
430+
extraPaths = ["."]
421431
typeCheckingMode = "basic"
422432
reportMissingImports = true
423433
reportMissingTypeStubs = false

src/orb/config/schemas/app_schema.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
from pydantic import BaseModel, Field, field_validator, model_validator
77

8-
from .cleanup_schema import CleanupConfig
98
from .common_schema import (
109
DatabaseConfig,
1110
EventsConfig,
@@ -43,7 +42,6 @@ class AppConfig(BaseModel):
4342
performance: PerformanceConfig = Field(default_factory=lambda: PerformanceConfig()) # type: ignore[call-arg]
4443
server: ServerConfig = Field(default_factory=lambda: ServerConfig()) # type: ignore[call-arg]
4544
native_spec: NativeSpecConfig = Field(default_factory=lambda: NativeSpecConfig()) # type: ignore[call-arg]
46-
cleanup: CleanupConfig = Field(default_factory=lambda: CleanupConfig()) # type: ignore[call-arg]
4745
environment: str = Field("development", description="Environment")
4846
debug: bool = Field(False, description="Debug mode")
4947
request_timeout: int = Field(300, description="Request timeout in seconds")

src/orb/config/schemas/provider_strategy_schema.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
77

88
from .base_config import BaseCircuitBreakerConfig
9+
from .cleanup_schema import CleanupConfig
910

1011

1112
class HandlerConfig(BaseModel):
@@ -38,8 +39,9 @@ class ProviderDefaults(BaseModel):
3839
default_factory=dict, description="Template defaults for this provider type"
3940
)
4041
extensions: Optional[dict[str, Any]] = Field(
41-
None, description="Provider-specific extensions configuration"
42+
default=None, description="Provider-specific extensions configuration"
4243
)
44+
cleanup: Optional[CleanupConfig] = Field(default=None)
4345

4446

4547
class ProviderMode(str, Enum):
@@ -95,7 +97,6 @@ class ProviderInstanceConfig(BaseModel):
9597
enabled: bool = Field(True, description="Whether this provider is enabled")
9698
priority: int = Field(0, description="Provider priority (lower = higher priority)")
9799
weight: int = Field(100, description="Provider weight for load balancing")
98-
# Keep dict for backward compatibility
99100
config: dict[str, Any] = Field(
100101
default_factory=dict, description="Provider-specific configuration"
101102
)

src/orb/domain/base/ports/configuration_port.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,6 @@ def set_configuration_value(self, key: str, value: Any) -> None:
9393
def get_resource_prefix(self, resource_type: str) -> str:
9494
"""Get resource naming prefix for the given resource type."""
9595

96-
@abstractmethod
97-
def get_cleanup_config(self) -> dict[str, Any]:
98-
"""Get cleanup configuration."""
99-
10096
@abstractmethod
10197
def get_config_file_path(self) -> str:
10298
"""Get the config file path."""

src/orb/infrastructure/adapters/configuration_adapter.py

Lines changed: 4 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -334,37 +334,11 @@ def get_resource_prefix(self, resource_type: str) -> str:
334334
if hasattr(resource_config.prefixes, resource_type):
335335
return getattr(resource_config.prefixes, resource_type)
336336
return resource_config.default_prefix
337-
except Exception:
338-
return ""
339-
340-
def get_cleanup_config(self) -> dict[str, Any]:
341-
"""Get cleanup configuration."""
342-
try:
343-
cleanup = self._config_manager.app_config.cleanup
344-
return {
345-
"enabled": cleanup.enabled,
346-
"delete_launch_template": cleanup.delete_launch_template,
347-
"dry_run": cleanup.dry_run,
348-
"resources": {
349-
"asg": cleanup.resources.asg,
350-
"ec2_fleet": cleanup.resources.ec2_fleet,
351-
"spot_fleet": cleanup.resources.spot_fleet,
352-
"run_instances": cleanup.resources.run_instances,
353-
},
354-
}
355337
except Exception as e:
356-
_logger.warning("Failed to load cleanup config, using defaults: %s", e)
357-
return {
358-
"enabled": True,
359-
"delete_launch_template": True,
360-
"dry_run": False,
361-
"resources": {
362-
"asg": True,
363-
"ec2_fleet": True,
364-
"spot_fleet": True,
365-
"run_instances": True,
366-
},
367-
}
338+
_logger.warning(
339+
"Failed to get resource prefix for '%s', using empty prefix: %s", resource_type, e
340+
)
341+
return ""
368342

369343
def get_active_provider_override(self) -> str | None:
370344
"""Get current provider override from CLI."""

src/orb/infrastructure/scheduler/hostfactory/field_mappings.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class HostFactoryFieldMappings:
3131
"instanceTags": "tags", # Will be parsed from string format
3232
# Template metadata
3333
"name": "name",
34+
"requestId": "request_id",
3435
"providerName": "provider_name",
3536
"providerApi": "provider_api",
3637
"providerType": "provider_type",

src/orb/interface/mcp/server/core.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ def _register_core_tools(self) -> None:
5858
from orb.interface.request_command_handlers import (
5959
handle_get_request_status,
6060
handle_get_return_requests,
61+
handle_list_requests,
6162
handle_request_machines,
6263
handle_request_return_machines,
6364
)
@@ -86,6 +87,7 @@ def _register_core_tools(self) -> None:
8687

8788
# Request tools
8889
self.tools["get_request_status"] = handle_get_request_status
90+
self.tools["list_requests"] = handle_list_requests
8991
self.tools["request_machines"] = handle_request_machines
9092
self.tools["list_return_requests"] = handle_get_return_requests
9193
self.tools["return_machines"] = handle_request_return_machines

src/orb/interface/request_command_handlers.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,21 @@ async def handle_request_return_machines(args: "argparse.Namespace") -> dict[str
424424
return {"requestId": request_id, "message": "Return request created successfully"}
425425

426426

427+
@handle_interface_exceptions(context="list_requests", interface_type="cli")
428+
async def handle_list_requests(args: "argparse.Namespace") -> dict[str, Any]:
429+
"""List all active provisioning requests."""
430+
container = get_container()
431+
query_bus = container.get(QueryBus)
432+
scheduler_strategy = container.get(SchedulerPort)
433+
434+
from orb.application.dto.queries import ListActiveRequestsQuery
435+
436+
query = ListActiveRequestsQuery(all_resources=True)
437+
request_dtos = await query_bus.execute(query)
438+
439+
return scheduler_strategy.format_request_status_response(request_dtos)
440+
441+
427442
@handle_interface_exceptions(context="cancel_request", interface_type="cli")
428443
async def handle_cancel_request(args: "argparse.Namespace") -> dict[str, Any]:
429444
"""Handle cancel request operations."""

src/orb/providers/aws/infrastructure/handlers/asg/capacity_manager.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,24 @@ def release_instances(
138138

139139
if not asg_details:
140140
self._logger.warning(
141-
"ASG details missing for %s, terminating instances without ASG operations",
141+
"ASG details missing for %s, attempting direct describe before cleanup",
142+
asg_name,
143+
)
144+
try:
145+
response = self._retry_with_backoff(
146+
self._aws_client.autoscaling_client.describe_auto_scaling_groups,
147+
operation_type="read_only",
148+
AutoScalingGroupNames=[asg_name],
149+
)
150+
groups = response.get("AutoScalingGroups", [])
151+
if groups:
152+
asg_details = groups[0]
153+
except Exception as exc:
154+
self._logger.warning("Retry describe for ASG %s also failed: %s", asg_name, exc)
155+
156+
if not asg_details:
157+
self._logger.warning(
158+
"ASG details unavailable for %s, terminating instances and proceeding with cleanup",
142159
asg_name,
143160
)
144161
self._aws_ops.terminate_instances_with_fallback(
@@ -151,6 +168,8 @@ def release_instances(
151168
asg_name,
152169
instance_ids,
153170
)
171+
self._call_delete_asg(asg_name)
172+
self._cleanup_on_zero_capacity("asg", asg_name)
154173
return
155174

156175
# Detach instances (API limit: 50 per call; use 20 for safety)

0 commit comments

Comments
 (0)