Skip to content

Commit eb0de06

Browse files
authored
feat: Add phase_status tracking and PARTIAL apply status (#1690)
* feat: Add phase_status tracking and PARTIAL apply status Add per-phase execution status tracking to manifest versions for partial failure recovery. When a manifest apply fails mid-execution, the system now records which phases completed, which failed, and which were skipped. Changes: - Migration: Add phase_status JSONB column to manifest_versions - Migration: Add PARTIAL to apply_status CHECK constraint (separate file for CockroachDB compatibility) - Domain: PhaseStatusMap, PhaseStatusEntry types with JSON serialization - Repository: UpdatePhaseStatus method, GetPhaseStatus/SetPhaseStatus on VersionEntity - Proto: PhaseStatusDetail message, phase_status map on ManifestVersion and ApplyManifestResponse, APPLY_STATUS_PARTIAL and APPLY_MANIFEST_STATUS_PARTIAL enum values - gRPC handler: Derives phase status from execution plan phases and saga step results, classifies partial vs complete failure - Tests: Unit tests for classification, phase status building, and round-trip serialization; integration tests for DB persistence * fix: Address review feedback on phase_status implementation - Fix findFailedPhase to use positional correlation instead of name matching. Saga step results use handler names (e.g. "reference_data.register_instrument") while execution plan calls use gRPC method paths. Positional index is the correct mapping. - Fix EntityToProto to propagate phase_status deserialization errors instead of silently dropping them. - Rewrite StoreManifestVersionWithPhaseStatus to include phase_status in the initial Store call (single atomic write) instead of a separate UpdatePhaseStatus call that could fail silently. - Remove misleading "partial" from executor Status field comment since partial classification happens in the gRPC handler, not the executor. * fix: Split CHECK constraint migration for CockroachDB compatibility CockroachDB cannot drop and recreate a constraint with the same name in a single transaction. Split the apply_status constraint change into two migrations: one to drop the old constraint, one to add the new. --------- Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent 8d4aef2 commit eb0de06

11 files changed

Lines changed: 677 additions & 13 deletions

api/proto/meridian/control_plane/v1/apply_manifest_service.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ message ApplyManifestResponse {
7575
// sequence_number is the sequence number assigned to this manifest version.
7676
// Callers should use this value as expected_sequence_number in subsequent applies.
7777
int64 sequence_number = 7;
78+
79+
// phase_status contains per-phase execution status for partial failure recovery.
80+
// Populated when status is PARTIAL or FAILED, showing which phases succeeded/failed.
81+
map<string, PhaseStatusDetail> phase_status = 8;
7882
}
7983

8084
// ApplyManifestStatus represents the outcome of an apply operation.
@@ -96,6 +100,9 @@ enum ApplyManifestStatus {
96100

97101
// APPLY_MANIFEST_STATUS_BLOCKED indicates the operation was blocked by safety checks.
98102
APPLY_MANIFEST_STATUS_BLOCKED = 5;
103+
104+
// APPLY_MANIFEST_STATUS_PARTIAL indicates some phases succeeded but at least one failed.
105+
APPLY_MANIFEST_STATUS_PARTIAL = 6;
99106
}
100107

101108
// StepResult represents the outcome of a single execution step.

api/proto/meridian/control_plane/v1/manifest_history_service.proto

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,24 @@ enum ApplyStatus {
116116

117117
// APPLY_STATUS_ROLLED_BACK indicates this version was rolled back to a previous state.
118118
APPLY_STATUS_ROLLED_BACK = 3;
119+
120+
// APPLY_STATUS_PARTIAL indicates some phases succeeded but at least one failed.
121+
APPLY_STATUS_PARTIAL = 4;
122+
}
123+
124+
// PhaseStatusDetail records the execution result for a single phase.
125+
message PhaseStatusDetail {
126+
// status is the phase execution outcome (e.g., "COMPLETED", "FAILED", "SKIPPED").
127+
string status = 1;
128+
129+
// started_at is when this phase began executing.
130+
optional google.protobuf.Timestamp started_at = 2;
131+
132+
// completed_at is when this phase finished executing.
133+
optional google.protobuf.Timestamp completed_at = 3;
134+
135+
// error contains the error message if the phase failed.
136+
string error = 4;
119137
}
120138

121139
// ManifestVersion represents a stored manifest snapshot with audit metadata.
@@ -164,6 +182,10 @@ message ManifestVersion {
164182

165183
// resource_path is the file path or URL of the manifest source.
166184
optional string resource_path = 14;
185+
186+
// phase_status contains per-phase execution status for partial failure recovery.
187+
// Keys are phase identifiers (e.g., "phase_1", "phase_2").
188+
map<string, PhaseStatusDetail> phase_status = 15;
167189
}
168190

169191
// DiffManifestVersionsRequest compares two manifest versions by sequence number.

services/control-plane/internal/applier/grpc_handler.go

Lines changed: 212 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"errors"
88
"fmt"
99
"log/slog"
10+
"time"
1011

1112
"github.com/google/uuid"
1213
controlplanev1 "github.com/meridianhub/meridian/api/proto/meridian/control_plane/v1"
@@ -17,6 +18,7 @@ import (
1718
"github.com/meridianhub/meridian/shared/platform/tenant"
1819
"google.golang.org/grpc/codes"
1920
"google.golang.org/grpc/status"
21+
"google.golang.org/protobuf/types/known/timestamppb"
2022
)
2123

2224
// Sentinel errors for handler configuration validation.
@@ -178,10 +180,14 @@ func (h *ApplyManifestHandler) ApplyManifest(
178180

179181
if execResult.err != nil {
180182
logger.Error("execution failed", "error", execResult.err)
181-
response.Status = controlplanev1.ApplyManifestStatus_APPLY_MANIFEST_STATUS_FAILED
182183

183-
// Record failed apply in history (no graph for failed applies, no optimistic lock)
184-
_, _ = h.recordHistory(ctx, req.GetManifest(), req.GetAppliedBy(), execResult.jobID, manifest.ApplyStatusFailed, nil, 0)
184+
// Determine if this is a partial failure (some phases succeeded)
185+
applyStatus, responseStatus := classifyFailure(execResult.phaseStatus)
186+
response.Status = responseStatus
187+
response.PhaseStatus = phaseStatusMapToResponseProto(execResult.phaseStatus)
188+
189+
// Record history with phase status
190+
_, _ = h.recordHistoryWithPhaseStatus(ctx, req.GetManifest(), req.GetAppliedBy(), execResult.jobID, applyStatus, nil, 0, execResult.phaseStatus)
185191
return response, nil //nolint:nilerr // error conveyed via response status, not gRPC error
186192
}
187193

@@ -207,6 +213,7 @@ func (h *ApplyManifestHandler) ApplyManifest(
207213
}
208214

209215
response.Status = controlplanev1.ApplyManifestStatus_APPLY_MANIFEST_STATUS_APPLIED
216+
response.PhaseStatus = phaseStatusMapToResponseProto(execResult.phaseStatus)
210217
logger.Info("manifest applied successfully", "job_id", execResult.jobID)
211218

212219
// Invoke post-apply hooks (e.g., cache invalidation)
@@ -412,9 +419,10 @@ func (h *ApplyManifestHandler) plan(
412419

413420
// executeOutput holds the results of manifest execution.
414421
type executeOutput struct {
415-
jobID string
416-
stepResult *controlplanev1.StepResult
417-
err error
422+
jobID string
423+
stepResult *controlplanev1.StepResult
424+
err error
425+
phaseStatus manifest.PhaseStatusMap
418426
}
419427

420428
// execute runs the manifest apply via the executor.
@@ -424,8 +432,6 @@ func (h *ApplyManifestHandler) execute(
424432
execPlan *planner.ExecutionPlan,
425433
) executeOutput {
426434
if h.executor == nil {
427-
// No executor configured - reject non-dry-run applies to prevent
428-
// acknowledging applies without actually executing them.
429435
return executeOutput{
430436
err: ErrExecutorNotConfigured,
431437
stepResult: &controlplanev1.StepResult{
@@ -440,15 +446,23 @@ func (h *ApplyManifestHandler) execute(
440446
input := buildExecutorInput(req.GetManifest())
441447
input.TenantID = execPlan.TenantID
442448

449+
// Derive phase status from execution plan phases
450+
phaseStatus := buildInitialPhaseStatus(execPlan)
451+
443452
result, err := h.executor.Apply(ctx, input)
453+
454+
// Update phase status based on execution outcome
455+
updatePhaseStatus(phaseStatus, execPlan, result, err)
456+
444457
if err != nil {
445458
jobID := ""
446459
if result != nil {
447460
jobID = result.JobID.String()
448461
}
449462
return executeOutput{
450-
jobID: jobID,
451-
err: err,
463+
jobID: jobID,
464+
err: err,
465+
phaseStatus: phaseStatus,
452466
stepResult: &controlplanev1.StepResult{
453467
StepName: "execute",
454468
Status: controlplanev1.StepResultStatus_STEP_RESULT_STATUS_FAILED,
@@ -458,7 +472,8 @@ func (h *ApplyManifestHandler) execute(
458472
}
459473

460474
return executeOutput{
461-
jobID: result.JobID.String(),
475+
jobID: result.JobID.String(),
476+
phaseStatus: phaseStatus,
462477
stepResult: &controlplanev1.StepResult{
463478
StepName: "execute",
464479
Status: controlplanev1.StepResultStatus_STEP_RESULT_STATUS_SUCCESS,
@@ -470,6 +485,107 @@ func (h *ApplyManifestHandler) execute(
470485
}
471486
}
472487

488+
// buildInitialPhaseStatus creates a PhaseStatusMap with PENDING entries for each
489+
// phase present in the execution plan.
490+
func buildInitialPhaseStatus(plan *planner.ExecutionPlan) manifest.PhaseStatusMap {
491+
phases := plan.Phases()
492+
m := make(manifest.PhaseStatusMap, len(phases))
493+
for _, p := range phases {
494+
key := fmt.Sprintf("phase_%d", p)
495+
m[key] = manifest.PhaseStatusEntry{
496+
Status: manifest.PhaseStatusPending,
497+
}
498+
}
499+
return m
500+
}
501+
502+
// updatePhaseStatus updates phase entries based on the execution result.
503+
// On success, all phases are marked COMPLETED.
504+
// On failure, phases are marked based on step results: completed phases get COMPLETED,
505+
// the phase containing the failing step gets FAILED, and remaining phases get SKIPPED.
506+
func updatePhaseStatus(
507+
phaseStatus manifest.PhaseStatusMap,
508+
plan *planner.ExecutionPlan,
509+
result *ApplyManifestResult,
510+
execErr error,
511+
) {
512+
now := time.Now().UTC()
513+
phases := plan.Phases()
514+
515+
if execErr == nil && result != nil {
516+
// All phases completed successfully
517+
for _, p := range phases {
518+
key := fmt.Sprintf("phase_%d", p)
519+
entry := phaseStatus[key]
520+
entry.Status = manifest.PhaseStatusCompleted
521+
entry.StartedAt = &now
522+
entry.CompletedAt = &now
523+
phaseStatus[key] = entry
524+
}
525+
return
526+
}
527+
528+
// On failure: mark phases based on step results.
529+
// Steps are executed in phase order. We mark phases as COMPLETED until we
530+
// find a failed step, then the containing phase is FAILED and the rest are SKIPPED.
531+
failedPhase := findFailedPhase(plan, result)
532+
533+
for _, p := range phases {
534+
key := fmt.Sprintf("phase_%d", p)
535+
entry := phaseStatus[key]
536+
entry.StartedAt = &now
537+
538+
if failedPhase > 0 && p < failedPhase {
539+
entry.Status = manifest.PhaseStatusCompleted
540+
entry.CompletedAt = &now
541+
} else if failedPhase > 0 && p == failedPhase {
542+
entry.Status = manifest.PhaseStatusFailed
543+
entry.CompletedAt = &now
544+
if result != nil {
545+
entry.Error = result.Error
546+
} else if execErr != nil {
547+
entry.Error = execErr.Error()
548+
}
549+
} else if failedPhase > 0 {
550+
entry.Status = manifest.PhaseStatusSkipped
551+
entry.StartedAt = nil
552+
} else {
553+
// No phase-level info available; mark all as failed
554+
entry.Status = manifest.PhaseStatusFailed
555+
entry.CompletedAt = &now
556+
if execErr != nil {
557+
entry.Error = execErr.Error()
558+
}
559+
}
560+
phaseStatus[key] = entry
561+
}
562+
}
563+
564+
// findFailedPhase returns the phase number of the first failed step, or 0 if
565+
// it cannot be determined from step results.
566+
//
567+
// Step results are ordered by execution sequence, matching the call order in
568+
// the execution plan. We use positional correlation: step result at index i
569+
// corresponds to planned call at index i. This avoids the naming mismatch
570+
// between saga handler names (e.g. "reference_data.register_instrument") and
571+
// gRPC method paths in the execution plan.
572+
func findFailedPhase(plan *planner.ExecutionPlan, result *ApplyManifestResult) planner.Phase {
573+
if result == nil || len(result.StepResults) == 0 {
574+
return 0
575+
}
576+
577+
for i, step := range result.StepResults {
578+
if !step.Success {
579+
if i < len(plan.Calls) {
580+
return plan.Calls[i].Phase
581+
}
582+
return 0
583+
}
584+
}
585+
586+
return 0
587+
}
588+
473589
// recordHistory stores the manifest version in the history service.
474590
// expectedSeq is passed through for atomic optimistic locking; 0 skips the check.
475591
// Returns (nil, nil) if historyService is not configured.
@@ -513,6 +629,91 @@ func (h *ApplyManifestHandler) recordHistory(
513629
return proto, nil
514630
}
515631

632+
// classifyFailure examines phase statuses to determine if this is a partial
633+
// or complete failure. Returns both the internal ApplyStatus and proto status.
634+
func classifyFailure(ps manifest.PhaseStatusMap) (manifest.ApplyStatus, controlplanev1.ApplyManifestStatus) {
635+
if ps == nil {
636+
return manifest.ApplyStatusFailed, controlplanev1.ApplyManifestStatus_APPLY_MANIFEST_STATUS_FAILED
637+
}
638+
hasCompleted := false
639+
hasFailed := false
640+
for _, entry := range ps {
641+
switch entry.Status { //nolint:exhaustive // only COMPLETED/FAILED affect classification
642+
case manifest.PhaseStatusCompleted:
643+
hasCompleted = true
644+
case manifest.PhaseStatusFailed:
645+
hasFailed = true
646+
}
647+
}
648+
if hasCompleted && hasFailed {
649+
return manifest.ApplyStatusPartial, controlplanev1.ApplyManifestStatus_APPLY_MANIFEST_STATUS_PARTIAL
650+
}
651+
return manifest.ApplyStatusFailed, controlplanev1.ApplyManifestStatus_APPLY_MANIFEST_STATUS_FAILED
652+
}
653+
654+
// phaseStatusMapToResponseProto converts a PhaseStatusMap to proto map for the response.
655+
func phaseStatusMapToResponseProto(ps manifest.PhaseStatusMap) map[string]*controlplanev1.PhaseStatusDetail {
656+
if ps == nil {
657+
return nil
658+
}
659+
result := make(map[string]*controlplanev1.PhaseStatusDetail, len(ps))
660+
for key, entry := range ps {
661+
detail := &controlplanev1.PhaseStatusDetail{
662+
Status: string(entry.Status),
663+
Error: entry.Error,
664+
}
665+
if entry.StartedAt != nil {
666+
detail.StartedAt = timestamppb.New(*entry.StartedAt)
667+
}
668+
if entry.CompletedAt != nil {
669+
detail.CompletedAt = timestamppb.New(*entry.CompletedAt)
670+
}
671+
result[key] = detail
672+
}
673+
return result
674+
}
675+
676+
// recordHistoryWithPhaseStatus stores the manifest version in history with phase status.
677+
func (h *ApplyManifestHandler) recordHistoryWithPhaseStatus(
678+
ctx context.Context,
679+
mf *controlplanev1.Manifest,
680+
appliedBy string,
681+
jobID string,
682+
applyStatus manifest.ApplyStatus,
683+
graph *validator.RelationshipGraph,
684+
expectedSeq int64,
685+
phaseStatus manifest.PhaseStatusMap,
686+
) (*controlplanev1.ManifestVersion, error) {
687+
if h.historyService == nil {
688+
return nil, nil //nolint:nilnil // history recording is optional
689+
}
690+
691+
var jobUUID *uuid.UUID
692+
if jobID != "" {
693+
parsed, err := uuid.Parse(jobID)
694+
if err == nil {
695+
jobUUID = &parsed
696+
}
697+
}
698+
699+
entity, err := h.historyService.StoreManifestVersionWithPhaseStatus(ctx, mf, appliedBy, jobUUID, applyStatus, graph, expectedSeq, phaseStatus)
700+
if err != nil {
701+
if errors.Is(err, manifest.ErrSequenceConflict) {
702+
return nil, err
703+
}
704+
h.logger.Error("failed to record manifest history with phase status", "error", err)
705+
return nil, nil //nolint:nilnil // non-conflict errors are logged but non-fatal
706+
}
707+
708+
proto, err := manifest.EntityToProto(entity)
709+
if err != nil {
710+
h.logger.Error("failed to convert history entity to proto", "error", err)
711+
return nil, nil //nolint:nilnil // conversion errors are logged but non-fatal
712+
}
713+
714+
return proto, nil
715+
}
716+
516717
// buildExecutorInput converts a Manifest proto into the ApplyManifestInput
517718
// consumed by the saga-based ManifestExecutor.
518719
func buildExecutorInput(mf *controlplanev1.Manifest) *ApplyManifestInput {

0 commit comments

Comments
 (0)