Skip to content

Commit 1f6190c

Browse files
committed
Include AllowEmpty option and IsEmpty method
This commit includes the AllowEmpty option which allows for deleting all entries from the log using the TruncateFront and TruncateBack methods. Using AllowEmpty changes the behavior of the log as it relates to FirstIndex and LastIndex in the following ways: - An empty log will now always have the FirstIndex be equal to LastIndex+1. Without AllowEmpty, FirstIndex can never be greater than LastIndex. - For a newly created log that has no entries, FirstIndex() and LastIndex() return 1 and 0, respectively. Without AllowEmpty, both return 0. Also in this commit: - IsEmpty method, which returns true if the log has no entries - More unit tests - Various code cleanup See #31
1 parent 850d828 commit 1f6190c

2 files changed

Lines changed: 530 additions & 140 deletions

File tree

wal.go

Lines changed: 123 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ var (
4343
// may be returned when the caller is attempting to remove *all* entries;
4444
// The log requires that at least one entry exists following a truncate.
4545
ErrOutOfRange = errors.New("out of range")
46+
47+
// ErrEmptyLog is returned by Open() when the `AllowEmpty` option was not
48+
// provided and log has been emptied due to the use of TruncateFront() or
49+
// TruncateBack().
50+
ErrEmptyLog = errors.New("empty log")
4651
)
4752

4853
// LogFormat is the format of the log files.
@@ -63,19 +68,32 @@ type Options struct {
6368
// log at risk of data loss when there's a server crash.
6469
NoSync bool
6570
// SegmentSize of each segment. This is just a target value, actual size
66-
// may differ. Default is 20 MB.
71+
// may differ. Default 20 MB
6772
SegmentSize int
68-
// LogFormat is the format of the log files. Default is Binary.
73+
// LogFormat is the format of the log files. Default Binary
6974
LogFormat LogFormat
7075
// SegmentCacheSize is the maximum number of segments that will be held in
7176
// memory for caching. Increasing this value may enhance performance for
72-
// concurrent read operations. Default is 1
77+
// concurrent read operations. Default 1
7378
SegmentCacheSize int
7479
// NoCopy allows for the Read() operation to return the raw underlying data
7580
// slice. This is an optimization to help minimize allocations. When this
7681
// option is set, do not modify the returned data because it may affect
7782
// other Read calls. Default false
7883
NoCopy bool
84+
// AllowEmpty allows for a log to have all entries removed through the use
85+
// of TruncateFront() or TruncateBack(). Otherwise without this option,
86+
// at least one entry must always remain following a truncate operation.
87+
// Default false
88+
//
89+
// Warning: using this option changes the behavior of the log in the
90+
// following ways:
91+
// - An empty log will always have the FirstIndex() be equal to
92+
// LastIndex()+1.
93+
// - For a newly created log that has no entries, FirstIndex() and
94+
// LastIndex() return 1 and 0, respectively.
95+
// Without AllowEmpty, both return 0.
96+
AllowEmpty bool
7997
// Perms represents the datafiles modes and permission bits
8098
DirPerms os.FileMode
8199
FilePerms os.FileMode
@@ -84,10 +102,11 @@ type Options struct {
84102
// DefaultOptions for Open().
85103
var DefaultOptions = &Options{
86104
NoSync: false, // Fsync after every write
87-
SegmentSize: 20971520, // 20 MB log segment files.
88-
LogFormat: Binary, // Binary format is small and fast.
105+
SegmentSize: 20971520, // 20 MB log segment files
106+
LogFormat: Binary, // Binary format is small and fast
89107
SegmentCacheSize: 2, // Number of cached in-memory segments
90-
NoCopy: false, // Make a new copy of data for every Read call.
108+
NoCopy: false, // Make a new copy of data for every Read call
109+
AllowEmpty: false, // Do not allow empty log. 1+ entries required
91110
DirPerms: 0750, // Permissions for the created directories
92111
FilePerms: 0640, // Permissions for the created data files
93112
}
@@ -210,7 +229,8 @@ func (l *Log) load() error {
210229
})
211230
l.firstIndex = 1
212231
l.lastIndex = 0
213-
l.sfile, err = os.OpenFile(l.segments[0].path, os.O_CREATE|os.O_RDWR|os.O_TRUNC, l.opts.FilePerms)
232+
l.sfile, err = os.OpenFile(l.segments[0].path,
233+
os.O_CREATE|os.O_RDWR|os.O_TRUNC, l.opts.FilePerms)
214234
return err
215235
}
216236
// Open existing log. Clean up log if START of END segments exists.
@@ -274,6 +294,9 @@ func (l *Log) load() error {
274294
return err
275295
}
276296
l.lastIndex = lseg.index + uint64(len(lseg.epos)) - 1
297+
if l.firstIndex > l.lastIndex && !l.opts.AllowEmpty {
298+
return ErrEmptyLog
299+
}
277300
return nil
278301
}
279302

