Skip to content

Commit 715810a

Browse files
fregataaclaude
authored andcommitted
feat(BA-5777): add bulk RBAC filtering infrastructure (#11191)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 6dfd08f commit 715810a

24 files changed

Lines changed: 507 additions & 383 deletions

File tree

changes/11191.feature.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add bulk RBAC filtering infrastructure so `BulkActionProcessor` can narrow actions per-entity and report per-validator decisions.

src/ai/backend/manager/actions/action/__init__.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88
TAction,
99
TActionResult,
1010
)
11-
from .batch import (
12-
BaseBatchAction,
13-
BaseBatchActionResult,
11+
from .bulk import (
12+
BaseBulkAction,
13+
BaseBulkActionResult,
1414
)
1515
from .rbac import (
1616
BaseRBACAction,
@@ -123,8 +123,8 @@
123123
"BaseActionResult",
124124
"BaseActionResultMeta",
125125
"BaseActionTriggerMeta",
126-
"BaseBatchAction",
127-
"BaseBatchActionResult",
126+
"BaseBulkAction",
127+
"BaseBulkActionResult",
128128
"BaseRBACAction",
129129
"RBACActionName",
130130
"RBACRequiredPermission",

src/ai/backend/manager/actions/action/batch.py

Lines changed: 0 additions & 38 deletions
This file was deleted.
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from abc import abstractmethod
2+
from dataclasses import dataclass
3+
from typing import Any, TypeVar, override
4+
5+
from .base import BaseAction, BaseActionResult
6+
7+
8+
@dataclass
9+
class BaseBulkAction[T](BaseAction):
10+
"""Base class for actions operating on a bulk of entities.
11+
12+
``entity_ids`` is stored as ``list[str]`` so ``BulkActionValidator``
13+
implementations can match against validator verdicts directly. The
14+
original ``T``-typed view is exposed via ``typed_entity_ids()``.
15+
16+
Bulk actions intentionally carry **only** ``entity_ids``. User context
17+
(user id, role) flows through ``current_user()``, not the action, so
18+
``BulkActionProcessor`` can reconstruct a filtered action by calling
19+
``type(action)(entity_ids=...)`` directly — no ``__init__`` override or
20+
factory hook is required. Subclasses that try to add required fields
21+
break that constructor call and will fail fast at runtime, which is
22+
intentional.
23+
"""
24+
25+
entity_ids: list[str]
26+
27+
@abstractmethod
28+
def typed_entity_ids(self) -> list[T]:
29+
"""Return ``entity_ids`` converted back to the native ID type ``T``."""
30+
raise NotImplementedError
31+
32+
33+
class BaseBulkActionResult(BaseActionResult):
34+
@override
35+
def entity_id(self) -> str | None:
36+
return None
37+
38+
@abstractmethod
39+
def entity_ids(self) -> list[str]:
40+
raise NotImplementedError
41+
42+
43+
TBulkAction = TypeVar("TBulkAction", bound=BaseBulkAction[Any])
44+
TBulkActionResult = TypeVar("TBulkActionResult", bound=BaseBulkActionResult)

src/ai/backend/manager/actions/processor/batch.py

Lines changed: 0 additions & 50 deletions
This file was deleted.
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import logging
2+
import uuid
3+
from collections.abc import Awaitable, Callable, Sequence
4+
from dataclasses import dataclass
5+
from datetime import UTC, datetime
6+
from typing import Any
7+
8+
from ai.backend.logging.utils import BraceStyleAdapter
9+
from ai.backend.manager.actions.action import (
10+
BaseActionTriggerMeta,
11+
)
12+
from ai.backend.manager.actions.action.bulk import (
13+
BaseBulkAction,
14+
BaseBulkActionResult,
15+
)
16+
from ai.backend.manager.actions.monitors.monitor import ActionMonitor
17+
from ai.backend.manager.actions.validator.bulk import (
18+
BulkActionValidator,
19+
BulkValidationResult,
20+
)
21+
22+
from .base import ActionRunner
23+
24+
log = BraceStyleAdapter(logging.getLogger(__spec__.name))
25+
26+
27+
@dataclass(frozen=True)
28+
class ValidatorDecision:
29+
"""One validator's per-entity verdict observed during bulk processing.
30+
31+
Mirrors the ``SubStepResult`` pattern used by the scheduler history so
32+
callers can trace where in the validator chain each ID was filtered and
33+
*why*. ``results`` carries the validator's classification unchanged.
34+
"""
35+
36+
validator_name: str
37+
results: BulkValidationResult
38+
39+
40+
@dataclass(frozen=True)
41+
class BulkProcessResult[TBulkActionResult: BaseBulkActionResult]:
42+
"""Outcome of a ``BulkActionProcessor`` run.
43+
44+
``result`` is what the service function returned for the permitted subset
45+
of entity IDs. ``validator_decisions`` keeps the per-validator trace in
46+
iteration order; callers assemble the partial-success response by
47+
walking it (each decision carries the denied IDs and their reasons).
48+
"""
49+
50+
result: TBulkActionResult
51+
validator_decisions: list[ValidatorDecision]
52+
53+
54+
class BulkActionProcessor[
55+
TBulkAction: BaseBulkAction[Any],
56+
TBulkActionResult: BaseBulkActionResult,
57+
]:
58+
_validators: Sequence[BulkActionValidator]
59+
60+
_runner: ActionRunner[TBulkAction, TBulkActionResult]
61+
62+
def __init__(
63+
self,
64+
func: Callable[[TBulkAction], Awaitable[TBulkActionResult]],
65+
monitors: Sequence[ActionMonitor] | None = None,
66+
validators: Sequence[BulkActionValidator] | None = None,
67+
) -> None:
68+
self._runner = ActionRunner(func, monitors)
69+
70+
self._validators = validators or []
71+
72+
def _filter_by_validation(
73+
self,
74+
action: TBulkAction,
75+
validation: BulkValidationResult,
76+
) -> TBulkAction:
77+
"""Return a new action narrowed to the IDs this validator permitted.
78+
79+
Returns the incoming action unchanged when the validator denied
80+
nothing; otherwise constructs a fresh instance of the same class
81+
via its ``entity_ids``-only constructor so the original stays
82+
immutable.
83+
"""
84+
if not validation.denied_entities:
85+
return action
86+
allowed_set = set(validation.allowed_entity_ids)
87+
filtered_ids = [eid for eid in action.entity_ids if eid in allowed_set]
88+
return type(action)(entity_ids=filtered_ids)
89+
90+
async def _run(self, action: TBulkAction) -> BulkProcessResult[TBulkActionResult]:
91+
started_at = datetime.now(UTC)
92+
action_id = uuid.uuid4()
93+
action_trigger_meta = BaseActionTriggerMeta(action_id=action_id, started_at=started_at)
94+
95+
filtered_action: TBulkAction = action
96+
decisions: list[ValidatorDecision] = []
97+
98+
for validator in self._validators:
99+
validation = await validator.validate(filtered_action, action_trigger_meta)
100+
decisions.append(
101+
ValidatorDecision(
102+
validator_name=validator.name(),
103+
results=validation,
104+
)
105+
)
106+
filtered_action = self._filter_by_validation(filtered_action, validation)
107+
108+
action_result = await self._runner.run(filtered_action, action_trigger_meta)
109+
return BulkProcessResult(result=action_result, validator_decisions=decisions)
110+
111+
async def wait_for_complete(self, action: TBulkAction) -> BulkProcessResult[TBulkActionResult]:
112+
return await self._run(action)

src/ai/backend/manager/actions/validator/batch.py

Lines changed: 0 additions & 10 deletions
This file was deleted.
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
from abc import ABC, abstractmethod
2+
from dataclasses import dataclass
3+
from typing import Any
4+
5+
from ai.backend.manager.actions.action import BaseActionTriggerMeta
6+
from ai.backend.manager.actions.action.bulk import BaseBulkAction
7+
8+
9+
@dataclass(frozen=True)
10+
class DeniedEntity:
11+
"""A bulk entity that a validator rejected, paired with its reason."""
12+
13+
entity_id: str
14+
deny_reason: str
15+
16+
17+
@dataclass(frozen=True)
18+
class BulkValidationResult:
19+
"""Per-entity validation outcome for a bulk action.
20+
21+
``BulkActionProcessor`` intersects ``allowed_entity_ids`` across
22+
validators and records each ``DeniedEntity`` — with its reason — on the
23+
corresponding ``ValidatorDecision`` so the final response can
24+
surface *why* each ID was filtered out.
25+
"""
26+
27+
allowed_entity_ids: list[str]
28+
denied_entities: list[DeniedEntity]
29+
30+
31+
class BulkActionValidator(ABC):
32+
@classmethod
33+
@abstractmethod
34+
def name(cls) -> str:
35+
"""Stable identifier used in ``ValidatorDecision.validator_name``.
36+
37+
Chosen by the implementation so logs and partial-success responses can
38+
attribute denials to a specific validator independently of the Python
39+
class name.
40+
"""
41+
raise NotImplementedError
42+
43+
@abstractmethod
44+
async def validate(
45+
self, action: BaseBulkAction[Any], meta: BaseActionTriggerMeta
46+
) -> BulkValidationResult:
47+
"""Validate the bulk action and return per-entity permission results.
48+
49+
Implementations must classify every ID in ``action.entity_ids`` as
50+
either allowed or denied. Validators that cannot make a decision for
51+
an ID should treat it as allowed.
52+
53+
The processor wraps each call in its own async context manager so
54+
cross-cutting concerns (timing, audit) live in one place — validators
55+
do not need to own them.
56+
"""
57+
raise NotImplementedError

src/ai/backend/manager/actions/validators/rbac/batch.py

Lines changed: 0 additions & 21 deletions
This file was deleted.

0 commit comments

Comments
 (0)