Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/6330.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Prevent artifact deletion when download and archive storage are identical

Copilot AI Oct 23, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue number in the filename (6330) doesn't match the issue referenced in the PR title and description (6329/BA-2764). This appears to be a typo in the changelog filename.

Copilot uses AI. Check for mistakes.
16 changes: 0 additions & 16 deletions src/ai/backend/common/dto/storage/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,6 @@ class HuggingFaceImportModelsReq(BaseRequestModel):
""",
examples=["huggingface", "my-huggingface"],
)
storage_name: str = Field(
description="""
Target storage name where all models will be imported.
Must be a configured and accessible storage backend.
Used as fallback when storage_step_mappings is not provided.
""",
examples=["default-minio", "s3-storage", "local-storage"],
)
storage_step_mappings: dict[ArtifactStorageImportStep, str] = Field(
description="""
Optional mapping of import steps to specific storage backends.
Expand Down Expand Up @@ -260,14 +252,6 @@ class ReservoirImportModelsReq(BaseRequestModel):
""",
examples=["reservoir", "my-reservoir"],
)
storage_name: str = Field(
description="""
Target storage name where all models will be imported.
Must be a configured and accessible storage backend.
Used as fallback when storage_step_mappings is not provided.
""",
examples=["default-minio", "s3-storage", "local-storage"],
)
storage_step_mappings: dict[ArtifactStorageImportStep, str] = Field(
description="""
Optional mapping of import steps to specific storage backends.
Expand Down
3 changes: 3 additions & 0 deletions src/ai/backend/manager/api/gql/artifact_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
ArtifactType,
)
from ai.backend.manager.errors.api import NotImplementedAPI
from ai.backend.manager.errors.common import ServerMisconfiguredError
from ai.backend.manager.services.artifact_registry.actions.common.get_meta import (
GetArtifactRegistryMetaAction,
)
Expand All @@ -29,6 +30,8 @@ async def default_artifact_registry(
artifact_type: ArtifactType, info: Info[StrawberryGQLContext]
) -> Optional[ArtifactRegistry]:
artifact_registry_cfg = info.context.config_provider.config.artifact_registry
if artifact_registry_cfg is None:
raise ServerMisconfiguredError("Artifact registry configuration is missing.")

registry_name: Optional[str] = None
match artifact_type:
Expand Down
67 changes: 29 additions & 38 deletions src/ai/backend/manager/config/unified.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@
Field,
FilePath,
IPvAnyNetwork,
ValidationInfo,
field_serializer,
field_validator,
)
Expand Down Expand Up @@ -1816,10 +1815,12 @@ class ReservoirConfig(BaseConfigSchema):
serialization_alias="use-delegation",
)
storage_name: str = Field(
default="RESERVOIR_STORAGE_NAME",
description="""
Name of the reservoir storage configuration.
Used to identify this storage in the system.
Name of the reservoir default storage.

You can specify the storage to be used for each step using storage_step_selection.
For steps not explicitly specified in storage_step_selection, the storage is designated by storage_name.
If you specify storage for all steps in storage_step_selection, there is no need to specify storage_name.
""",
examples=["minio-storage", "gitlfs-storage", "vfs-storage"],
validation_alias=AliasChoices("storage-name", "storage_name"),
Expand All @@ -1843,43 +1844,33 @@ class ReservoirConfig(BaseConfigSchema):
serialization_alias="storage-step-selection",
)

@field_validator("storage_step_selection", mode="before")
@classmethod
def _validate_required_steps(
cls, v: dict[str, str], info: ValidationInfo
) -> dict[ArtifactStorageImportStep, str]:
_REQUIRED_STEPS = {ArtifactStorageImportStep.DOWNLOAD, ArtifactStorageImportStep.ARCHIVE}

# Get storage_name from the current model data being validated
default_storage_name = info.data.get("storage_name", "RESERVOIR_STORAGE_NAME")

if not v: # If storage_step_selection is empty or None
return {step: default_storage_name for step in _REQUIRED_STEPS}
def resolve_storage_step_selection(self) -> dict[ArtifactStorageImportStep, str]:
"""
Resolves the actual `storage_step_selection` to be passed to the storage proxy based on `storage_step_selection` and `storage_name`
"""

# Convert string keys to ArtifactStorageImportStep enum keys if needed
converted_dict = {}
for key, value in v.items():
try:
enum_key = ArtifactStorageImportStep(key)
converted_dict[enum_key] = value
except ValueError:
# Skip invalid step names
log.warning(f"Invalid artifact storage step key: {key}, skipping...")
continue
_REQUIRED_STEPS = {ArtifactStorageImportStep.DOWNLOAD, ArtifactStorageImportStep.ARCHIVE}

# Check for required steps
missing_steps = _REQUIRED_STEPS - set(converted_dict.keys())
resolved_selection: dict[ArtifactStorageImportStep, str] = (
self.storage_step_selection.copy()
)
for required_step in _REQUIRED_STEPS:
if required_step not in resolved_selection:
resolved_selection[required_step] = self.storage_name

