Skip to content

Commit 3bf543a

Browse files
authored
Merge pull request #307 from kubescape/fix-migration
fix buggy locking for migration path
2 parents 8bc5305 + 098d253 commit 3bf543a

2 files changed

Lines changed: 139 additions & 72 deletions

File tree

pkg/registry/file/containerprofile_storage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func (c *ContainerProfileStorageImpl) GetStorageImpl() *StorageImpl {
9090
func (c *ContainerProfileStorageImpl) GetTsContainerProfile(ctx context.Context, key string) (softwarecomposition.ContainerProfile, error) {
9191
conn := ctx.Value(connKey).(*sqlite.Conn)
9292
tsProfile := softwarecomposition.ContainerProfile{}
93-
err := c.storageImpl.get(ctx, conn, key, storage.GetOptions{}, &tsProfile) // get instead of GetWithConn to bypass locking
93+
err := c.storageImpl.get(ctx, conn, key, storage.GetOptions{}, &tsProfile, noLock) // get instead of GetWithConn to bypass locking
9494
return tsProfile, err
9595
}
9696

pkg/registry/file/storage.go

Lines changed: 138 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -433,11 +433,21 @@ func (s *StorageImpl) GetWithConn(ctx context.Context, conn *sqlite.Conn, key st
433433
if lockDuration > time.Second {
434434
logger.L().Debug("Get", helpers.String("key", key), helpers.String("lockDuration", lockDuration.String()))
435435
}
436-
return s.get(ctx, conn, key, opts, objPtr)
436+
return s.get(ctx, conn, key, opts, objPtr, hasReadLock)
437437
}
438438

439+
// getLockState describes what lock the caller holds when invoking get(), so the
440+
// migration path can manage locks correctly without crashing or deadlocking.
441+
type getLockState int
442+
443+
const (
444+
noLock getLockState = iota // caller holds no lock; migration acquires a temporary write lock
445+
hasReadLock // caller holds a read lock; migration upgrades to write lock then restores read lock
446+
hasWriteLock // caller already holds the write lock; migration runs without lock changes
447+
)
448+
439449
// get is a helper function for Get to allow calls without locks from other methods that already have them
440-
func (s *StorageImpl) get(ctx context.Context, conn *sqlite.Conn, key string, opts storage.GetOptions, objPtr runtime.Object) error {
450+
func (s *StorageImpl) get(ctx context.Context, conn *sqlite.Conn, key string, opts storage.GetOptions, objPtr runtime.Object, lockState getLockState) error {
441451
p := filepath.Join(s.root, key)
442452
if opts.ResourceVersion == softwarecomposition.ResourceVersionMetadata {
443453
// get metadata from SQLite
@@ -455,6 +465,25 @@ func (s *StorageImpl) get(ctx context.Context, conn *sqlite.Conn, key string, op
455465
}
456466
return json.Unmarshal(metadata, objPtr)
457467
}
468+
469+
// noLock callers perform unsynchronized file I/O by default. Acquire a
470+
// temporary read lock so that a concurrent saveObject cannot truncate or
471+
// overwrite the file while we are decoding it. The lock is released via
472+
// the deferred cleanup below; the migration path clears ownedRLock before
473+
// calling RUnlock itself so the defer becomes a no-op.
474+
ownedRLock := false
475+
if lockState == noLock {
476+
if err := s.locks.RLock(ctx, key); err != nil {
477+
return apierrors.NewTimeoutError(fmt.Sprintf("rlock: %v", err), 0)
478+
}
479+
ownedRLock = true
480+
defer func() {
481+
if ownedRLock {
482+
s.locks.RUnlock(key)
483+
}
484+
}()
485+
}
486+
458487
payloadFile, err := s.openPayloadFileWithFallback(makePayloadPath(p), os.O_RDONLY, 0)
459488
if err != nil {
460489
if errors.Is(err, afero.ErrFileNotFound) {
@@ -484,78 +513,51 @@ func (s *StorageImpl) get(ctx context.Context, conn *sqlite.Conn, key string, op
484513
if strings.Contains(err.Error(), "gob: wrong type") || strings.Contains(err.Error(), "extra fields") {
485514
logger.L().Ctx(ctx).Info("Get - detected gob type mismatch, attempting external migration", helpers.Error(err), helpers.String("key", key))
486515

487-
// Rewrite the object in the modern format to complete the migration
488-
// We upgrade to a write lock BEFORE running the migration tool to prevent concurrent migration attempts
489-
s.locks.RUnlock(key)
490-
// re-acquire read lock if anything fails or when we are done
491-
defer s.locks.RLock(ctx, key)
492-
493-
if lockErr := s.locks.Lock(ctx, key); lockErr != nil {
494-
logger.L().Ctx(ctx).Error("Get - failed to acquire write lock for migration", helpers.Error(lockErr), helpers.String("key", key))
495-
return fmt.Errorf("failed to acquire write lock for migration: %w", lockErr)
496-
}
497-
defer s.locks.Unlock(key)
498-
499-
// Re-check if the file still needs migration now that we have the write lock
500-
// Another thread might have finished the migration while we were waiting for the lock
501-
payloadFileRetry, err := s.openPayloadFileWithFallback(makePayloadPath(p), os.O_RDONLY, 0)
502-
if err == nil {
503-
decoderRetry := gob.NewDecoder(NewDirectIOReader(payloadFileRetry))
504-
errRetry := decoderRetry.Decode(objPtr)
505-
_ = payloadFileRetry.Close()
506-
if errRetry == nil {
507-
logger.L().Ctx(ctx).Info("Get - migration already completed by another thread", helpers.String("key", key))
508-
return nil
509-
}
510-
}
511-
512-
typeName := "ApplicationProfile"
513-
if _, ok := objPtr.(*softwarecomposition.ContainerProfile); ok {
514-
typeName = "ContainerProfile"
515-
} else if _, ok := objPtr.(*softwarecomposition.SeccompProfile); ok {
516-
typeName = "SeccompProfile"
517-
}
518-
519-
// Run migration tool: /usr/bin/migration -file <path> -type <typeName>
520-
migrationCtx, migrationCancel := context.WithTimeout(ctx, 30*time.Second)
521-
defer migrationCancel()
522-
523-
cmd := exec.CommandContext(migrationCtx, "/usr/bin/migration", "-file", makePayloadPath(p), "-type", typeName)
524-
var out bytes.Buffer
525-
var stderr bytes.Buffer
526-
cmd.Stdout = &out
527-
cmd.Stderr = &stderr
528-
if runErr := cmd.Run(); runErr != nil {
529-
if errors.Is(migrationCtx.Err(), context.DeadlineExceeded) {
530-
logger.L().Ctx(ctx).Error("Get - migration tool timed out", helpers.String("key", key))
531-
return fmt.Errorf("migration tool timed out: %w", migrationCtx.Err())
516+
// Acquire a write lock before running the migration tool to prevent
517+
// concurrent migration attempts. All lock transitions are explicit
518+
// (no deferred lock calls) so that errors can be checked and the lock
519+
// state is always well-defined on every return path.
520+
var migErr error
521+
switch lockState {
522+
case hasReadLock:
523+
// Drop the caller's read lock and upgrade to write lock.
524+
// After migration, restore the read lock so the caller's deferred
525+
// RUnlock (in GetWithConn) has a matching acquire.
526+
s.locks.RUnlock(key)
527+
if lockErr := s.locks.Lock(ctx, key); lockErr != nil {
528+
logger.L().Ctx(ctx).Error("Get - failed to acquire write lock for migration", helpers.Error(lockErr), helpers.String("key", key))
529+
// Best-effort: restore the read lock so the caller's deferred
530+
// RUnlock is not left unmatched.
531+
if rlockErr := s.locks.RLock(ctx, key); rlockErr != nil {
532+
logger.L().Ctx(ctx).Error("Get - failed to restore read lock after write lock failure", helpers.Error(rlockErr), helpers.String("key", key))
533+
}
534+
return fmt.Errorf("failed to acquire write lock for migration: %w", lockErr)
532535
}
533-
logger.L().Ctx(ctx).Error("Get - migration tool failed", helpers.Error(runErr), helpers.String("stderr", stderr.String()), helpers.String("key", key))
534-
// If migration tool fails, treat as corrupted and delete
535-
_ = DeleteMetadata(conn, key, nil)
536-
_ = s.appFs.Remove(makePayloadPath(p))
537-
if opts.IgnoreNotFound {
538-
return runtime.SetZeroValue(objPtr)
539-
} else {
540-
return storage.NewKeyNotFoundError(key, 0)
536+
migErr = s.migrateObject(ctx, conn, p, key, opts, objPtr)
537+
s.locks.Unlock(key)
538+
if rlockErr := s.locks.RLock(ctx, key); rlockErr != nil {
539+
logger.L().Ctx(ctx).Error("Get - failed to restore read lock after migration", helpers.Error(rlockErr), helpers.String("key", key))
540+
return fmt.Errorf("failed to restore read lock after migration: %w", rlockErr)
541541
}
542-
}
543542

544-
// Migration tool outputted JSON, unmarshal it into objPtr
545-
if unmarshalErr := json.Unmarshal(out.Bytes(), objPtr); unmarshalErr != nil {
546-
logger.L().Ctx(ctx).Error("Get - unmarshal migrated JSON failed", helpers.Error(unmarshalErr), helpers.String("key", key))
547-
return unmarshalErr
548-
}
549-
550-
logger.L().Ctx(ctx).Info("Get - external migration successful", helpers.String("key", key))
543+
case noLock:
544+
// We hold a temporary read lock (ownedRLock=true); release it and
545+
// upgrade to write lock. Do not re-acquire afterwards — there is
546+
// no outer caller expecting a read lock to be held.
547+
ownedRLock = false
548+
s.locks.RUnlock(key)
549+
if lockErr := s.locks.Lock(ctx, key); lockErr != nil {
550+
logger.L().Ctx(ctx).Error("Get - failed to acquire write lock for migration", helpers.Error(lockErr), helpers.String("key", key))
551+
return fmt.Errorf("failed to acquire write lock for migration: %w", lockErr)
552+
}
553+
migErr = s.migrateObject(ctx, conn, p, key, opts, objPtr)
554+
s.locks.Unlock(key)
551555

552-
if saveErr := s.saveObject(conn, key, objPtr, nil, ""); saveErr != nil {
553-
logger.L().Ctx(ctx).Error("Get - failed to rewrite migrated object", helpers.Error(saveErr), helpers.String("key", key))
554-
} else {
555-
logger.L().Ctx(ctx).Info("Get - successfully migrated object to modern format", helpers.String("key", key))
556+
case hasWriteLock:
557+
// Already holding the write lock — just migrate.
558+
migErr = s.migrateObject(ctx, conn, p, key, opts, objPtr)
556559
}
557-
558-
return nil
560+
return migErr
559561
}
560562

561563
if errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF) {
@@ -573,6 +575,71 @@ func (s *StorageImpl) get(ctx context.Context, conn *sqlite.Conn, key string, op
573575
return err
574576
}
575577

578+
// migrateObject runs the external migration tool and unmarshals the output into objPtr.
579+
// It is used by get() to migrate objects that need external migration.
580+
// The caller must hold the write lock for key before calling this.
581+
func (s *StorageImpl) migrateObject(ctx context.Context, conn *sqlite.Conn, path, key string, opts storage.GetOptions, objPtr runtime.Object) error {
582+
// Re-check if the file still needs migration — another goroutine may have
583+
// already migrated it while we were waiting for the write lock.
584+
payloadFileRetry, err := s.openPayloadFileWithFallback(makePayloadPath(path), os.O_RDONLY, 0)
585+
if err == nil {
586+
decoderRetry := gob.NewDecoder(NewDirectIOReader(payloadFileRetry))
587+
errRetry := decoderRetry.Decode(objPtr)
588+
_ = payloadFileRetry.Close()
589+
if errRetry == nil {
590+
logger.L().Ctx(ctx).Info("Get - migration already completed by another thread", helpers.String("key", key))
591+
return nil
592+
}
593+
}
594+
595+
typeName := "ApplicationProfile"
596+
if _, ok := objPtr.(*softwarecomposition.ContainerProfile); ok {
597+
typeName = "ContainerProfile"
598+
} else if _, ok := objPtr.(*softwarecomposition.SeccompProfile); ok {
599+
typeName = "SeccompProfile"
600+
}
601+
602+
migrationCtx, migrationCancel := context.WithTimeout(ctx, 30*time.Second)
603+
defer migrationCancel()
604+
605+
cmd := exec.CommandContext(migrationCtx, "/usr/bin/migration", "-file", makePayloadPath(path), "-type", typeName)
606+
var out bytes.Buffer
607+
var stderr bytes.Buffer
608+
cmd.Stdout = &out
609+
cmd.Stderr = &stderr
610+
if runErr := cmd.Run(); runErr != nil {
611+
if errors.Is(migrationCtx.Err(), context.DeadlineExceeded) {
612+
logger.L().Ctx(ctx).Error("Get - migration tool timed out", helpers.String("key", key))
613+
return fmt.Errorf("migration tool timed out: %w", migrationCtx.Err())
614+
}
615+
logger.L().Ctx(ctx).Error("Get - migration tool failed", helpers.Error(runErr), helpers.String("stderr", stderr.String()), helpers.String("key", key))
616+
// If migration tool fails, treat as corrupted and delete
617+
_ = DeleteMetadata(conn, key, nil)
618+
_ = s.appFs.Remove(makePayloadPath(path))
619+
if opts.IgnoreNotFound {
620+
return runtime.SetZeroValue(objPtr)
621+
} else {
622+
return storage.NewKeyNotFoundError(key, 0)
623+
}
624+
}
625+
626+
// Migration tool outputted JSON, unmarshal it into objPtr
627+
if unmarshalErr := json.Unmarshal(out.Bytes(), objPtr); unmarshalErr != nil {
628+
logger.L().Ctx(ctx).Error("Get - unmarshal migrated JSON failed", helpers.Error(unmarshalErr), helpers.String("key", key))
629+
return unmarshalErr
630+
}
631+
632+
logger.L().Ctx(ctx).Info("Get - external migration successful", helpers.String("key", key))
633+
634+
if saveErr := s.saveObject(conn, key, objPtr, nil, ""); saveErr != nil {
635+
logger.L().Ctx(ctx).Error("Get - failed to rewrite migrated object", helpers.Error(saveErr), helpers.String("key", key))
636+
} else {
637+
logger.L().Ctx(ctx).Info("Get - successfully migrated object to modern format", helpers.String("key", key))
638+
}
639+
640+
return nil
641+
}
642+
576643
// GetList unmarshalls objects found at key into a *List api object (an object
577644
// that satisfies runtime.IsList definition).
578645
// If 'opts.Recursive' is false, 'key' is used as an exact match. If `opts.Recursive`
@@ -623,7 +690,7 @@ func (s *StorageImpl) GetListWithConn(ctx context.Context, conn *sqlite.Conn, ke
623690
v.Set(reflect.MakeSlice(v.Type(), 0, len(list)))
624691
for _, k := range list {
625692
obj := reflect.New(elem).Interface().(runtime.Object)
626-
if err := s.get(ctx, conn, k, storage.GetOptions{}, obj); err != nil {
693+
if err := s.get(ctx, conn, k, storage.GetOptions{}, obj, noLock); err != nil {
627694
logger.L().Ctx(ctx).Error("GetList - get object failed", helpers.Error(err), helpers.String("key", k))
628695
}
629696
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
@@ -807,7 +874,7 @@ func (s *StorageImpl) GuaranteedUpdateWithConn(
807874

808875
getCurrentState := func() (*objState, error) {
809876
objPtr := reflect.New(v.Type()).Interface().(runtime.Object)
810-
err := s.get(ctx, conn, key, storage.GetOptions{IgnoreNotFound: ignoreNotFound}, objPtr)
877+
err := s.get(ctx, conn, key, storage.GetOptions{IgnoreNotFound: ignoreNotFound}, objPtr, hasWriteLock)
811878
if err != nil {
812879
logger.L().Ctx(ctx).Error("GuaranteedUpdate - get failed", helpers.Error(err), helpers.String("key", key))
813880
return nil, err

0 commit comments

Comments
 (0)