Skip to content

Commit ccacdeb

Browse files
fregataaclaude
andcommitted
refactor(BA-5777): address PR review feedback on bulk processor
- Rename BulkValidatorDecision to ValidatorDecision; the dataclass records one validator's verdict, so the Bulk prefix implied a set and conflicted with the surrounding Bulk* processor/result types - Replace the _validator_scope asynccontextmanager with a plain async helper _run_validator; the CM yielded once with no pre/post split, so the bookend ceremony bought nothing over a regular call - Drop the unused name parameter and _name attribute from the test's _AllowSetValidator (name() classmethod hardcodes the identifier) - Note in _run that the service function runs once on the post-filter action so only entities that passed every validator reach it Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent cd66bef commit ccacdeb

3 files changed

Lines changed: 31 additions & 36 deletions

File tree

src/ai/backend/manager/actions/processor/bulk.py

Lines changed: 28 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import logging
22
import uuid
3-
from collections.abc import AsyncIterator, Awaitable, Callable, Sequence
4-
from contextlib import asynccontextmanager
3+
from collections.abc import Awaitable, Callable, Sequence
54
from dataclasses import dataclass
65
from datetime import UTC, datetime
76
from typing import Any
@@ -26,7 +25,7 @@
2625

2726

2827
@dataclass(frozen=True)
29-
class BulkValidatorDecision:
28+
class ValidatorDecision:
3029
"""One validator's per-entity verdict observed during bulk processing.
3130
3231
Mirrors the ``SubStepResult`` pattern used by the scheduler history so
@@ -49,7 +48,7 @@ class BulkProcessResult[TBulkActionResult: BaseBulkActionResult]:
4948
"""
5049

5150
result: TBulkActionResult
52-
validator_decisions: list[BulkValidatorDecision]
51+
validator_decisions: list[ValidatorDecision]
5352

5453

5554
class BulkActionProcessor[
@@ -70,32 +69,28 @@ def __init__(
7069

7170
self._validators = validators or []
7271

73-
@asynccontextmanager
74-
async def _validator_scope(
72+
async def _run_validator(
7573
self,
7674
validator: BulkActionValidator,
7775
action: TBulkAction,
7876
meta: BaseActionTriggerMeta,
79-
) -> AsyncIterator[BulkValidationResult]:
80-
"""Run one validator inside a bookend scope.
77+
) -> BulkValidationResult:
78+
"""Invoke one validator and emit its timing/trace log.
8179
82-
Yields the validator's ``BulkValidationResult`` so the caller can
83-
record the decision inside the block. Timing and per-validator
84-
logging live here rather than inside each validator implementation.
80+
Timing and per-validator logging live here rather than inside each
81+
validator implementation so the cross-cutting concern has one home.
8582
"""
8683
started_at = datetime.now(UTC)
8784
validation = await validator.validate(action, meta)
88-
try:
89-
yield validation
90-
finally:
91-
duration = (datetime.now(UTC) - started_at).total_seconds()
92-
log.debug(
93-
"bulk validator {} saw {} ids, denied {} in {:.3f}s",
94-
validator.name(),
95-
len(validation.allowed_entity_ids) + len(validation.denied_entities),
96-
len(validation.denied_entities),
97-
duration,
98-
)
85+
duration = (datetime.now(UTC) - started_at).total_seconds()
86+
log.debug(
87+
"bulk validator {} saw {} ids, denied {} in {:.3f}s",
88+
validator.name(),
89+
len(action.entity_ids),
90+
len(validation.denied_entities),
91+
duration,
92+
)
93+
return validation
9994

10095
def _process_action(
10196
self,
@@ -120,20 +115,21 @@ async def _run(self, action: TBulkAction) -> BulkProcessResult[TBulkActionResult
120115
action_id = uuid.uuid4()
121116
action_trigger_meta = BaseActionTriggerMeta(action_id=action_id, started_at=started_at)
122117

118+
# Run every validator over the surviving ID set, then invoke the
119+
# service function once on the final narrowed action — the service
120+
# must only see IDs that passed every validator.
123121
current_action: TBulkAction = action
124-
decisions: list[BulkValidatorDecision] = []
122+
decisions: list[ValidatorDecision] = []
125123

126124
for validator in self._validators:
127-
async with self._validator_scope(
128-
validator, current_action, action_trigger_meta
129-
) as validation:
130-
decisions.append(
131-
BulkValidatorDecision(
132-
validator_name=validator.name(),
133-
results=validation,
134-
)
125+
validation = await self._run_validator(validator, current_action, action_trigger_meta)
126+
decisions.append(
127+
ValidatorDecision(
128+
validator_name=validator.name(),
129+
results=validation,
135130
)
136-
current_action = self._process_action(current_action, validation)
131+
)
132+
current_action = self._process_action(current_action, validation)
137133

138134
action_result = await self._runner.run(current_action, action_trigger_meta)
139135
return BulkProcessResult(result=action_result, validator_decisions=decisions)

src/ai/backend/manager/actions/validator/bulk.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class BulkValidationResult:
2020
2121
``BulkActionProcessor`` intersects ``allowed_entity_ids`` across
2222
validators and records each ``DeniedEntity`` — with its reason — on the
23-
corresponding ``BulkValidatorDecision`` so the final response can
23+
corresponding ``ValidatorDecision`` so the final response can
2424
surface *why* each ID was filtered out.
2525
"""
2626

@@ -32,7 +32,7 @@ class BulkActionValidator(ABC):
3232
@classmethod
3333
@abstractmethod
3434
def name(cls) -> str:
35-
"""Stable identifier used in ``BulkValidatorDecision.validator_name``.
35+
"""Stable identifier used in ``ValidatorDecision.validator_name``.
3636
3737
Chosen by the implementation so logs and partial-success responses can
3838
attribute denials to a specific validator independently of the Python

tests/unit/manager/actions/test_bulk_processor.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,8 @@ def entity_ids(self) -> list[str]:
5656
class _AllowSetValidator(BulkActionValidator):
5757
"""Approves any ID in ``allowed``; anything else visible is denied."""
5858

59-
def __init__(self, allowed: set[str], name: str = "allow-set") -> None:
59+
def __init__(self, allowed: set[str]) -> None:
6060
self._allowed = set(allowed)
61-
self._name = name
6261

6362
@classmethod
6463
@override

0 commit comments

Comments
 (0)