Skip to content

Commit a700939

Browse files
authored
feat(share/availability/full): Introduce Q4 trimming for archival nodes (#4028)
1 parent e11cd71 commit a700939

File tree

13 files changed

+467
-136
lines changed

13 files changed

+467
-136
lines changed

Diff for: nodebuilder/pruner/migration_utils.go

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package pruner
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/ipfs/go-datastore"
8+
"github.com/ipfs/go-datastore/namespace"
9+
10+
fullavail "github.com/celestiaorg/celestia-node/share/availability/full"
11+
)
12+
13+
// TODO @renaynay: remove this file after a few releases -- this utility serves as a temporary solution
14+
// to detect if the node has been run with pruning enabled before on previous version(s), and disallow
15+
// running as an archival node.
16+
17+
var (
18+
storePrefix = datastore.NewKey("full_avail")
19+
previousModeKey = datastore.NewKey("previous_mode")
20+
)
21+
22+
// detectFirstRun is a temporary function that serves to assist migration to the refactored pruner
23+
// implementation (v0.21.0). It checks if the node has been run with pruning enabled before by checking
24+
// if the pruner service ran before, and disallows running as an archival node in the case it has.
25+
func detectFirstRun(ctx context.Context, cfg *Config, ds datastore.Datastore, lastPrunedHeight uint64) error {
26+
ds = namespace.Wrap(ds, storePrefix)
27+
28+
exists, err := ds.Has(ctx, previousModeKey)
29+
if err != nil {
30+
return fmt.Errorf("share/availability/full: failed to check previous pruned run in "+
31+
"datastore: %w", err)
32+
}
33+
if exists {
34+
// node has already been run on current version, no migration is necessary
35+
return nil
36+
}
37+
38+
isArchival := !cfg.EnableService
39+
40+
// if the node has been pruned before on a previous version, it cannot revert
41+
// to archival mode
42+
if isArchival && lastPrunedHeight > 1 {
43+
return fullavail.ErrDisallowRevertToArchival
44+
}
45+
46+
return recordFirstRun(ctx, ds, isArchival)
47+
}
48+
49+
// recordFirstRun exists to assist migration to new pruner implementation (v0.21.0) by recording
50+
// the first run of the pruner service in the full availability's datastore. It assumes the datastore
51+
// is already namespace-wrapped.
52+
func recordFirstRun(ctx context.Context, ds datastore.Datastore, isArchival bool) error {
53+
mode := []byte("pruned")
54+
if isArchival {
55+
mode = []byte("archival")
56+
}
57+
58+
return ds.Put(ctx, previousModeKey, mode)
59+
}

Diff for: nodebuilder/pruner/migration_utils_test.go

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package pruner
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/ipfs/go-datastore"
8+
"github.com/ipfs/go-datastore/namespace"
9+
ds_sync "github.com/ipfs/go-datastore/sync"
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
13+
fullavail "github.com/celestiaorg/celestia-node/share/availability/full"
14+
)
15+
16+
func TestDetectFirstRun(t *testing.T) {
17+
ctx, cancel := context.WithCancel(context.Background())
18+
defer cancel()
19+
20+
t.Run("FirstRunArchival", func(t *testing.T) {
21+
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
22+
23+
cfg := &Config{EnableService: false}
24+
25+
err := detectFirstRun(ctx, cfg, ds, 1)
26+
assert.NoError(t, err)
27+
28+
nsWrapped := namespace.Wrap(ds, storePrefix)
29+
prevMode, err := nsWrapped.Get(ctx, previousModeKey)
30+
require.NoError(t, err)
31+
assert.Equal(t, []byte("archival"), prevMode)
32+
})
33+
34+
t.Run("FirstRunPruned", func(t *testing.T) {
35+
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
36+
37+
cfg := &Config{EnableService: true}
38+
39+
err := detectFirstRun(ctx, cfg, ds, 1)
40+
assert.NoError(t, err)
41+
42+
nsWrapped := namespace.Wrap(ds, storePrefix)
43+
prevMode, err := nsWrapped.Get(ctx, previousModeKey)
44+
require.NoError(t, err)
45+
assert.Equal(t, []byte("pruned"), prevMode)
46+
})
47+
48+
t.Run("RevertToArchivalNotAllowed", func(t *testing.T) {
49+
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
50+
51+
// create archival node instance over a node that has been pruned before
52+
// (height 500)
53+
cfg := &Config{EnableService: false}
54+
lastPrunedHeight := uint64(500)
55+
56+
err := detectFirstRun(ctx, cfg, ds, lastPrunedHeight)
57+
assert.Error(t, err)
58+
assert.ErrorIs(t, err, fullavail.ErrDisallowRevertToArchival)
59+
})
60+
}

