55 "errors"
66 "fmt"
77 "os"
8+ "runtime/debug"
89 "time"
910
1011 "github.com/k3s-io/kine/pkg/endpoint"
@@ -16,16 +17,14 @@ import (
1617 "go.etcd.io/etcd/client/pkg/v3/logutil"
1718 clientv3 "go.etcd.io/etcd/client/v3"
1819 "go.etcd.io/etcd/server/v3/config"
19- "go.etcd.io/etcd/server/v3/etcdserver/api/snap"
20- "go.etcd.io/etcd/server/v3/etcdserver/cindex"
2120 etcderrors "go.etcd.io/etcd/server/v3/etcdserver/errors"
2221 "go.etcd.io/etcd/server/v3/lease"
23- "go.etcd.io/etcd/server/v3/storage"
2422 "go.etcd.io/etcd/server/v3/storage/backend"
2523 "go.etcd.io/etcd/server/v3/storage/mvcc"
2624 "go.etcd.io/etcd/server/v3/storage/schema"
27- "go.etcd.io/etcd/server/v3/storage/wal "
25+ "go.uber.org/zap "
2826 "go.uber.org/zap/zapcore"
27+ "google.golang.org/grpc"
2928)
3029
3130// ReadCloser is a generic wrapper around a MVCC store that provides only read/close functions
@@ -62,9 +61,14 @@ func NewRemoteStore(config endpoint.ETCDConfig) (*RemoteStore, error) {
6261 if err != nil {
6362 return nil , err
6463 }
64+ logger = logger .Named ("k3s.remotestore" )
65+
66+ logrus .Infof ("Opening etcd client connection with endpoints %v" , config .Endpoints )
67+
6568 c , err := clientv3 .New (clientv3.Config {
6669 Endpoints : config .Endpoints ,
6770 DialTimeout : 5 * time .Second ,
71+ DialOptions : []grpc.DialOption {grpc .WithBlock (), grpc .FailOnNonTempDialError (true )},
6872 Logger : logger ,
6973 TLS : tlsConfig ,
7074 })
@@ -163,7 +167,24 @@ func NewTemporaryStore(dataDir string) (*TemporaryStore, error) {
163167 return nil , err
164168 }
165169
166- if err := copy .Copy (dataDir , tempDir , copy.Options {PreserveOwner : true }); err != nil {
170+ // only copy the bbolt backend database; we don't need the WAL, legacy v2
171+ // store snapshots, config file, or anything else.
172+ // ref: https://etcd.io/docs/v3.6/learning/persistent-storage-files/#long-leaving-files
173+ copyOpts := copy.Options {
174+ PreserveOwner : true ,
175+ PreserveTimes : true ,
176+ NumOfWorkers : 0 ,
177+ Sync : true ,
178+ Skip : func (srcinfo os.FileInfo , src , dest string ) (bool , error ) {
179+ switch srcinfo .Name () {
180+ case "member" , "snap" , "db" :
181+ return false , nil
182+ default :
183+ return true , nil
184+ }
185+ },
186+ }
187+ if err := copy .Copy (dataDir , tempDir , copyOpts ); err != nil {
167188 return nil , err
168189 }
169190
@@ -198,69 +219,83 @@ type Store struct {
198219 be backend.Backend
199220}
200221
201- func NewStore (dataDir string ) (* Store , error ) {
202- var currentIndex , latestIndex uint64
222+ func NewStore (dataDir string ) (store * Store , rerr error ) {
223+ s := & Store {}
224+
203225 logger , err := logutil .CreateDefaultZapLogger (zapcore .InfoLevel )
204226 if err != nil {
205227 return nil , err
206228 }
207229
230+ // etcd relies on panic/fatal errors to trigger process exit; we need to
231+ // handle it properly by recovering and returning an error.
232+ logger = logger .Named ("k3s.store" ).WithOptions (
233+ zap .WithPanicHook (zapcore .WriteThenPanic ),
234+ zap .WithFatalHook (zapcore .WriteThenPanic ),
235+ )
236+
237+ // recover from zap panics and ensure kv and backend are closed on error
238+ defer func () {
239+ if err := recover (); err != nil {
240+ msg := fmt .Sprintf ("panic: %v" , err )
241+ if logrus .IsLevelEnabled (logrus .DebugLevel ) {
242+ msg += " at " + string (debug .Stack ())
243+ }
244+ rerr = errors .New (msg )
245+ }
246+ if rerr != nil && s != nil {
247+ go s .Close ()
248+ }
249+ }()
250+
208251 cfg := config.ServerConfig {Logger : logger , DataDir : dataDir }
209252 path := cfg .BackendPath ()
210253
211- // need to check for backend path ourselves, as backend.New just logs a panic
212- // via zap if it doesn't exist, which isn't fatal .
254+ // need to check for backend path ourselves, as backend.New just creates
255+ // a new empty database if the file does not exist or is empty .
213256 if _ , err := os .Stat (path ); err != nil {
214257 return nil , pkgerrors .WithMessage (err , "failed to stat MVCC KV store backend path" )
215258 }
216259
217- logrus .Infof ("Opening etcd MVCC KV store at %s" , path )
260+ logrus .Infof ("Opening etcd MVCC KV backend database at %s" , path )
218261
219262 // open backend database
220263 bcfg := backend .DefaultBackendConfig (logger )
221264 bcfg .Path = path
222265 bcfg .UnsafeNoFsync = true
223- bcfg .BatchInterval = 0
224- bcfg .BatchLimit = 0
225- be := backend .New (bcfg )
226-
227- // get current index from backend
228- currentIndex , _ = schema .ReadConsistentIndex (be .ReadTx ())
229-
230- // list snapshots from WAL dir
231- walSnaps , err := wal .ValidSnapshotEntries (cfg .Logger , cfg .WALDir ())
232- if err != nil {
233- return nil , err
266+ bcfg .BatchInterval = time .Hour
267+ bcfg .BatchLimit = 100000
268+
269+ // try to open the bbolt database; this may unrecoverably panic from inside
270+ // the bbolt freelist goroutine if the database is in an inconsistent state.
271+ s .be = backend .New (bcfg )
272+ if s .be == nil {
273+ return nil , errors .New ("failed to open database" )
234274 }
235275
236- // find latest available snapshot index
237- ss := snap .New (logger , cfg .SnapDir ())
238- snapshot , err := ss .LoadNewestAvailable (walSnaps )
239- if err != nil && ! errors .Is (err , snap .ErrNoSnapshot ) {
240- return nil , err
241- }
242- if snapshot != nil {
243- latestIndex = snapshot .Metadata .Index
276+ // try to get current index from backend; this may fail if the bbolt database
277+ // was opened successfully but is in an inconsistent state.
278+ if currentIndex , _ := schema .ReadConsistentIndex (s .be .ReadTx ()); currentIndex == 0 {
279+ return nil , errors .New ("failed to read consistent index" )
244280 }
245281
246- // restore from snapshot if available
247- if latestIndex > currentIndex {
248- logrus .Warnf ("MVCC database index %d is less than latest snapshot index %d" , currentIndex , latestIndex )
249- path , err := ss .DBFilePath (latestIndex )
250- if err != nil {
251- logrus .Warnf ("MVCC database for snapshot index %d not available; data may be stale" , latestIndex )
252- } else {
253- logrus .Infof ("MVCC database restoring snapshot index %d from %s" , latestIndex , path )
254- be , err = storage .RecoverSnapshotBackend (cfg , be , * snapshot , true , storage .NewBackendHooks (cfg .Logger , cindex .NewConsistentIndex (nil )))
255- if err != nil {
256- be .Close ()
257- return nil , err
258- }
259- }
260- }
282+ // We do not bother checking the latest snapshot index from the WAL or attempting to
283+ // restore from a snapshot, as v3 store snapshots are only created when replicas are
284+ // lagging and the leader sends them a fresh copy of the bbolt database - and are
285+ // therefore highly unlikely to exist. The .snap files in the snap dir are for the
286+ // legacy v2 store, and are of no use.
287+ //
288+ // ref: https://etcd.io/docs/v3.6/learning/persistent-storage-files/#long-leaving-files
289+ // > Note: Periodic snapshots generated on each replica are only emitted in the form of
290+ // > *.snap file (not snap.db file). So there is no guarantee the most recent snapshot (in
291+ // > WAL log) has the *.snap.db file. But in such a case the backend (snap/db) is expected
292+ // > to be newer than the snapshot.
293+
294+ s .kv = mvcc .NewStore (logger , s .be , & lease.FakeLessor {}, mvcc.StoreConfig {})
295+ logrus .Info ("Opened etcd MVCC KV store" )
261296
262297 // nb: closing the kv store does not implicitly close its backend; the backend must be closed separately
263- return & Store { kv : mvcc . NewStore ( cfg . Logger , be , & lease. FakeLessor {}, mvcc. StoreConfig {}), be : be } , nil
298+ return s , nil
264299}
265300
266301func (s * Store ) Close () error {
0 commit comments