Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ message ApplyManifestResponse {
// sequence_number is the sequence number assigned to this manifest version.
// Callers should use this value as expected_sequence_number in subsequent applies.
int64 sequence_number = 7;

// phase_status contains per-phase execution status for partial failure recovery.
// Populated when status is PARTIAL or FAILED, showing which phases succeeded/failed.
map<string, PhaseStatusDetail> phase_status = 8;
}

// ApplyManifestStatus represents the outcome of an apply operation.
Expand All @@ -96,6 +100,9 @@ enum ApplyManifestStatus {

// APPLY_MANIFEST_STATUS_BLOCKED indicates the operation was blocked by safety checks.
APPLY_MANIFEST_STATUS_BLOCKED = 5;

// APPLY_MANIFEST_STATUS_PARTIAL indicates some phases succeeded but at least one failed.
APPLY_MANIFEST_STATUS_PARTIAL = 6;
}

// StepResult represents the outcome of a single execution step.
Expand Down
22 changes: 22 additions & 0 deletions api/proto/meridian/control_plane/v1/manifest_history_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,24 @@ enum ApplyStatus {

// APPLY_STATUS_ROLLED_BACK indicates this version was rolled back to a previous state.
APPLY_STATUS_ROLLED_BACK = 3;

// APPLY_STATUS_PARTIAL indicates some phases succeeded but at least one failed.
APPLY_STATUS_PARTIAL = 4;
}

// PhaseStatusDetail records the execution result for a single phase of a
// manifest apply, enabling partial-failure recovery and auditing.
message PhaseStatusDetail {
// status is the phase execution outcome. Observed values include
// "PENDING", "COMPLETED", "FAILED", and "SKIPPED".
string status = 1;

// started_at is when this phase began executing. Unset when the phase
// never started (e.g., it was skipped after an earlier phase failed).
optional google.protobuf.Timestamp started_at = 2;

// completed_at is when this phase finished executing (success or failure).
// NOTE(review): timings are currently backfilled after the run completes,
// so started_at/completed_at may not reflect real per-phase durations —
// confirm before relying on them for timing analysis.
optional google.protobuf.Timestamp completed_at = 3;

// error contains the error message if the phase failed; empty otherwise.
string error = 4;
}

// ManifestVersion represents a stored manifest snapshot with audit metadata.
Expand Down Expand Up @@ -164,6 +182,10 @@ message ManifestVersion {

// resource_path is the file path or URL of the manifest source.
optional string resource_path = 14;

// phase_status contains per-phase execution status for partial failure recovery.
// Keys are phase identifiers (e.g., "phase_1", "phase_2").
map<string, PhaseStatusDetail> phase_status = 15;
}

// DiffManifestVersionsRequest compares two manifest versions by sequence number.
Expand Down
223 changes: 212 additions & 11 deletions services/control-plane/internal/applier/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"log/slog"
"time"

"github.com/google/uuid"
controlplanev1 "github.com/meridianhub/meridian/api/proto/meridian/control_plane/v1"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/meridianhub/meridian/shared/platform/tenant"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)