# Add missing steps with default storage name
for step in missing_steps:
converted_dict[step] = default_storage_name
return resolved_selection

return converted_dict
@property
def archive_storage(self) -> str:
"""
Resolve the storage backend for the `ARCHIVE` step.
If not explicitly specified, falls back to `storage_name`.
"""
return self.storage_step_selection.get(ArtifactStorageImportStep.ARCHIVE, self.storage_name)


class ModelRegistryConfig(BaseConfigSchema):
class ArtifactRegistryConfig(BaseConfigSchema):
model_registry: str = Field(
default="MODEL_REGISTRY_NAME",
description="""
Name of the Model registry configuration.
Used to identify this registry in the system.
Expand Down Expand Up @@ -2064,16 +2055,16 @@ class ManagerUnifiedConfig(BaseConfigSchema):
Controls how services are discovered and connected within the Backend.AI system.
""",
)
artifact_registry: ModelRegistryConfig = Field(
default_factory=ModelRegistryConfig,
artifact_registry: Optional[ArtifactRegistryConfig] = Field(
default=None,
description="""
Default artifact registry config.
""",
validation_alias=AliasChoices("artifact_registry", "artifact-registry"),
serialization_alias="artifact-registry",
)
reservoir: ReservoirConfig = Field(
default_factory=ReservoirConfig,
reservoir: Optional[ReservoirConfig] = Field(
default=None,
description="""
Reservoir configuration.
""",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from ai.backend.manager.config.provider import ManagerConfigProvider
from ai.backend.manager.data.artifact.types import ArtifactStatus
from ai.backend.manager.errors.artifact_registry import InvalidArtifactRegistryTypeError
from ai.backend.manager.errors.common import ServerMisconfiguredError
from ai.backend.manager.repositories.artifact.repository import ArtifactRepository
from ai.backend.manager.repositories.huggingface_registry.repository import HuggingFaceRepository
from ai.backend.manager.repositories.reservoir_registry.repository import (
Expand Down Expand Up @@ -84,7 +85,10 @@ async def handle_model_import_done(
return

try:
if self._config_provider.config.reservoir.enable_approve_process:
reservoir_config = self._config_provider.config.reservoir
if reservoir_config is None:
raise ServerMisconfiguredError("Reservoir configuration is missing.")
if reservoir_config.enable_approve_process:
await self._artifact_repository.update_artifact_revision_status(
revision.id, ArtifactStatus.NEEDS_APPROVAL
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from ai.backend.manager.data.object_storage.types import ObjectStorageData
from ai.backend.manager.data.vfs_storage.types import VFSStorageData
from ai.backend.manager.errors.artifact_registry import ReservoirConnectionError
from ai.backend.manager.errors.common import ServerMisconfiguredError
from ai.backend.manager.repositories.artifact.repository import ArtifactRepository
from ai.backend.manager.repositories.artifact.types import (
ArtifactRemoteStatusFilter,
Expand Down Expand Up @@ -110,7 +111,11 @@
"""

reservoir_config = self._config_provider.config.reservoir
storage = await self._resolve_storage_data(reservoir_config.storage_name)
if reservoir_config is None:
raise ServerMisconfiguredError("Reservoir configuration is missing")

storage = await self._resolve_storage_data(reservoir_config.archive_storage)
# TODO: Current structure does not cover cases where different storages have different hosts.

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
storage_proxy_client = self._storage_manager.get_manager_facing_client(storage.host)

# Get all reservoir registries
Expand Down Expand Up @@ -189,8 +194,7 @@
import_req = ReservoirImportModelsReq(
models=models,
registry_name=reservoir_registry_data.name,
storage_name=reservoir_config.storage_name,
storage_step_mappings=reservoir_config.storage_step_selection,
storage_step_mappings=reservoir_config.resolve_storage_step_selection(),
)

# Call storage proxy import API
Expand Down
7 changes: 6 additions & 1 deletion src/ai/backend/manager/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
from ai.backend.manager.config.loader.types import AbstractConfigLoader
from ai.backend.manager.config.provider import ManagerConfigProvider
from ai.backend.manager.config.watchers.etcd import EtcdConfigWatcher
from ai.backend.manager.errors.common import ServerMisconfiguredError
from ai.backend.manager.sokovan.deployment.deployment_controller import (
DeploymentController,
DeploymentControllerArgs,
Expand Down Expand Up @@ -1065,7 +1066,11 @@ async def leader_election_ctx(root_ctx: RootContext) -> AsyncIterator[None]:
task_specs = root_ctx.sokovan_orchestrator.create_task_specs()

# Rescan reservoir registry periodically
if root_ctx.config_provider.config.reservoir.use_delegation:
reservoir_config = root_ctx.config_provider.config.reservoir
if reservoir_config is None:
raise ServerMisconfiguredError("Reservoir configuration is missing.")

if reservoir_config.use_delegation:
task_specs.append(
EventTaskSpec(
name="reservoir_registry_scan",
Expand Down
31 changes: 23 additions & 8 deletions src/ai/backend/manager/services/artifact/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
RemoteReservoirScanError,
ReservoirConnectionError,
)
from ai.backend.manager.errors.common import ServerMisconfiguredError
from ai.backend.manager.repositories.artifact.repository import ArtifactRepository
from ai.backend.manager.repositories.artifact_registry.repository import ArtifactRegistryRepository
from ai.backend.manager.repositories.huggingface_registry.repository import HuggingFaceRepository
Expand Down Expand Up @@ -147,9 +148,11 @@ async def _get_storage_client(self, storage_name: str):

async def scan(self, action: ScanArtifactsAction) -> ScanArtifactsActionResult:
reservoir_config = self._config_provider.config.reservoir
if reservoir_config is None:
raise ServerMisconfiguredError("Reservoir configuration is missing")

# TODO: Abstract remote registry client layer (scan, import)
storage_proxy_client = await self._get_storage_client(reservoir_config.storage_name)
storage_proxy_client = await self._get_storage_client(reservoir_config.archive_storage)

registry_meta = await self._resolve_artifact_registry_meta(
action.artifact_type, action.registry_id
Expand Down Expand Up @@ -327,9 +330,11 @@ async def scan_sync(self, action: ScanArtifactsSyncAction) -> ScanArtifactsSyncA
This action scans and returns all metadata, including readme, size, and other information
"""
reservoir_config = self._config_provider.config.reservoir
if reservoir_config is None:
raise ServerMisconfiguredError("Reservoir configuration is missing")

# TODO: Abstract remote registry client layer (scan, import)
storage_proxy_client = await self._get_storage_client(reservoir_config.storage_name)
storage_proxy_client = await self._get_storage_client(reservoir_config.archive_storage)

registry_meta = await self._resolve_artifact_registry_meta(
action.artifact_type, action.registry_id
Expand Down Expand Up @@ -572,10 +577,11 @@ async def retrieve_models(self, action: RetrieveModelsAction) -> RetrieveModelsA
raise NotImplementedError("Only HuggingFace registry is supported for model retrieval")

reservoir_config = self._config_provider.config.reservoir
storage = await self._object_storage_repository.get_by_name(reservoir_config.storage_name)
if reservoir_config is None:
raise ServerMisconfiguredError("Reservoir configuration is missing")

# TODO: Abstract remote registry client layer (scan, import)
storage_proxy_client = self._storage_manager.get_manager_facing_client(storage.host)
storage_proxy_client = await self._get_storage_client(reservoir_config.archive_storage)

registry_data = await self._huggingface_registry_repository.get_registry_data_by_id(
registry_id
Expand Down Expand Up @@ -604,10 +610,11 @@ async def retrieve_single_model(self, action: RetrieveModelAction) -> RetrieveMo
raise NotImplementedError("Only HuggingFace registry is supported for model retrieval")

reservoir_config = self._config_provider.config.reservoir
storage = await self._object_storage_repository.get_by_name(reservoir_config.storage_name)
if reservoir_config is None:
raise ServerMisconfiguredError("Reservoir configuration is missing")

# TODO: Abstract remote registry client layer (scan, import)
storage_proxy_client = self._storage_manager.get_manager_facing_client(storage.host)
storage_proxy_client = await self._get_storage_client(reservoir_config.archive_storage)

registry_data = await self._huggingface_registry_repository.get_registry_data_by_id(
registry_id
Expand Down Expand Up @@ -644,7 +651,11 @@ async def _resolve_artifact_registry_meta(
) -> ArtifactRegistryData:
if registry_id_or_none is None:
# TODO: Handle `artifact_type` for other types
registry_name = self._config_provider.config.artifact_registry.model_registry
artifact_registry_cfg = self._config_provider.config.artifact_registry
if artifact_registry_cfg is None:
raise ServerMisconfiguredError("Artifact registry configuration is missing.")

registry_name = artifact_registry_cfg.model_registry
registry_meta = (
await self._artifact_registry_repository.get_artifact_registry_data_by_name(
registry_name
Expand All @@ -662,7 +673,11 @@ async def delegate_scan_artifacts(
self, action: DelegateScanArtifactsAction
) -> DelegateScanArtifactsActionResult:
# If this is a leaf node, perform local scan instead of delegation
if not self._config_provider.config.reservoir.use_delegation:
reservoir_config = self._config_provider.config.reservoir
if reservoir_config is None:
raise ServerMisconfiguredError("Reservoir configuration is missing")

if not reservoir_config.use_delegation:
registry_id = None
if action.delegatee_target:
registry_id = action.delegatee_target.target_registry_id
Expand Down
Loading
Loading