Diff for: nodebuilder/pruner/module.go

+64-53
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,15 @@ package pruner
22

33
import (
44
"context"
5-
"time"
65

76
"github.com/ipfs/go-datastore"
87
logging "github.com/ipfs/go-log/v2"
98
"go.uber.org/fx"
109

1110
"github.com/celestiaorg/celestia-node/core"
12-
"github.com/celestiaorg/celestia-node/libs/fxutil"
1311
"github.com/celestiaorg/celestia-node/nodebuilder/node"
1412
modshare "github.com/celestiaorg/celestia-node/nodebuilder/share"
1513
"github.com/celestiaorg/celestia-node/pruner"
16-
"github.com/celestiaorg/celestia-node/pruner/full"
1714
"github.com/celestiaorg/celestia-node/share/availability"
1815
fullavail "github.com/celestiaorg/celestia-node/share/availability/full"
1916
"github.com/celestiaorg/celestia-node/share/availability/light"
@@ -23,12 +20,6 @@ import (
2320
var log = logging.Logger("module/pruner")
2421

2522
func ConstructModule(tp node.Type, cfg *Config) fx.Option {
26-
baseComponents := fx.Options(
27-
fx.Supply(cfg),
28-
availWindow(tp, cfg.EnableService),
29-
advertiseArchival(tp, cfg),
30-
)
31-
3223
prunerService := fx.Options(
3324
fx.Provide(fx.Annotate(
3425
newPrunerService,
@@ -44,49 +35,53 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
4435
fx.Invoke(func(_ *pruner.Service) {}),
4536
)
4637

38+
baseComponents := fx.Options(
39+
fx.Supply(cfg),
40+
// TODO @renaynay: move this to share module construction
41+
fx.Supply(modshare.Window(availability.StorageWindow)),
42+
advertiseArchival(tp, cfg),
43+
prunerService,
44+
)
45+
4746
switch tp {
4847
case node.Light:
4948
// LNs enforce pruning by default
5049
return fx.Module("prune",
5150
baseComponents,
52-
prunerService,
5351
// TODO(@walldiss @renaynay): remove conversion after Availability and Pruner interfaces are merged
5452
// note this provide exists in pruner module to avoid cyclical imports
5553
fx.Provide(func(la *light.ShareAvailability) pruner.Pruner { return la }),
5654
)
5755
case node.Full:
58-
if cfg.EnableService {
59-
return fx.Module("prune",
60-
baseComponents,
61-
prunerService,
62-
fxutil.ProvideAs(full.NewPruner, new(pruner.Pruner)),
63-
fx.Supply([]fullavail.Option{}),
64-
)
56+
fullAvailOpts := make([]fullavail.Option, 0)
57+
58+
if !cfg.EnableService {
59+
// populate archival mode opts
60+
fullAvailOpts = []fullavail.Option{fullavail.WithArchivalMode()}
6561
}
62+
6663
return fx.Module("prune",
6764
baseComponents,
68-
fx.Invoke(func(ctx context.Context, ds datastore.Batching) error {
69-
return pruner.DetectPreviousRun(ctx, ds)
70-
}),
71-
fx.Supply([]fullavail.Option{fullavail.WithArchivalMode()}),
65+
fx.Supply(fullAvailOpts),
66+
fx.Provide(func(fa *fullavail.ShareAvailability) pruner.Pruner { return fa }),
67+
convertToPruned(),
7268
)
7369
case node.Bridge:
74-
if cfg.EnableService {
75-
return fx.Module("prune",
76-
baseComponents,
77-
prunerService,
78-
fxutil.ProvideAs(full.NewPruner, new(pruner.Pruner)),
79-
fx.Supply([]fullavail.Option{}),
80-
fx.Supply([]core.Option{}),
81-
)
70+
coreOpts := make([]core.Option, 0)
71+
fullAvailOpts := make([]fullavail.Option, 0)
72+
73+
if !cfg.EnableService {
74+
// populate archival mode opts
75+
coreOpts = []core.Option{core.WithArchivalMode()}
76+
fullAvailOpts = []fullavail.Option{fullavail.WithArchivalMode()}
8277
}
78+
8379
return fx.Module("prune",
8480
baseComponents,
85-
fx.Invoke(func(ctx context.Context, ds datastore.Batching) error {
86-
return pruner.DetectPreviousRun(ctx, ds)
87-
}),
88-
fx.Supply([]fullavail.Option{fullavail.WithArchivalMode()}),
89-
fx.Supply([]core.Option{core.WithArchivalMode()}),
81+
fx.Provide(func(fa *fullavail.ShareAvailability) pruner.Pruner { return fa }),
82+
fx.Supply(coreOpts),
83+
fx.Supply(fullAvailOpts),
84+
convertToPruned(),
9085
)
9186
default:
9287
panic("unknown node type")
@@ -103,23 +98,39 @@ func advertiseArchival(tp node.Type, pruneCfg *Config) fx.Option {
10398
})
10499
}
105100

106-
func availWindow(tp node.Type, pruneEnabled bool) fx.Option {
107-
switch tp {
108-
case node.Light:
109-
// light nodes are still subject to sampling within window
110-
// even if pruning is not enabled.
111-
return fx.Provide(func() modshare.Window {
112-
return modshare.Window(availability.StorageWindow)
113-
})
114-
case node.Full, node.Bridge:
115-
return fx.Provide(func() modshare.Window {
116-
if pruneEnabled {
117-
return modshare.Window(availability.StorageWindow)
118-
}
119-
// implicitly disable pruning by setting the window to 0
120-
return modshare.Window(time.Duration(0))
121-
})
122-
default:
123-
panic("unknown node type")
124-
}
101+
// convertToPruned checks if the node is being converted to an archival node
102+
// to a pruned node.
103+
func convertToPruned() fx.Option {
104+
return fx.Invoke(func(
105+
ctx context.Context,
106+
cfg *Config,
107+
ds datastore.Batching,
108+
p *pruner.Service,
109+
) error {
110+
lastPrunedHeight, err := p.LastPruned(ctx)
111+
if err != nil {
112+
return err
113+
}
114+
115+
err = detectFirstRun(ctx, cfg, ds, lastPrunedHeight)
116+
if err != nil {
117+
return err
118+
}
119+
120+
isArchival := !cfg.EnableService
121+
convert, err := fullavail.ConvertFromArchivalToPruned(ctx, ds, isArchival)
122+
if err != nil {
123+
return err
124+
}
125+
126+
// if we convert the node from archival to pruned, we need to reset the checkpoint
127+
// to ensure the node goes back and deletes *all* blocks older than the
128+
// availability window, as archival "pruning" only trims the .q4 file,
129+
// but retains the ODS.
130+
if convert {
131+
return p.ResetCheckpoint(ctx)
132+
}
133+
134+
return nil
135+
})
125136
}

Diff for: nodebuilder/share/module.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,13 @@ func availabilityComponents(tp node.Type, cfg *Config) fx.Option {
239239
)
240240
case node.Bridge, node.Full:
241241
return fx.Options(
242-
fx.Provide(full.NewShareAvailability),
242+
fx.Provide(func(
243+
s *store.Store,
244+
getter shwap.Getter,
245+
opts []full.Option,
246+
) *full.ShareAvailability {
247+
return full.NewShareAvailability(s, getter, opts...)
248+
}),
243249
fx.Provide(func(avail *full.ShareAvailability) share.Availability {
244250
return avail
245251
}),

0 commit comments

Comments
 (0)