Skip to content

Commit c05370d

Browse files
fregataaclaude
andcommitted
refactor(BA-5777): simplify bulk processor by inlining validator call and improving names
Inline _run_validator() into the loop, rename _process_action to _filter_by_validation, and rename current_action to filtered_action. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 3494137 commit c05370d

1 file changed

Lines changed: 8 additions & 34 deletions

File tree

  • src/ai/backend/manager/actions/processor

src/ai/backend/manager/actions/processor/bulk.py

Lines changed: 8 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -69,32 +69,9 @@ def __init__(
6969

7070
self._validators = validators or []
7171

72-
async def _run_validator(
72+
def _filter_by_validation(
7373
self,
74-
validator: BulkActionValidator,
7574
action: TBulkAction,
76-
meta: BaseActionTriggerMeta,
77-
) -> BulkValidationResult:
78-
"""Invoke one validator and emit its timing/trace log.
79-
80-
Timing and per-validator logging live here rather than inside each
81-
validator implementation so the cross-cutting concern has one home.
82-
"""
83-
started_at = datetime.now(UTC)
84-
validation = await validator.validate(action, meta)
85-
duration = (datetime.now(UTC) - started_at).total_seconds()
86-
log.debug(
87-
"bulk validator {} saw {} ids, denied {} in {:.3f}s",
88-
validator.name(),
89-
len(action.entity_ids),
90-
len(validation.denied_entities),
91-
duration,
92-
)
93-
return validation
94-
95-
def _process_action(
96-
self,
97-
current_action: TBulkAction,
9875
validation: BulkValidationResult,
9976
) -> TBulkAction:
10077
"""Return a new action narrowed to the IDs this validator permitted.
@@ -105,33 +82,30 @@ def _process_action(
10582
immutable.
10683
"""
10784
if not validation.denied_entities:
108-
return current_action
85+
return action
10986
allowed_set = set(validation.allowed_entity_ids)
110-
filtered_ids = [eid for eid in current_action.entity_ids if eid in allowed_set]
111-
return type(current_action)(entity_ids=filtered_ids)
87+
filtered_ids = [eid for eid in action.entity_ids if eid in allowed_set]
88+
return type(action)(entity_ids=filtered_ids)
11289

11390
async def _run(self, action: TBulkAction) -> BulkProcessResult[TBulkActionResult]:
11491
started_at = datetime.now(UTC)
11592
action_id = uuid.uuid4()
11693
action_trigger_meta = BaseActionTriggerMeta(action_id=action_id, started_at=started_at)
11794

118-
# Run every validator over the surviving ID set, then invoke the
119-
# service function once on the final narrowed action — the service
120-
# must only see IDs that passed every validator.
121-
current_action: TBulkAction = action
95+
filtered_action: TBulkAction = action
12296
decisions: list[ValidatorDecision] = []
12397

12498
for validator in self._validators:
125-
validation = await self._run_validator(validator, current_action, action_trigger_meta)
99+
validation = await validator.validate(filtered_action, action_trigger_meta)
126100
decisions.append(
127101
ValidatorDecision(
128102
validator_name=validator.name(),
129103
results=validation,
130104
)
131105
)
132-
current_action = self._process_action(current_action, validation)
106+
filtered_action = self._filter_by_validation(filtered_action, validation)
133107

134-
action_result = await self._runner.run(current_action, action_trigger_meta)
108+
action_result = await self._runner.run(filtered_action, action_trigger_meta)
135109
return BulkProcessResult(result=action_result, validator_decisions=decisions)
136110

137111
async def wait_for_complete(self, action: TBulkAction) -> BulkProcessResult[TBulkActionResult]:

0 commit comments

Comments
 (0)