Skip to content

Commit 03f1991

Browse files
authored
Merge pull request #417 from fabriziosestito/fix/error-handling
fix(handlers): ack messages when an error occurs and fix status update conflict
2 parents 564eee5 + a2ea9ed commit 03f1991

3 files changed

Lines changed: 41 additions & 28 deletions

File tree

internal/handlers/create_catalog.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
apierrors "k8s.io/apimachinery/pkg/api/errors"
2323
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2424
"k8s.io/apimachinery/pkg/runtime"
25+
"k8s.io/apimachinery/pkg/types"
2526
"k8s.io/apimachinery/pkg/util/sets"
2627
"k8s.io/client-go/util/retry"
2728
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -215,18 +216,29 @@ func (h *CreateCatalogHandler) Handle(ctx context.Context, message []byte) error
215216
return fmt.Errorf("cannot delete obsolete images in registry %s: %w", registry.Name, err)
216217
}
217218

218-
if len(discoveredImages) == 0 {
219-
h.logger.DebugContext(ctx, "No images to process", "scanjob", scanJob.Name, "namespace", scanJob.Namespace)
219+
// It is possible that the controller is slow to set the status condition "Scheduled" to true,
220+
// so we might encounter conflicts when setting the status conditions.
221+
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
222+
if err = h.k8sClient.Get(ctx, types.NamespacedName{
223+
Name: scanJob.Name,
224+
Namespace: scanJob.Namespace,
225+
}, scanJob); err != nil {
226+
return fmt.Errorf("cannot get scan job %s/%s while updating status: %w", scanJob.Namespace, scanJob.Name, err)
227+
}
220228

221-
scanJob.MarkComplete(v1alpha1.ReasonNoImagesToScan, "No images to process")
222-
} else {
223-
h.logger.DebugContext(ctx, "Images to process", "count", len(discoveredImages))
224-
scanJob.MarkInProgress(v1alpha1.ReasonSBOMGenerationInProgress, "SBOM generation in progress")
225-
scanJob.Status.ImagesCount = len(discoveredImages)
226-
scanJob.Status.ScannedImagesCount = 0
227-
}
229+
if len(discoveredImages) == 0 {
230+
h.logger.DebugContext(ctx, "No images to process", "scanjob", scanJob.Name, "namespace", scanJob.Namespace)
231+
scanJob.MarkComplete(v1alpha1.ReasonNoImagesToScan, "No images to process")
232+
} else {
233+
h.logger.DebugContext(ctx, "Images to process", "count", len(discoveredImages))
234+
scanJob.MarkInProgress(v1alpha1.ReasonSBOMGenerationInProgress, "SBOM generation in progress")
235+
scanJob.Status.ImagesCount = len(discoveredImages)
236+
scanJob.Status.ScannedImagesCount = 0
237+
}
228238

229-
if err = h.k8sClient.Status().Update(ctx, scanJob); err != nil {
239+
return h.k8sClient.Status().Update(ctx, scanJob)
240+
})
241+
if err != nil {
230242
return fmt.Errorf("cannot update scan job status %s/%s: %w", createCatalogMessage.ScanJob.Namespace, createCatalogMessage.ScanJob.Name, err)
231243
}
232244

internal/handlers/scanjob_failure.go

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"log/slog"
88

9+
"k8s.io/client-go/util/retry"
910
"sigs.k8s.io/controller-runtime/pkg/client"
1011

1112
sbombasticv1alpha1 "github.com/rancher/sbombastic/api/v1alpha1"
@@ -34,40 +35,40 @@ func (h *ScanJobFailureHandler) HandleFailure(ctx context.Context, message []byt
3435
if err := json.Unmarshal(message, baseMessage); err != nil {
3536
return fmt.Errorf("failed to unmarshal base message: %w", err)
3637
}
37-
3838
h.logger.DebugContext(ctx, "Handling ScanJob failure",
3939
"scanjob", baseMessage.ScanJob.Name,
4040
"namespace", baseMessage.ScanJob.Namespace,
4141
"error", errorMessage,
4242
)
4343

4444
scanJob := &sbombasticv1alpha1.ScanJob{}
45-
err := h.k8sClient.Get(ctx, client.ObjectKey{
46-
Name: baseMessage.ScanJob.Name,
47-
Namespace: baseMessage.ScanJob.Namespace,
48-
}, scanJob)
49-
if err != nil {
50-
return fmt.Errorf("failed to get ScanJob %s/%s: %w",
51-
baseMessage.ScanJob.Namespace, baseMessage.ScanJob.Name, err)
52-
}
5345

54-
original := scanJob.DeepCopy()
55-
scanJob.MarkFailed(sbombasticv1alpha1.ReasonInternalError, errorMessage)
46+
// It is possible that the controller is slow to set the status condition "Scheduled" to true,
47+
// so we might encounter conflicts when setting the status conditions.
48+
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
49+
if err := h.k8sClient.Get(ctx, client.ObjectKey{
50+
Name: baseMessage.ScanJob.Name,
51+
Namespace: baseMessage.ScanJob.Namespace,
52+
}, scanJob); err != nil {
53+
return fmt.Errorf("failed to get ScanJob %s/%s: %w", scanJob.Namespace, scanJob.Name, err)
54+
}
5655

57-
if err := h.k8sClient.Status().Patch(ctx, scanJob, client.MergeFrom(original)); err != nil {
56+
scanJob.MarkFailed(sbombasticv1alpha1.ReasonInternalError, errorMessage)
57+
return h.k8sClient.Status().Update(ctx, scanJob)
58+
})
59+
if err != nil {
5860
h.logger.ErrorContext(ctx, "Failed to update ScanJob status with failure",
5961
"scanjob", scanJob.Name,
6062
"namespace", scanJob.Namespace,
6163
"error", err,
6264
)
63-
return fmt.Errorf("failed to update ScanJob %s/%s status: %w", baseMessage.ScanJob.Namespace, baseMessage.ScanJob.Name, err)
65+
return fmt.Errorf("failed to update ScanJob %s/%s status: %w", scanJob.Namespace, scanJob.Name, err)
6466
}
6567

6668
h.logger.DebugContext(ctx, "ScanJob marked as failed",
6769
"scanjob", scanJob.Name,
6870
"namespace", scanJob.Namespace,
6971
"error_message", errorMessage,
7072
)
71-
7273
return nil
7374
}

internal/messaging/subscriber.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,11 @@ func (s *NatsSubscriber) Run(ctx context.Context) error {
7878
"error", err,
7979
)
8080
}
81-
}
8281

83-
// Return early to avoid acking a message that failed processing.
84-
// This allows the message to be retried later even if the nak fails.
85-
return
82+
// Return early to avoid acking a message that failed processing.
83+
// This allows the message to be retried later even if the nak fails.
84+
return
85+
}
8686
}
8787

8888
if err := msg.Ack(); err != nil {

0 commit comments

Comments
 (0)