@@ -343,7 +366,8 @@ func (l *Log) cycle() error {
343366
path: filepath.Join(l.path, segmentName(l.lastIndex+1)),
344367
}
345368
var err error
346-
l.sfile, err = os.OpenFile(s.path, os.O_CREATE|os.O_RDWR|os.O_TRUNC, l.opts.FilePerms)
369+
l.sfile, err = os.OpenFile(s.path, os.O_CREATE|os.O_RDWR|os.O_TRUNC,
370+
l.opts.FilePerms)
347371
if err != nil {
348372
return err
349373
}
@@ -481,8 +505,10 @@ func (l *Log) writeBatch(b *Batch) error {
481505
return nil
482506
}
483507

484-
// FirstIndex returns the index of the next entry to read in the log.
485-
// It points to the next future index if the log is currently empty.
508+
// FirstIndex returns the index of the first entry in the log.
509+
// Returns zero when log has no entries.
510+
// When using the `AllowEmpty` option and when the log is empty, this will
511+
// return LastIndex+1, which is the next future index.
486512
func (l *Log) FirstIndex() (index uint64, err error) {
487513
l.mu.RLock()
488514
defer l.mu.RUnlock()
@@ -491,17 +517,16 @@ func (l *Log) FirstIndex() (index uint64, err error) {
491517
} else if l.closed {
492518
return 0, ErrClosed
493519
}
494-
// No longer check the lastIndex for zero because we allow empty logs since
495-
// #31 was merged.
496-
// https://github.com/tidwall/wal/pull/31
497-
//if l.lastIndex == 0 {
498-
// return 0, nil
499-
//}
520+
if !l.opts.AllowEmpty && l.lastIndex == 0 {
521+
return 0, nil
522+
}
500523
return l.firstIndex, nil
501524
}
502525

503-
// LastIndex returns the index of the last entry has been written to the log.
504-
// It points to the last deleted index if the log is currently empty.
526+
// LastIndex returns the index of the last entry in the log.
527+
// Returns zero when log has no entries.
528+
// When using the `AllowEmpty` option and when the log is empty, this will
529+
// return FirstIndex()-1, which is the last known deleted index.
505530
func (l *Log) LastIndex() (index uint64, err error) {
506531
l.mu.RLock()
507532
defer l.mu.RUnlock()
@@ -510,12 +535,9 @@ func (l *Log) LastIndex() (index uint64, err error) {
510535
} else if l.closed {
511536
return 0, ErrClosed
512537
}
513-
// No longer check the lastIndex for zero because we allow empty logs since
514-
// #31 was merged.
515-
// https://github.com/tidwall/wal/pull/31
516-
//if l.lastIndex == 0 {
517-
// return 0, nil
518-
//}
538+
if !l.opts.AllowEmpty && l.firstIndex == 0 {
539+
return 0, nil
540+
}
519541
return l.lastIndex, nil
520542
}
521543

