Commit 541c7c3

fix: server lifecycle and backup data integrity edge cases
1 parent 3982faf commit 541c7c3

5 files changed (+152, -11 lines)


src/jobs/backup_job.go

Lines changed: 14 additions & 0 deletions
@@ -208,6 +208,11 @@ func (j *BackupCreateJob) Execute(ctx context.Context, reporter ProgressReporter
         return nil, errors.New("server not found: " + j.serverID)
     }
 
+    j.serverManager.AcquireBackupLock(j.serverID)
+    defer j.serverManager.ReleaseBackupLock(j.serverID)
+
+    logger.Debug("acquired backup operation lock for server")
+
     reporter.ReportProgress(20, "Initializing backup adapter...")
 
     var adapter backup.BackupInterface
@@ -361,6 +366,10 @@ func (j *BackupDeleteJob) Execute(ctx context.Context, reporter ProgressReporter
         return nil, errors.New("server not found: " + j.serverID)
     }
 
+    j.serverManager.AcquireBackupLock(j.serverID)
+    defer j.serverManager.ReleaseBackupLock(j.serverID)
+
+    logger.Debug("acquired backup operation lock for server")
     logger.Info("executing backup delete job")
 
     switch backup.AdapterType(j.adapterType) {
@@ -698,6 +707,11 @@ func (j *BackupRestoreJob) Execute(ctx context.Context, reporter ProgressReporter
         return nil, errors.New("server not found: " + j.serverID)
     }
 
+    j.serverManager.AcquireBackupLock(j.serverID)
+    defer j.serverManager.ReleaseBackupLock(j.serverID)
+
+    logger.Debug("acquired backup operation lock for server")
+
     s.SetRestoring(true)
     defer s.SetRestoring(false)

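Each Execute method takes the lock right after the server lookup and defers the release, so every return path (adapter errors, context cancellation, panics) still unlocks. A minimal sketch of that shared shape, with a hypothetical lockManager interface and runBackupSteps placeholder standing in for the real serverManager and adapter logic:

package jobs

import "context"

// lockManager is a hypothetical stand-in for the real *server.Manager,
// exposing only the two methods the jobs above rely on.
type lockManager interface {
    AcquireBackupLock(serverID string)
    ReleaseBackupLock(serverID string)
}

// runLocked shows the acquire/defer-release shape shared by the create,
// delete, and restore jobs in this commit: the lock is held for the whole
// operation and released on every return path, including error returns.
func runLocked(ctx context.Context, m lockManager, serverID string, runBackupSteps func(context.Context) error) error {
    m.AcquireBackupLock(serverID)
    defer m.ReleaseBackupLock(serverID)

    return runBackupSteps(ctx)
}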
src/router/router_server.go

Lines changed: 77 additions & 0 deletions
@@ -5,15 +5,18 @@ import (
     "net/http"
     "os"
     "strconv"
+    "time"
 
     "emperror.dev/errors"
     "github.com/apex/log"
     "github.com/gin-gonic/gin"
 
+    "github.com/pyrohost/elytra/src/remote"
     "github.com/pyrohost/elytra/src/router/downloader"
     "github.com/pyrohost/elytra/src/router/middleware"
     "github.com/pyrohost/elytra/src/router/tokens"
     "github.com/pyrohost/elytra/src/server"
+    "github.com/pyrohost/elytra/src/server/backup"
     "github.com/pyrohost/elytra/src/server/transfer"
 )
 
@@ -221,6 +224,10 @@ func deleteServer(c *gin.Context) {
         return
     }
 
+    // Destroy backup repositories to prevent orphaned data on S3 and local storage.
+    // This runs in the background since repository destruction can take time for large repositories.
+    go destroyServerBackupRepositories(c.Request.Context(), s, middleware.ExtractManager(c).Client())
+
     // Once the environment is terminated, remove the server files from the system. This is
     // done in a separate process since failure is not the end of the world and can be
     // manually cleaned up after the fact.
@@ -243,6 +250,76 @@ func deleteServer(c *gin.Context) {
     c.Status(http.StatusNoContent)
 }
 
+// destroyServerBackupRepositories destroys all backup repositories (local and S3) for a server.
+func destroyServerBackupRepositories(ctx context.Context, s *server.Server, client remote.Client) {
+    logger := log.WithField("server_id", s.ID())
+
+    repositories := []struct {
+        name       string
+        backupType string
+    }{
+        {"local", "local"},
+        {"s3", "s3"},
+    }
+
+    for _, repoInfo := range repositories {
+        func() {
+            rusticConfig, err := client.GetServerRusticConfig(ctx, s.ID(), repoInfo.backupType)
+            if err != nil {
+                logger.WithFields(log.Fields{
+                    "type":  repoInfo.backupType,
+                    "error": err,
+                }).Warn("failed to get rustic config for repository destruction, repository may not exist")
+                return
+            }
+
+            var repository backup.Repository
+            var repoErr error
+
+            config := backup.Config{
+                ServerUUID: s.ID(),
+                Password:   rusticConfig.RepositoryPassword,
+                BackupType: repoInfo.backupType,
+            }
+
+            if repoInfo.backupType == "local" {
+                config.LocalPath = rusticConfig.RepositoryPath
+                repository, repoErr = backup.NewLocalRepository(config)
+            } else {
+                config.S3Config = &backup.S3Config{
+                    Endpoint:        rusticConfig.S3Credentials.Endpoint,
+                    Bucket:          rusticConfig.S3Credentials.Bucket,
+                    AccessKeyID:     rusticConfig.S3Credentials.AccessKeyID,
+                    SecretAccessKey: rusticConfig.S3Credentials.SecretAccessKey,
+                    Region:          rusticConfig.S3Credentials.Region,
+                }
+                config.LocalPath = rusticConfig.RepositoryPath
+                repository, repoErr = backup.NewS3Repository(config)
+            }
+
+            if repoErr != nil {
+                logger.WithFields(log.Fields{
+                    "type":  repoInfo.backupType,
+                    "error": repoErr,
+                }).Error("failed to create repository instance for destruction")
+                return
+            }
+
+            destroyCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
+            defer cancel()
+
+            if err := repository.Destroy(destroyCtx); err != nil {
+                logger.WithFields(log.Fields{
+                    "type":  repoInfo.backupType,
+                    "error": err,
+                }).Error("failed to destroy backup repository")
+            } else {
+                logger.WithField("type", repoInfo.backupType).Info("successfully destroyed backup repository")
+            }
+        }()
+    }
+}
+
 // Adds any of the JTIs passed through in the body to the deny list for the websocket
 // preventing any JWT generated before the current time from being used to connect to
 // the socket or send along commands.

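The loop body above is wrapped in an immediately invoked closure so that defer cancel() and the early returns apply per repository instead of accumulating until destroyServerBackupRepositories itself returns. A stripped-down, runnable sketch of just that pattern (destroyOne is a hypothetical placeholder, not part of the commit):

package main

import (
    "context"
    "fmt"
    "time"
)

// destroyOne stands in for the per-repository work done in the real handler:
// fetch the rustic config, build the repository, and call Destroy on it.
func destroyOne(ctx context.Context, repoType string) error {
    select {
    case <-time.After(10 * time.Millisecond):
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func main() {
    ctx := context.Background()
    for _, repoType := range []string{"local", "s3"} {
        // The closure scopes the deferred cancel (and any early return)
        // to this iteration, mirroring the func() { ... }() in the diff above.
        func() {
            destroyCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
            defer cancel()

            if err := destroyOne(destroyCtx, repoType); err != nil {
                fmt.Println("failed to destroy repository:", repoType, err)
                return
            }
            fmt.Println("destroyed repository:", repoType)
        }()
    }
}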
src/server/backup/rustic_local.go

Lines changed: 3 additions & 1 deletion
@@ -280,7 +280,9 @@ func (r *LocalRepository) DeleteSnapshot(ctx context.Context, id string) error {
     deleteCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
     defer cancel()
 
-    cmd, err := r.buildCommand(deleteCtx, "forget", "--prune", id)
+    // Use forget with --prune and --keep-delete to safely reclaim storage space
+    // The 1-day delay ensures any parallel backups have time to complete and reference shared blobs
+    cmd, err := r.buildCommand(deleteCtx, "forget", "--prune", "--keep-delete", "1d", id)
     if err != nil {
         return errors.Wrap(err, "failed to build delete command")
     }

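buildCommand (defined elsewhere in this package) receives deleteCtx, so the 60-second timeout above bounds the entire rustic forget invocation. A generic, standard-library-only sketch of how a context-bound external command behaves; the sleep binary is just a stand-in for the rustic CLI:

package main

import (
    "context"
    "fmt"
    "os/exec"
    "time"
)

func main() {
    // A 1-second deadline standing in for the 60-second deleteCtx above.
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()

    // exec.CommandContext kills the process once ctx expires, so a hung
    // repository operation cannot block the delete job indefinitely.
    cmd := exec.CommandContext(ctx, "sleep", "5")
    if err := cmd.Run(); err != nil {
        fmt.Println("command aborted:", err, "ctx:", ctx.Err())
    }
}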
src/server/backup/rustic_s3.go

Lines changed: 11 additions & 5 deletions
@@ -526,9 +526,9 @@ func (r *S3Repository) deleteSnapshotWithTimeout(ctx context.Context, id string,
     deleteCtx, cancel := context.WithTimeout(ctx, timeout)
     defer cancel()
 
-    // Use forget with --prune and --instant-delete to immediately reclaim storage space
-    // Note: --instant-delete is safe here because Elytra controls all backup operations
-    cmd, err := r.buildCommand(deleteCtx, "forget", "--prune", "--instant-delete", id)
+    // Use forget with --prune and --keep-delete to safely reclaim storage space
+    // The 1-day delay ensures any parallel backups have time to complete and reference shared blobs
+    cmd, err := r.buildCommand(deleteCtx, "forget", "--prune", "--keep-delete", "1d", id)
     if err != nil {
         return errors.Wrap(err, "failed to build delete command")
     }
@@ -707,8 +707,14 @@ func (r *S3Repository) RestoreSnapshot(ctx context.Context, snapshotID string, t
         return errors.Wrap(err, "failed to build restore command")
     }
 
-    if err := cmd.Run(); err != nil {
-        return errors.Wrap(err, "S3 restore failed")
+    output, err := cmd.CombinedOutput()
+    if err != nil {
+        outputStr := string(output)
+        r.logger.WithFields(log.Fields{
+            "snapshot_id": snapshotID,
+            "output":      outputStr,
+        }).Error("rustic restore command failed")
+        return errors.Wrapf(err, "S3 restore failed: %s", outputStr)
     }
 
     return nil

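The restore change swaps cmd.Run() for cmd.CombinedOutput() so that rustic's stdout and stderr end up in both the log entry and the wrapped error instead of being discarded. The standard-library pattern in isolation (the shell command is just a stand-in for a failing rustic invocation):

package main

import (
    "fmt"
    "os/exec"
)

func main() {
    // CombinedOutput runs the command and returns interleaved stdout and
    // stderr, which is what makes the failure reason visible to callers.
    cmd := exec.Command("sh", "-c", "echo 'no space left on device' >&2; exit 1")
    output, err := cmd.CombinedOutput()
    if err != nil {
        fmt.Printf("restore failed: %v: %s", err, output)
        return
    }
    fmt.Println("restore ok")
}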
src/server/manager.go

Lines changed: 47 additions & 5 deletions
@@ -23,9 +23,11 @@ import (
 )
 
 type Manager struct {
-    mu      sync.RWMutex
-    client  remote.Client
-    servers []*Server
+    mu                 sync.RWMutex
+    client             remote.Client
+    servers            []*Server
+    backupOperationsMu sync.Mutex
+    backupOperations   map[string]*sync.Mutex
 }
 
 // NewManager returns a new server manager instance. This will boot up all the
@@ -43,7 +45,10 @@ func NewManager(ctx context.Context, client remote.Client) (*Manager, error) {
 // loading any of the servers from the disk. This allows the caller to set their
 // own servers into the collection as needed.
 func NewEmptyManager(client remote.Client) *Manager {
-    return &Manager{client: client}
+    return &Manager{
+        client:           client,
+        backupOperations: make(map[string]*sync.Mutex),
+    }
 }
 
 // Client returns the HTTP client interface that allows interaction with the
@@ -131,13 +136,27 @@ func (m *Manager) Find(filter func(match *Server) bool) *Server {
 func (m *Manager) Remove(filter func(match *Server) bool) {
     m.mu.Lock()
     defer m.mu.Unlock()
+
+    // Collect IDs of servers being removed for lock cleanup
+    removedIDs := make([]string, 0)
     r := make([]*Server, 0)
     for _, v := range m.servers {
-        if !filter(v) {
+        if filter(v) {
+            removedIDs = append(removedIDs, v.ID())
+        } else {
             r = append(r, v)
         }
     }
     m.servers = r
+
+    // Clean up backup operation locks for removed servers
+    if len(removedIDs) > 0 {
+        m.backupOperationsMu.Lock()
+        for _, id := range removedIDs {
+            delete(m.backupOperations, id)
+        }
+        m.backupOperationsMu.Unlock()
+    }
 }
 
 // PersistStates writes the current environment states to the disk for each
@@ -280,3 +299,26 @@ func (m *Manager) init(ctx context.Context) error {
 
     return nil
 }
+
+// AcquireBackupLock acquires a per-server backup operation lock to prevent concurrent backup operations.
+func (m *Manager) AcquireBackupLock(serverID string) {
+    m.backupOperationsMu.Lock()
+    if _, exists := m.backupOperations[serverID]; !exists {
+        m.backupOperations[serverID] = &sync.Mutex{}
+    }
+    mu := m.backupOperations[serverID]
+    m.backupOperationsMu.Unlock()
+
+    mu.Lock()
+}
+
+// ReleaseBackupLock releases a per-server backup operation lock.
+func (m *Manager) ReleaseBackupLock(serverID string) {
+    m.backupOperationsMu.Lock()
+    mu, exists := m.backupOperations[serverID]
+    m.backupOperationsMu.Unlock()
+
+    if exists {
+        mu.Unlock()
+    }
+}

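A self-contained way to see the intended behavior of the new lock map: operations on the same server ID serialize, while different servers proceed independently. The lockTable below mirrors (but is not) the Manager code, purely as an illustration:

package main

import (
    "fmt"
    "sync"
    "time"
)

// lockTable mirrors the Manager's backupOperations map: an outer mutex
// guards the map itself, and each server ID gets its own per-server mutex.
type lockTable struct {
    mu    sync.Mutex
    locks map[string]*sync.Mutex
}

func newLockTable() *lockTable {
    return &lockTable{locks: make(map[string]*sync.Mutex)}
}

func (t *lockTable) Acquire(id string) {
    t.mu.Lock()
    if _, ok := t.locks[id]; !ok {
        t.locks[id] = &sync.Mutex{}
    }
    mu := t.locks[id]
    t.mu.Unlock()

    // Taken outside the map lock so a long-running backup on one server
    // never blocks lock acquisition for a different server.
    mu.Lock()
}

func (t *lockTable) Release(id string) {
    t.mu.Lock()
    mu, ok := t.locks[id]
    t.mu.Unlock()

    if ok {
        mu.Unlock()
    }
}

func main() {
    t := newLockTable()
    var wg sync.WaitGroup
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            t.Acquire("server-a") // both goroutines target the same server
            defer t.Release("server-a")
            fmt.Println("backup operation", n, "holds the lock")
            time.Sleep(50 * time.Millisecond) // never overlaps with the other
        }(i)
    }
    wg.Wait()
}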