// Sentinel errors for handler configuration validation.
Expand Down Expand Up @@ -178,10 +180,14 @@ func (h *ApplyManifestHandler) ApplyManifest(

if execResult.err != nil {
logger.Error("execution failed", "error", execResult.err)
response.Status = controlplanev1.ApplyManifestStatus_APPLY_MANIFEST_STATUS_FAILED

// Record failed apply in history (no graph for failed applies, no optimistic lock)
_, _ = h.recordHistory(ctx, req.GetManifest(), req.GetAppliedBy(), execResult.jobID, manifest.ApplyStatusFailed, nil, 0)
// Determine if this is a partial failure (some phases succeeded)
applyStatus, responseStatus := classifyFailure(execResult.phaseStatus)
response.Status = responseStatus
response.PhaseStatus = phaseStatusMapToResponseProto(execResult.phaseStatus)

// Record history with phase status
_, _ = h.recordHistoryWithPhaseStatus(ctx, req.GetManifest(), req.GetAppliedBy(), execResult.jobID, applyStatus, nil, 0, execResult.phaseStatus)
Comment on lines +184 to +190
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't bypass optimistic locking on failed/partial history writes.

Line 190 hardcodes expectedSeq=0 and drops the returned error, so a stale request can still append a FAILED/PARTIAL history row after another manifest version has already advanced the sequence. This should use the same sequence check and ErrSequenceConflict handling as the success path.

Suggested fix
-		_, _ = h.recordHistoryWithPhaseStatus(ctx, req.GetManifest(), req.GetAppliedBy(), execResult.jobID, applyStatus, nil, 0, execResult.phaseStatus)
+		if _, err := h.recordHistoryWithPhaseStatus(
+			ctx,
+			req.GetManifest(),
+			req.GetAppliedBy(),
+			execResult.jobID,
+			applyStatus,
+			nil,
+			req.GetExpectedSequenceNumber(),
+			execResult.phaseStatus,
+		); err != nil && errors.Is(err, manifest.ErrSequenceConflict) {
+			return nil, status.Errorf(codes.Aborted, "%v", err)
+		}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Determine if this is a partial failure (some phases succeeded)
applyStatus, responseStatus := classifyFailure(execResult.phaseStatus)
response.Status = responseStatus
response.PhaseStatus = phaseStatusMapToResponseProto(execResult.phaseStatus)
// Record history with phase status
_, _ = h.recordHistoryWithPhaseStatus(ctx, req.GetManifest(), req.GetAppliedBy(), execResult.jobID, applyStatus, nil, 0, execResult.phaseStatus)
// Determine if this is a partial failure (some phases succeeded)
applyStatus, responseStatus := classifyFailure(execResult.phaseStatus)
response.Status = responseStatus
response.PhaseStatus = phaseStatusMapToResponseProto(execResult.phaseStatus)
// Record history with phase status
if _, err := h.recordHistoryWithPhaseStatus(
ctx,
req.GetManifest(),
req.GetAppliedBy(),
execResult.jobID,
applyStatus,
nil,
req.GetExpectedSequenceNumber(),
execResult.phaseStatus,
); err != nil && errors.Is(err, manifest.ErrSequenceConflict) {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@services/control-plane/internal/applier/grpc_handler.go` around lines 184 -
190, The code currently calls recordHistoryWithPhaseStatus(..., expectedSeq=0)
and discards the error, allowing stale requests to append FAILED/PARTIAL rows;
change this to use the same optimistic-sequence logic as the success path: pass
the correct expected sequence (the same sequence value used when writing
successful history) instead of 0, capture the returned error from
recordHistoryWithPhaseStatus, and handle ErrSequenceConflict the same way the
success path does (i.e., detect sequence conflict and return/translate it to the
same conflict response or retry logic). Locate recordHistoryWithPhaseStatus,
classifyFailure, and the execResult.jobID/phaseStatus usage to implement the
change so failed/partial history writes respect optimistic locking and surface
the error.

return response, nil //nolint:nilerr // error conveyed via response status, not gRPC error
}

Expand All @@ -207,6 +213,7 @@ func (h *ApplyManifestHandler) ApplyManifest(
}

response.Status = controlplanev1.ApplyManifestStatus_APPLY_MANIFEST_STATUS_APPLIED
response.PhaseStatus = phaseStatusMapToResponseProto(execResult.phaseStatus)
logger.Info("manifest applied successfully", "job_id", execResult.jobID)

// Invoke post-apply hooks (e.g., cache invalidation)
Expand Down Expand Up @@ -412,9 +419,10 @@ func (h *ApplyManifestHandler) plan(

// executeOutput holds the results of manifest execution.
type executeOutput struct {
jobID string
stepResult *controlplanev1.StepResult
err error
jobID string
stepResult *controlplanev1.StepResult
err error
phaseStatus manifest.PhaseStatusMap
}

// execute runs the manifest apply via the executor.
Expand All @@ -424,8 +432,6 @@ func (h *ApplyManifestHandler) execute(
execPlan *planner.ExecutionPlan,
) executeOutput {
if h.executor == nil {
// No executor configured - reject non-dry-run applies to prevent
// acknowledging applies without actually executing them.
return executeOutput{
err: ErrExecutorNotConfigured,
stepResult: &controlplanev1.StepResult{
Expand All @@ -440,15 +446,23 @@ func (h *ApplyManifestHandler) execute(
input := buildExecutorInput(req.GetManifest())
input.TenantID = execPlan.TenantID

// Derive phase status from execution plan phases
phaseStatus := buildInitialPhaseStatus(execPlan)

result, err := h.executor.Apply(ctx, input)

// Update phase status based on execution outcome
updatePhaseStatus(phaseStatus, execPlan, result, err)

if err != nil {
jobID := ""
if result != nil {
jobID = result.JobID.String()
}
return executeOutput{
jobID: jobID,
err: err,
jobID: jobID,
err: err,
phaseStatus: phaseStatus,
stepResult: &controlplanev1.StepResult{
StepName: "execute",
Status: controlplanev1.StepResultStatus_STEP_RESULT_STATUS_FAILED,
Expand All @@ -458,7 +472,8 @@ func (h *ApplyManifestHandler) execute(
}

return executeOutput{
jobID: result.JobID.String(),
jobID: result.JobID.String(),
phaseStatus: phaseStatus,
stepResult: &controlplanev1.StepResult{
StepName: "execute",
Status: controlplanev1.StepResultStatus_STEP_RESULT_STATUS_SUCCESS,
Expand All @@ -470,6 +485,107 @@ func (h *ApplyManifestHandler) execute(
}
}

// buildInitialPhaseStatus seeds a PhaseStatusMap for the given execution plan,
// creating one PENDING entry per planned phase keyed as "phase_<n>".
func buildInitialPhaseStatus(plan *planner.ExecutionPlan) manifest.PhaseStatusMap {
	planned := plan.Phases()
	statuses := make(manifest.PhaseStatusMap, len(planned))
	for _, phase := range planned {
		statuses[fmt.Sprintf("phase_%d", phase)] = manifest.PhaseStatusEntry{Status: manifest.PhaseStatusPending}
	}
	return statuses
}

// updatePhaseStatus updates phase entries based on the execution result.
// On success, all phases are marked COMPLETED.
// On failure, phases are marked based on step results: completed phases get COMPLETED,
// the phase containing the failing step gets FAILED, and remaining phases get SKIPPED.
func updatePhaseStatus(
phaseStatus manifest.PhaseStatusMap,
plan *planner.ExecutionPlan,
result *ApplyManifestResult,
execErr error,
) {
now := time.Now().UTC()
phases := plan.Phases()

if execErr == nil && result != nil {
// All phases completed successfully
for _, p := range phases {
key := fmt.Sprintf("phase_%d", p)
entry := phaseStatus[key]
entry.Status = manifest.PhaseStatusCompleted
entry.StartedAt = &now
entry.CompletedAt = &now
phaseStatus[key] = entry
}
return
}

// On failure: mark phases based on step results.
// Steps are executed in phase order. We mark phases as COMPLETED until we
// find a failed step, then the containing phase is FAILED and the rest are SKIPPED.
failedPhase := findFailedPhase(plan, result)

for _, p := range phases {
key := fmt.Sprintf("phase_%d", p)
entry := phaseStatus[key]
entry.StartedAt = &now

if failedPhase > 0 && p < failedPhase {
entry.Status = manifest.PhaseStatusCompleted
entry.CompletedAt = &now
} else if failedPhase > 0 && p == failedPhase {
entry.Status = manifest.PhaseStatusFailed
entry.CompletedAt = &now
if result != nil {
entry.Error = result.Error
} else if execErr != nil {
entry.Error = execErr.Error()
}
} else if failedPhase > 0 {
entry.Status = manifest.PhaseStatusSkipped
entry.StartedAt = nil
} else {
// No phase-level info available; mark all as failed
entry.Status = manifest.PhaseStatusFailed
entry.CompletedAt = &now
if execErr != nil {
entry.Error = execErr.Error()
}
}
phaseStatus[key] = entry
Comment on lines +512 to +560
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don’t backfill phase timings with one post-run timestamp.

updatePhaseStatus runs only after executor.Apply returns, so every executed phase gets the same synthetic now. That does not match the started_at / completed_at contract added in api/proto/meridian/control_plane/v1/manifest_history_service.proto and will make the stored phase timing misleading. Either thread real per-phase timing out of the executor or leave these fields unset until that data exists.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@services/control-plane/internal/applier/grpc_handler.go` around lines 512 -
560, The code in updatePhaseStatus (after executor.Apply) currently backfills
StartedAt/CompletedAt with a single synthetic now for every phase (variables:
phases, phaseStatus, findFailedPhase, result, execErr), which violates the
manifest timing contract; instead, stop setting entry.StartedAt and
entry.CompletedAt to &now unless you can source real per-phase timings from the
executor result (e.g., per-step/phase timestamps on result or StepResults) and
copy those into entry.StartedAt/CompletedAt; otherwise, only set entry.Status
and entry.Error as appropriate and leave StartedAt/CompletedAt nil so inaccurate
timestamps are not recorded.

}
Comment on lines +528 to +561
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Partial work inside the first failed phase is still reported as FAILED.

A phase can contain multiple calls. If some calls in phase 1 succeed and a later call in that same phase fails, updatePhaseStatus records only FAILED for phase_1, so classifyFailure returns FAILED because no phase is COMPLETED. That loses the "side effects already happened" signal this PR is adding.

Also applies to: 630-649

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@services/control-plane/internal/applier/grpc_handler.go` around lines 526 -
559, The current logic marks the entire failed phase as FAILED even if earlier
calls in that same phase succeeded; modify the update that runs when failedPhase
== p (in the loop that updates phaseStatus using findFailedPhase, and the
analogous block around classifyFailure/updatePhaseStatus later) to inspect the
step-level results (e.g., result.StepResults or whatever structure holds
per-step statuses) for the failedPhase: if any step in that phase succeeded,
treat the phase as having partial work — set the phase status to
manifest.PhaseStatusCompleted (or to an existing "partial" status if your
manifest has one), set CompletedAt = &now and populate Error from result.Error
or execErr as currently done; only mark it strictly FAILED when no step in that
phase completed successfully. Ensure you update both occurrences (the loop using
phases/phaseStatus and the similar block at lines ~630-649) and reference
findFailedPhase, phaseStatus, phases, result, execErr and classifyFailure when
making the change.

}
Comment on lines +551 to +562
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MUST FIX: findFailedPhase builds a map keyed by string(call.GRPCMethod) (fully-qualified gRPC paths like meridian.reference_data.v1.ReferenceDataService/RegisterInstrument) but matches against saga.StepResult.StepName which uses handler registry names (like reference_data.register_instrument).

These will never match in production, so findFailedPhase always returns 0, updatePhaseStatus marks all phases as FAILED (the failedPhase == 0 else-branch), and classifyFailure never sees both COMPLETED and FAILED entries. The PARTIAL status is unreachable.

The unit tests mask this because they use matching short names for both GRPCMethod and StepName.

Suggested fix: Add a HandlerName field to PlannedCall that stores the registry name (e.g., reference_data.register_instrument), then use that field as the map key here instead of GRPCMethod. Also update tests to use realistic production names to prevent regression.


// findFailedPhase returns the phase number of the first failed step, or 0 if
// it cannot be determined from step results.
//
// Step results are ordered by execution sequence, matching the call order in
// the execution plan. We use positional correlation: step result at index i
// corresponds to planned call at index i. This avoids the naming mismatch
// between saga handler names (e.g. "reference_data.register_instrument") and
// gRPC method paths in the execution plan.
func findFailedPhase(plan *planner.ExecutionPlan, result *ApplyManifestResult) planner.Phase {
if result == nil || len(result.StepResults) == 0 {
return 0
}

for i, step := range result.StepResults {
if !step.Success {
if i < len(plan.Calls) {
return plan.Calls[i].Phase
Comment on lines +572 to +580
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: The positional correlation between result.StepResults and plan.Calls is correct today (the executor runs calls sequentially in plan order), but this invariant is implicit. Consider adding a brief comment explaining why positional correlation holds so future editors know this is load-bearing.

}
return 0
}
}

return 0
}

// recordHistory stores the manifest version in the history service.
// expectedSeq is passed through for atomic optimistic locking; 0 skips the check.
// Returns (nil, nil) if historyService is not configured.
Expand Down Expand Up @@ -513,6 +629,91 @@ func (h *ApplyManifestHandler) recordHistory(
return proto, nil
}

// classifyFailure inspects per-phase outcomes to decide whether a failed apply
// was partial (at least one phase COMPLETED and at least one FAILED) or a
// total failure. It returns the internal ApplyStatus alongside the proto-level
// ApplyManifestStatus used in the gRPC response. A nil or empty map classifies
// as a total failure.
func classifyFailure(ps manifest.PhaseStatusMap) (manifest.ApplyStatus, controlplanev1.ApplyManifestStatus) {
	var completed, failed bool
	// Ranging over a nil map is a no-op, so no explicit nil guard is needed.
	for _, entry := range ps {
		switch entry.Status { //nolint:exhaustive // only COMPLETED/FAILED affect classification
		case manifest.PhaseStatusCompleted:
			completed = true
		case manifest.PhaseStatusFailed:
			failed = true
		}
	}
	if completed && failed {
		return manifest.ApplyStatusPartial, controlplanev1.ApplyManifestStatus_APPLY_MANIFEST_STATUS_PARTIAL
	}
	return manifest.ApplyStatusFailed, controlplanev1.ApplyManifestStatus_APPLY_MANIFEST_STATUS_FAILED
}

// phaseStatusMapToResponseProto converts a PhaseStatusMap to proto map for the response.
func phaseStatusMapToResponseProto(ps manifest.PhaseStatusMap) map[string]*controlplanev1.PhaseStatusDetail {
if ps == nil {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: phaseStatusMapToResponseProto here is identical to phaseStatusMapToProto in history.go. Two copies of the same conversion logic will drift. Export PhaseStatusMapToProto from the manifest package and call it from both places.

return nil
}
result := make(map[string]*controlplanev1.PhaseStatusDetail, len(ps))
for key, entry := range ps {
detail := &controlplanev1.PhaseStatusDetail{
Status: string(entry.Status),
Error: entry.Error,
}
if entry.StartedAt != nil {
detail.StartedAt = timestamppb.New(*entry.StartedAt)
}
if entry.CompletedAt != nil {
detail.CompletedAt = timestamppb.New(*entry.CompletedAt)
}
result[key] = detail
}
return result
}

// recordHistoryWithPhaseStatus persists the manifest version, together with
// per-phase execution status, via the configured history service.
//
// Recording is best-effort: when no history service is configured, or when
// storage/conversion fails for any reason other than an optimistic-lock
// conflict, the failure is logged and (nil, nil) is returned. The single
// error propagated to the caller is manifest.ErrSequenceConflict, so the
// apply path can surface sequence conflicts explicitly.
func (h *ApplyManifestHandler) recordHistoryWithPhaseStatus(
	ctx context.Context,
	mf *controlplanev1.Manifest,
	appliedBy string,
	jobID string,
	applyStatus manifest.ApplyStatus,
	graph *validator.RelationshipGraph,
	expectedSeq int64,
	phaseStatus manifest.PhaseStatusMap,
) (*controlplanev1.ManifestVersion, error) {
	if h.historyService == nil {
		return nil, nil //nolint:nilnil // history recording is optional
	}

	// Best-effort jobID handling: an unparsable ID is stored as absent rather
	// than failing the history write.
	var jobUUID *uuid.UUID
	if jobID != "" {
		if parsed, parseErr := uuid.Parse(jobID); parseErr == nil {
			jobUUID = &parsed
		}
	}

	entity, storeErr := h.historyService.StoreManifestVersionWithPhaseStatus(ctx, mf, appliedBy, jobUUID, applyStatus, graph, expectedSeq, phaseStatus)
	switch {
	case errors.Is(storeErr, manifest.ErrSequenceConflict):
		return nil, storeErr
	case storeErr != nil:
		h.logger.Error("failed to record manifest history with phase status", "error", storeErr)
		return nil, nil //nolint:nilnil // non-conflict errors are logged but non-fatal
	}

	proto, convErr := manifest.EntityToProto(entity)
	if convErr != nil {
		h.logger.Error("failed to convert history entity to proto", "error", convErr)
		return nil, nil //nolint:nilnil // conversion errors are logged but non-fatal
	}

	return proto, nil
}

// buildExecutorInput converts a Manifest proto into the ApplyManifestInput
// consumed by the saga-based ManifestExecutor.
func buildExecutorInput(mf *controlplanev1.Manifest) *ApplyManifestInput {
Expand Down
Loading
Loading