Skip to content

Commit 5efa32a

Browse files
author
Julian Koberg
committed
fix: stale node command
Signed-off-by: Julian Koberg <[email protected]>
1 parent caf84ec commit 5efa32a

File tree

2 files changed

+49
-6
lines changed

2 files changed

+49
-6
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
Bugfix: Fix stale nodes command
2+
3+
Fix delete-stale-nodes command to properly remove or roll-back nodes
4+
5+
https://github.com/owncloud/ocis/pull/11704

services/storage-users/pkg/command/uploads.go

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"fmt"
77
"io/fs"
8+
"log"
89
"os"
910
"path/filepath"
1011
"strconv"
@@ -16,6 +17,7 @@ import (
1617
"github.com/urfave/cli/v2"
1718

1819
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
20+
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
1921
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
2022
"github.com/owncloud/ocis/v2/services/storage-users/pkg/config"
2123
"github.com/owncloud/ocis/v2/services/storage-users/pkg/config/parser"
@@ -376,9 +378,18 @@ func DeleteStaleProcessingNodes(cfg *config.Config) *cli.Command {
376378
}
377379
}
378380

381+
var stream events.Stream
382+
if !dryRun {
383+
s, err := event.NewStream(cfg)
384+
if err != nil {
385+
log.Fatalf("Failed to create event stream: %v", err)
386+
}
387+
stream = s
388+
}
389+
379390
staleCount := 0
380391
for _, spaceID := range spaceIDs {
381-
staleCount += deleteStaleUploads(cfg, spaceID, dryRun, verbose)
392+
staleCount += deleteStaleUploads(cfg, spaceID, dryRun, verbose, stream)
382393
}
383394

384395
if verbose {
@@ -410,7 +421,7 @@ func globSpaceIDs(cfg *config.Config) []string {
410421
}
411422

412423
// delete stale processing nodes for a given spaceID
413-
func deleteStaleUploads(cfg *config.Config, spaceID string, dryRun bool, verbose bool) int {
424+
func deleteStaleUploads(cfg *config.Config, spaceID string, dryRun bool, verbose bool, stream events.Stream) int {
414425
if verbose {
415426
fmt.Printf("\nDeleting stale processing nodes for space: %s\n", spaceID)
416427
}
@@ -440,7 +451,7 @@ func deleteStaleUploads(cfg *config.Config, spaceID string, dryRun bool, verbose
440451

441452
staleCount := 0
442453
for _, path := range mpkFiles {
443-
staleCount += deleteStaleNode(cfg, path, dryRun, verbose)
454+
staleCount += deleteStaleNode(cfg, path, dryRun, verbose, stream)
444455
}
445456

446457
if verbose {
@@ -452,7 +463,7 @@ func deleteStaleUploads(cfg *config.Config, spaceID string, dryRun bool, verbose
452463

453464
// deleteStaleNode deletes a stale node: if it is not referenced by any upload session
454465
// returns 1 if the node stale node was detected for deletion, 0 otherwise, for counting purposes
455-
func deleteStaleNode(cfg *config.Config, path string, dryRun bool, verbose bool) int {
466+
func deleteStaleNode(cfg *config.Config, path string, dryRun bool, verbose bool, stream events.Stream) int {
456467
nodeDir := filepath.Dir(path)
457468

458469
// Read .mpk file to get processing info
@@ -495,11 +506,20 @@ func deleteStaleNode(cfg *config.Config, path string, dryRun bool, verbose bool)
495506
return 1
496507
}
497508

498-
if err := os.RemoveAll(nodeDir); err != nil {
499-
fmt.Fprintf(os.Stderr, "%sError deleting stale node %s: %v\n", LOG_INDENT_L2, nodeDir, err)
509+
rid := extractResourceID(strings.TrimSuffix(path, ".mpk"))
510+
if rid == nil {
511+
fmt.Fprintf(os.Stderr, "Failed to extract resource ID from path %s\n", path)
500512
return 0
501513
}
502514

515+
if err := events.Publish(context.Background(), stream, events.RevertRevision{
516+
ResourceID: rid,
517+
Timestamp: utils.TSNow(),
518+
}); err != nil {
519+
// if publishing fails there is no need to try publishing other events - they will fail too.
520+
log.Fatalf("Failed to send revert revision event for node '%s'\n", path)
521+
}
522+
503523
if verbose {
504524
fmt.Printf("%sDeleted stale node: %s\n", LOG_INDENT_L2, nodeDir)
505525
}
@@ -518,3 +538,21 @@ func extractProcessingID(mpkData map[string]interface{}) string {
518538
}
519539
return processingID
520540
}
541+
542+
func extractResourceID(path string) *provider.ResourceId {
543+
// path looks like /.../storage/users/spaces/f2/06bccf-0f10-4070-9e63-40943f060667/nodes/5b/ba/1e/a7/-f185-4f31-8342-ed4b5743f096
544+
parts := strings.Split(path, "spaces")
545+
if len(parts) < 2 {
546+
return nil
547+
}
548+
549+
spaceParts := strings.Split(parts[1], "nodes")
550+
if len(spaceParts) < 2 {
551+
return nil
552+
}
553+
554+
return &provider.ResourceId{
555+
SpaceId: strings.ReplaceAll(spaceParts[0], "/", ""),
556+
OpaqueId: strings.ReplaceAll(spaceParts[1], "/", ""),
557+
}
558+
}

0 commit comments

Comments
 (0)