@@ -2,9 +2,13 @@ package iavlx
22
33import (
44 "fmt"
5- "log/slog "
5+ "io "
66 "sync"
77 "sync/atomic"
8+
9+ "cosmossdk.io/log"
10+ pruningtypes "cosmossdk.io/store/pruning/types"
11+ storetypes "cosmossdk.io/store/types"
812)
913
1014type CommitTree struct {
@@ -25,10 +29,186 @@ type CommitTree struct {
2529
2630 pendingOrphans [][]NodeID
2731
28- logger * slog.Logger
32+ logger log.Logger
33+
34+ commitCtx * commitContext
35+ lastCommitId storetypes.CommitID
36+ workingCommitId storetypes.CommitID
37+ }
38+
39+ func (c * CommitTree ) Root () * NodePointer {
40+ return c .root
41+ }
42+
43+ func (c * CommitTree ) ApplyChanges (origRoot , newRoot * NodePointer , updateBatch KVUpdateBatch ) error {
44+ // TODO check channel errors
45+ c .writeMutex .Lock ()
46+ defer c .writeMutex .Unlock ()
47+
48+ if updateBatch .Version != c .stagedVersion () {
49+ return fmt .Errorf ("tree version %d does not match staged version %d" , updateBatch .Version , c .stagedVersion ())
50+ }
51+ if origRoot != c .root {
52+ // TODO find a way to apply the changes incrementally when roots don't match
53+ return fmt .Errorf ("tree original root does not match current root" )
54+ }
55+ c .root = newRoot
56+ c .pendingOrphans = append (c .pendingOrphans , updateBatch .Orphans ... )
57+
58+ if c .writeWal {
59+ c .walChan <- updateBatch .Updates
60+ }
61+
62+ // TODO prevent further writes to the branch tree
63+
64+ return nil
65+ }
66+
67+ func (c * CommitTree ) Commit () storetypes.CommitID {
68+ commitId , err := c .commit ()
69+ if err != nil {
70+ panic (fmt .Sprintf ("failed to commit: %v" , err ))
71+ }
72+ return commitId
73+ }
74+
75+ func (c * CommitTree ) commit () (storetypes.CommitID , error ) {
76+ c .WorkingHash ()
77+ commitId := c .workingCommitId
78+
79+ stagedVersion := c .stagedVersion ()
80+ if c .writeWal {
81+ // wait for WAL write to complete
82+ err := <- c .walDone
83+ if err != nil {
84+ return storetypes.CommitID {}, err
85+ }
86+
87+ err = c .store .WriteWALCommit (stagedVersion )
88+ if err != nil {
89+ return storetypes.CommitID {}, err
90+ }
91+
92+ c .reinitWalProc ()
93+ }
94+
95+ err := c .store .SaveRoot (stagedVersion , c .root , c .commitCtx .leafNodeIdx , c .commitCtx .branchNodeIdx )
96+ if err != nil {
97+ return storetypes.CommitID {}, err
98+ }
99+
100+ c .store .MarkOrphans (stagedVersion , c .pendingOrphans )
101+ c .pendingOrphans = nil
102+
103+ // start eviction if needed
104+ c .startEvict (c .store .SavedVersion ())
105+
106+ // cache the committed tree as the latest version
107+ c .latest .Store (c .root )
108+ c .version ++
109+ c .lastCommitId = commitId
110+ c .commitCtx = nil
111+
112+ return commitId , nil
113+ }
114+
115+ func (c * CommitTree ) LastCommitID () storetypes.CommitID {
116+ //TODO implement me
117+ panic ("implement me" )
118+ }
119+
120+ func (c * CommitTree ) WorkingHash () []byte {
121+ if c .commitCtx != nil {
122+ return c .workingCommitId .Hash
123+ }
124+
125+ c .writeMutex .Lock ()
126+ defer c .writeMutex .Unlock ()
127+
128+ if c .writeWal {
129+ close (c .walChan )
130+ }
131+
132+ var hash []byte
133+ savedVersion := c .store .SavedVersion ()
134+ stagedVersion := c .stagedVersion ()
135+ c .commitCtx = & commitContext {
136+ version : stagedVersion ,
137+ savedVersion : savedVersion ,
138+ }
139+ if c .root == nil {
140+ hash = emptyHash
141+ } else {
142+ // compute hash and assign node IDs
143+ var err error
144+ hash , err = commitTraverse (c .commitCtx , c .root , 0 )
145+ if err != nil {
146+ panic (fmt .Sprintf ("failed to compute working hash: %v" , err ))
147+ }
148+ }
149+
150+ c .workingCommitId = storetypes.CommitID {
151+ Version : int64 (stagedVersion ),
152+ Hash : hash ,
153+ }
154+ return hash
155+ }
156+
157+ func (c * CommitTree ) SetPruning (pruningtypes.PruningOptions ) {
158+ //TODO implement me
159+ panic ("implement me" )
160+ }
161+
162+ func (c * CommitTree ) GetPruning () pruningtypes.PruningOptions {
163+ //TODO implement me
164+ panic ("implement me" )
165+ }
166+
167+ func (c * CommitTree ) GetStoreType () storetypes.StoreType {
168+ //TODO implement me
169+ panic ("implement me" )
29170}
30171
31- func NewCommitTree (dir string , opts Options , logger * slog.Logger ) (* CommitTree , error ) {
172+ func (c * CommitTree ) CacheWrap () storetypes.CacheWrap {
173+ return NewTree (c , c .stagedVersion (), c .zeroCopy )
174+ }
175+
176+ func (c * CommitTree ) CacheWrapWithTrace (w io.Writer , tc storetypes.TraceContext ) storetypes.CacheWrap {
177+ // TODO support tracing
178+ return c .CacheWrap ()
179+ }
180+
181+ func (c * CommitTree ) Get (key []byte ) []byte {
182+ //TODO implement me
183+ panic ("implement me" )
184+ }
185+
186+ func (c * CommitTree ) Has (key []byte ) bool {
187+ //TODO implement me
188+ panic ("implement me" )
189+ }
190+
191+ func (c * CommitTree ) Set (key , value []byte ) {
192+ //TODO implement me
193+ panic ("implement me" )
194+ }
195+
196+ func (c * CommitTree ) Delete (key []byte ) {
197+ //TODO implement me
198+ panic ("implement me" )
199+ }
200+
201+ func (c * CommitTree ) Iterator (start , end []byte ) storetypes.Iterator {
202+ //TODO implement me
203+ panic ("implement me" )
204+ }
205+
206+ func (c * CommitTree ) ReverseIterator (start , end []byte ) storetypes.Iterator {
207+ //TODO implement me
208+ panic ("implement me" )
209+ }
210+
211+ func NewCommitTree (dir string , opts Options , logger log.Logger ) (* CommitTree , error ) {
32212 ts , err := NewTreeStore (dir , opts , logger )
33213 if err != nil {
34214 return nil , fmt .Errorf ("failed to create tree store: %w" , err )
@@ -74,35 +254,6 @@ func (c *CommitTree) reinitWalProc() {
74254 }()
75255}
76256
77- func (c * CommitTree ) Branch () * Tree {
78- return NewTree (c .root , NewKVUpdateBatch (c .stagedVersion ()), c .zeroCopy )
79- }
80-
81- func (c * CommitTree ) Apply (tree * Tree ) error {
82- // TODO check channel errors
83- c .writeMutex .Lock ()
84- defer c .writeMutex .Unlock ()
85-
86- if tree .updateBatch .Version != c .stagedVersion () {
87- return fmt .Errorf ("tree version %d does not match staged version %d" , tree .updateBatch .Version , c .stagedVersion ())
88- }
89- if tree .origRoot != c .root {
90- // TODO find a way to apply the changes incrementally when roots don't match
91- return fmt .Errorf ("tree original root does not match current root" )
92- }
93- c .root = tree .root
94- batch := tree .updateBatch
95- c .pendingOrphans = append (c .pendingOrphans , batch .Orphans ... )
96-
97- if c .writeWal {
98- c .walChan <- batch .Updates
99- }
100-
101- // TODO prevent further writes to the branch tree
102-
103- return nil
104- }
105-
106257func (c * CommitTree ) startEvict (evictVersion uint32 ) {
107258 if c .evictorRunning {
108259 // eviction in progress
@@ -130,65 +281,6 @@ func (c *CommitTree) startEvict(evictVersion uint32) {
130281 }()
131282}
132283
133- func (c * CommitTree ) Commit () ([]byte , error ) {
134- c .writeMutex .Lock ()
135- defer c .writeMutex .Unlock ()
136-
137- if c .writeWal {
138- close (c .walChan )
139- }
140-
141- var hash []byte
142- savedVersion := c .store .SavedVersion ()
143- stagedVersion := c .stagedVersion ()
144- commitCtx := & commitContext {
145- version : stagedVersion ,
146- savedVersion : savedVersion ,
147- }
148- if c .root == nil {
149- hash = emptyHash
150- } else {
151- // compute hash and assign node IDs
152- var err error
153- hash , err = commitTraverse (commitCtx , c .root , 0 )
154- if err != nil {
155- return nil , err
156- }
157- }
158-
159- if c .writeWal {
160- // wait for WAL write to complete
161- err := <- c .walDone
162- if err != nil {
163- return nil , err
164- }
165-
166- err = c .store .WriteWALCommit (stagedVersion )
167- if err != nil {
168- return nil , err
169- }
170-
171- c .reinitWalProc ()
172- }
173-
174- err := c .store .SaveRoot (stagedVersion , c .root , commitCtx .leafNodeIdx , commitCtx .branchNodeIdx )
175- if err != nil {
176- return nil , err
177- }
178-
179- c .store .MarkOrphans (stagedVersion , c .pendingOrphans )
180- c .pendingOrphans = nil
181-
182- // start eviction if needed
183- c .startEvict (savedVersion )
184-
185- // cache the committed tree as the latest version
186- c .latest .Store (c .root )
187- c .version ++
188-
189- return hash , nil
190- }
191-
192284func (c * CommitTree ) Close () error {
193285 if c .walChan != nil {
194286 close (c .walChan )
@@ -268,3 +360,6 @@ func evictTraverse(np *NodePointer, depth, evictionDepth uint8, evictVersion uin
268360 count += evictTraverse (memNode .right , depth + 1 , evictionDepth , evictVersion )
269361 return
270362}
363+
364+ var _ storetypes.CommitKVStore = & CommitTree {}
365+ var _ parentTree = & CommitTree {}
0 commit comments