@@ -629,7 +651,7 @@ func (l *Log) Read(index uint64) (data []byte, err error) {
629651
} else if l.closed {
630652
return nil, ErrClosed
631653
}
632-
if index == 0 || index < l.firstIndex || index > l.lastIndex {
654+
if index < l.firstIndex || index > l.lastIndex {
633655
return nil, ErrNotFound
634656
}
635657
s, err := l.loadSegment(index)
@@ -677,7 +699,9 @@ func readJSON(edata []byte) ([]byte, error) {
677699
return data, nil
678700
}
679701

680-
// ClearCache clears the segment cache
702+
// ClearCache clears the segment cache.
703+
// This only frees internal buffers and the LRU cache and does not modify the
704+
// contents of the log.
681705
func (l *Log) ClearCache() error {
682706
l.mu.Lock()
683707
defer l.mu.Unlock()
@@ -689,6 +713,7 @@ func (l *Log) ClearCache() error {
689713
l.clearCache()
690714
return nil
691715
}
716+
692717
func (l *Log) clearCache() {
693718
l.scache.Range(func(_, v interface{}) bool {
694719
s := v.(*segment)
@@ -700,10 +725,40 @@ func (l *Log) clearCache() {
700725
l.scache.Resize(l.opts.SegmentCacheSize)
701726
}
702727

728+
// atomicWrite performs a temp-file write followed by a rename so that writing
729+
// `name` is an atomic operation. One os.WriteFile alone is not good enough.
// The data is written to `name`+".TEMP" using l.opts.FilePerms, synced, and
// closed before being renamed to `name`. Returns the first error encountered.
730+
func (l *Log) atomicWrite(name string, data []byte) error {
731+
// Create a TEMP file. The deferred RemoveAll runs on every return path, so
// a TEMP file left behind by a failed write or rename is cleaned up.
732+
tempName := name + ".TEMP"
733+
defer os.RemoveAll(tempName)
734+
if err := func() error {
735+
f, err := os.OpenFile(tempName, os.O_CREATE|os.O_RDWR|os.O_TRUNC,
736+
l.opts.FilePerms)
737+
if err != nil {
738+
return err
739+
}
740+
// Deferred Close covers the early error returns below; the success path
// closes explicitly so that the Close error is reported to the caller.
defer f.Close()
741+
if _, err := f.Write(data); err != nil {
742+
return err
743+
}
744+
// Sync the file contents before the rename below.
if err := f.Sync(); err != nil {
745+
return err
746+
}
747+
return f.Close()
748+
}(); err != nil {
749+
return err
750+
}
751+
// Rename the TEMP file to its final name
752+
return os.Rename(tempName, name)
753+
}
754+
703755
// TruncateFront truncates the front of the log by removing all entries that
704-
// are before the provided `index`. In other words the entry at
705-
// `index` becomes the first entry in the log.
706-
// If `index` equals to `LastIndex()+1`, all entries will be truncated.
756+
// are before the provided `index`. In other words the entry at `index` becomes
757+
// the first entry in the log.
758+
//
759+
// The `AllowEmpty` option may be used to allow for removing all entries in the
760+
// log by providing `LastIndex+1` as the index. Otherwise without `AllowEmpty`,
761+
// at least one entry must always remain following a truncate.
707762
func (l *Log) TruncateFront(index uint64) error {
708763
l.mu.Lock()
709764
defer l.mu.Unlock()
@@ -719,6 +774,9 @@ func (l *Log) truncateFront(index uint64) (err error) {
719774
if index < l.firstIndex || index > l.lastIndex+1 {
720775
return ErrOutOfRange
721776
}
777+
if !l.opts.AllowEmpty && index == l.lastIndex+1 {
778+
return ErrOutOfRange
779+
}
722780
if index == l.firstIndex {
723781
// nothing to truncate
724782
return nil
@@ -740,28 +798,10 @@ func (l *Log) truncateFront(index uint64) (err error) {
740798
epos := s.epos[index-s.index:]
741799
ebuf = s.ebuf[epos[0].pos:]
742800
}
743-
// Create a temp file contains the truncated segment.
744-
tempName := filepath.Join(l.path, "TEMP")
745-
if err = func() error {
746-
f, err := os.OpenFile(tempName, os.O_CREATE|os.O_RDWR|os.O_TRUNC, l.opts.FilePerms)
747-
if err != nil {
748-
return err
749-
}
750-
defer f.Close()
751-
if _, err := f.Write(ebuf); err != nil {
752-
return err
753-
}
754-
if err := f.Sync(); err != nil {
755-
return err
756-
}
757-
return f.Close()
758-
}(); err != nil {
759-
return fmt.Errorf("failed to create temp file for new start segment: %w", err)
760-
}
761-
// Rename the TEMP file to it's START file name.
801+
// Create a START file that contains the truncated segment.
762802
startName := filepath.Join(l.path, segmentName(index)+".START")
763-
if err = os.Rename(tempName, startName); err != nil {
764-
return err
803+
if err = l.atomicWrite(startName, ebuf); err != nil {
804+
return fmt.Errorf("failed to create start segment: %w", err)
765805
}
766806
// The log was truncated but still needs some file cleanup. Any errors
767807
// following this message will not cause an on-disk data corruption, but
@@ -771,6 +811,8 @@ func (l *Log) truncateFront(index uint64) (err error) {
771811
defer func() {
772812
if v := recover(); v != nil {
773813
err = ErrCorrupt
814+
}
815+
if err != nil {
774816
l.corrupt = true
775817
}
776818
}()
@@ -795,7 +837,8 @@ func (l *Log) truncateFront(index uint64) (err error) {
795837
s.index = index
796838
if segIdx == len(l.segments)-1 {
797839
// Reopen the tail segment file
798-
if l.sfile, err = os.OpenFile(newName, os.O_WRONLY, l.opts.FilePerms); err != nil {
840+
l.sfile, err = os.OpenFile(newName, os.O_WRONLY, l.opts.FilePerms)
841+
if err != nil {
799842
return err
800843
}
801844
var n int64
@@ -818,9 +861,12 @@ func (l *Log) truncateFront(index uint64) (err error) {
818861
}
819862

820863
// TruncateBack truncates the back of the log by removing all entries that
821-
// are after the provided `index`. In other words the entry at `index`
822-
// becomes the last entry in the log.
823-
// If `index` equals to `FirstIndex()-1`, all entries will be truncated.
864+
// are after the provided `index`. In other words the entry at `index` becomes
865+
// the last entry in the log.
866+
//
867+
// The `AllowEmpty` option may be used to allow for removing all entries in the
868+
// log by providing `FirstIndex()-1` as the index. Otherwise without
869+
// `AllowEmpty`, at least one entry must always remain following a truncate.
824870
func (l *Log) TruncateBack(index uint64) error {
825871
l.mu.Lock()
826872
defer l.mu.Unlock()
@@ -836,6 +882,9 @@ func (l *Log) truncateBack(index uint64) (err error) {
836882
if index < l.firstIndex-1 || index > l.lastIndex {
837883
return ErrOutOfRange
838884
}
885+
if !l.opts.AllowEmpty && index == l.firstIndex-1 {
886+
return ErrOutOfRange
887+
}
839888
if index == l.lastIndex {
840889
// nothing to truncate
841890
return nil
@@ -857,28 +906,10 @@ func (l *Log) truncateBack(index uint64) (err error) {
857906
epos := s.epos[:index-s.index+1]
858907
ebuf = s.ebuf[:epos[len(epos)-1].end]
859908
}
860-
// Create a temp file contains the truncated segment.
861-
tempName := filepath.Join(l.path, "TEMP")
862-
if err = func() error {
863-
f, err := os.OpenFile(tempName, os.O_CREATE|os.O_RDWR|os.O_TRUNC, l.opts.FilePerms)
864-
if err != nil {
865-
return err
866-
}
867-
defer f.Close()
868-
if _, err := f.Write(ebuf); err != nil {
869-
return err
870-
}
871-
if err := f.Sync(); err != nil {
872-
return err
873-
}
874-
return f.Close()
875-
}(); err != nil {
876-
return fmt.Errorf("failed to create temp file for new end segment: %w", err)
877-
}
878-
// Rename the TEMP file to it's END file name.
909+
// Create an END file that contains the truncated segment.
879910
endName := filepath.Join(l.path, segmentName(s.index)+".END")
880-
if err = os.Rename(tempName, endName); err != nil {
881-
return err
911+
if err = l.atomicWrite(endName, ebuf); err != nil {
912+
return fmt.Errorf("failed to create end segment: %w", err)
882913
}
883914
// The log was truncated but still needs some file cleanup. Any errors
884915
// following this message will not cause an on-disk data corruption, but
@@ -888,6 +919,8 @@ func (l *Log) truncateBack(index uint64) (err error) {
888919
defer func() {
889920
if v := recover(); v != nil {
890921
err = ErrCorrupt
922+
}
923+
if err != nil {
891924
l.corrupt = true
892925
}
893926
}()
@@ -908,7 +941,8 @@ func (l *Log) truncateBack(index uint64) (err error) {
908941
return err
909942
}
910943
// Reopen the tail segment file
911-
if l.sfile, err = os.OpenFile(newName, os.O_WRONLY, l.opts.FilePerms); err != nil {
944+
l.sfile, err = os.OpenFile(newName, os.O_WRONLY, l.opts.FilePerms)
945+
if err != nil {
912946
return err
913947
}
914948
var n int64
@@ -942,3 +976,16 @@ func (l *Log) Sync() error {
942976
}
943977
return l.sfile.Sync()
944978
}
979+
980+
// IsEmpty returns true if there are no entries in the log.
// Returns ErrCorrupt if the log is marked corrupt, or ErrClosed if the log
// has been closed; in both cases the boolean result is false.
981+
func (l *Log) IsEmpty() (bool, error) {
982+
// NOTE(review): this takes the exclusive lock although nothing is written
// here; FirstIndex() and LastIndex() use RLock. Confirm whether RLock
// would suffice.
l.mu.Lock()
983+
defer l.mu.Unlock()
984+
if l.corrupt {
985+
return false, ErrCorrupt
986+
} else if l.closed {
987+
return false, ErrClosed
988+
}
989+
// Empty when both indexes are zero, or when firstIndex has moved past
// lastIndex.
return (l.firstIndex == 0 && l.lastIndex == 0) ||
990+
l.firstIndex > l.lastIndex, nil
991+
}

0 commit comments

Comments
 (0)