Skip to content

Commit d6393e1

Browse files
committed
optimize checksumming entire database
1 parent cd75b15 commit d6393e1

File tree

3 files changed

+266
-81
lines changed

3 files changed

+266
-81
lines changed

checksum.go

+170
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
package ltx
2+
3+
import (
4+
"encoding/binary"
5+
"encoding/json"
6+
"fmt"
7+
"hash"
8+
"hash/crc64"
9+
"io"
10+
"os"
11+
"runtime"
12+
"strconv"
13+
"sync"
14+
)
15+
16+
// Checksum represents an LTX checksum. It is stored as an unsigned 64-bit
// integer; its String and MarshalJSON methods render it as a fixed-width,
// 16-digit hex number.
17+
type Checksum uint64
18+
19+
func ChecksumPages(dbPath string, pageSize, nPages, nWorkers uint32) ([]Checksum, error) {
20+
checksums := make([]Checksum, nPages)
21+
22+
if nWorkers == 0 {
23+
nWorkers = uint32(runtime.NumCPU() * 3 / 2)
24+
}
25+
26+
if nWorkers == 1 {
27+
lastPageNo, err := checksumPagesSerial(dbPath, 1, nPages, int64(pageSize), checksums)
28+
return checksums[:lastPageNo], err
29+
}
30+
31+
perWorker := nPages / nWorkers
32+
if nPages%nWorkers != 0 {
33+
perWorker++
34+
}
35+
36+
var (
37+
wg sync.WaitGroup
38+
rets = make([]uint32, nWorkers)
39+
errs = make([]error, nWorkers)
40+
)
41+
42+
for w := uint32(0); w < nWorkers; w++ {
43+
w := w
44+
firstPage := w*perWorker + 1
45+
lastPage := firstPage + perWorker - 1
46+
if lastPage > nPages {
47+
lastPage = nPages
48+
}
49+
50+
wg.Add(1)
51+
go func() {
52+
rets[w], errs[w] = checksumPagesSerial(dbPath, firstPage, lastPage, int64(pageSize), checksums)
53+
wg.Done()
54+
}()
55+
}
56+
57+
wg.Wait()
58+
for i, err := range errs {
59+
if err != nil {
60+
return checksums[:rets[i]], err
61+
}
62+
}
63+
64+
return checksums, nil
65+
}
66+
67+
func checksumPagesSerial(dbPath string, firstPage, lastPage uint32, pageSize int64, checksums []Checksum) (uint32, error) {
68+
f, err := os.Open(dbPath)
69+
if err != nil {
70+
return firstPage - 1, err
71+
}
72+
73+
_, err = f.Seek(int64(firstPage-1)*pageSize, io.SeekStart)
74+
if err != nil {
75+
return firstPage - 1, err
76+
}
77+
78+
buf := make([]byte, pageSize+4)
79+
h := NewHasher()
80+
81+
for pageNo := firstPage; pageNo <= lastPage; pageNo++ {
82+
binary.BigEndian.PutUint32(buf, pageNo)
83+
84+
if _, err := io.ReadFull(f, buf[4:]); err != nil {
85+
return pageNo - 1, err
86+
}
87+
88+
h.Reset()
89+
_, _ = h.Write(buf)
90+
checksums[pageNo-1] = ChecksumFlag | Checksum(h.Sum64())
91+
}
92+
93+
return lastPage, nil
94+
}
95+
96+
// ChecksumPage returns a CRC64 checksum that combines the page number & page data.
97+
func ChecksumPage(pgno uint32, data []byte) Checksum {
98+
return ChecksumPageWithHasher(NewHasher(), pgno, data)
99+
}
100+
101+
// ChecksumPageWithHasher returns a CRC64 checksum that combines the page number & page data.
102+
func ChecksumPageWithHasher(h hash.Hash64, pgno uint32, data []byte) Checksum {
103+
h.Reset()
104+
_ = binary.Write(h, binary.BigEndian, pgno)
105+
_, _ = h.Write(data)
106+
return ChecksumFlag | Checksum(h.Sum64())
107+
}
108+
109+
// ChecksumReader reads an entire database file from r and computes its rolling checksum.
110+
func ChecksumReader(r io.Reader, pageSize int) (Checksum, error) {
111+
data := make([]byte, pageSize)
112+
113+
var chksum Checksum
114+
for pgno := uint32(1); ; pgno++ {
115+
if _, err := io.ReadFull(r, data); err == io.EOF {
116+
break
117+
} else if err != nil {
118+
return chksum, err
119+
}
120+
chksum = ChecksumFlag | (chksum ^ ChecksumPage(pgno, data))
121+
}
122+
return chksum, nil
123+
}
124+
125+
// ParseChecksum parses a 16-character hex string into a checksum.
126+
func ParseChecksum(s string) (Checksum, error) {
127+
if len(s) != 16 {
128+
return 0, fmt.Errorf("invalid formatted checksum length: %q", s)
129+
}
130+
v, err := strconv.ParseUint(s, 16, 64)
131+
if err != nil {
132+
return 0, fmt.Errorf("invalid checksum format: %q", s)
133+
}
134+
return Checksum(v), nil
135+
}
136+
137+
// String returns c formatted as a fixed-width hex number.
138+
func (c Checksum) String() string {
139+
return fmt.Sprintf("%016x", uint64(c))
140+
}
141+
142+
func (c Checksum) MarshalJSON() ([]byte, error) {
143+
return []byte(`"` + c.String() + `"`), nil
144+
}
145+
146+
func (c *Checksum) UnmarshalJSON(data []byte) (err error) {
147+
var s *string
148+
if err := json.Unmarshal(data, &s); err != nil {
149+
return fmt.Errorf("cannot unmarshal checksum from JSON value")
150+
}
151+
152+
// Set to zero if value is nil.
153+
if s == nil {
154+
*c = 0
155+
return nil
156+
}
157+
158+
chksum, err := ParseChecksum(*s)
159+
if err != nil {
160+
return fmt.Errorf("cannot parse checksum from JSON string: %q", *s)
161+
}
162+
*c = Checksum(chksum)
163+
164+
return nil
165+
}
166+
167+
// NewHasher creates a 64-bit CRC hasher that uses the ISO polynomial table
// from hash/crc64.
func NewHasher() hash.Hash64 {
	isoTable := crc64.MakeTable(crc64.ISO)
	return crc64.New(isoTable)
}

checksum_test.go

+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package ltx
2+
3+
import (
4+
"crypto/rand"
5+
"fmt"
6+
"io"
7+
"os"
8+
"path/filepath"
9+
"testing"
10+
)
11+
12+
func TestChecksumPages(t *testing.T) {
13+
// files divisible into pages
14+
testChecksumPages(t, 1024*4, 4, 1024, 1)
15+
testChecksumPages(t, 1024*4, 4, 1024, 2)
16+
testChecksumPages(t, 1024*4, 4, 1024, 3)
17+
testChecksumPages(t, 1024*4, 4, 1024, 4)
18+
19+
// short pages
20+
testChecksumPages(t, 1024*3+100, 4, 1024, 1)
21+
testChecksumPages(t, 1024*3+100, 4, 1024, 2)
22+
testChecksumPages(t, 1024*3+100, 4, 1024, 3)
23+
testChecksumPages(t, 1024*3+100, 4, 1024, 4)
24+
25+
// empty files
26+
testChecksumPages(t, 0, 4, 1024, 1)
27+
testChecksumPages(t, 0, 4, 1024, 2)
28+
testChecksumPages(t, 0, 4, 1024, 3)
29+
testChecksumPages(t, 0, 4, 1024, 4)
30+
}
31+
32+
func testChecksumPages(t *testing.T, fileSize, nPages, pageSize, nWorkers uint32) {
33+
t.Run(fmt.Sprintf("fileSize=%d,nPages=%d,pageSize=%d,nWorkers=%d", fileSize, nPages, pageSize, nWorkers), func(t *testing.T) {
34+
path := filepath.Join(t.TempDir(), "test.db")
35+
f, err := os.Create(path)
36+
if err != nil {
37+
t.Fatal(err)
38+
}
39+
if _, err := io.CopyN(f, rand.Reader, int64(fileSize)); err != nil {
40+
t.Fatal(err)
41+
}
42+
43+
legacyCS, legacyErr := legacyChecksumPages(path, pageSize, nPages)
44+
newCS, newErr := ChecksumPages(path, pageSize, nPages, nWorkers)
45+
46+
if legacyErr != newErr {
47+
t.Fatalf("legacy error: %v, new error: %v", legacyErr, newErr)
48+
}
49+
if len(legacyCS) != len(newCS) {
50+
t.Fatalf("legacy checksums: %d, new checksums: %d", len(legacyCS), len(newCS))
51+
}
52+
for i := range legacyCS {
53+
if legacyCS[i] != newCS[i] {
54+
t.Fatalf("mismatch at index %d: legacy: %v, new: %v", i, legacyCS[i], newCS[i])
55+
}
56+
}
57+
})
58+
}
59+
60+
// logic copied from litefs repo
61+
func legacyChecksumPages(dbPath string, pageSize, nPages uint32) ([]Checksum, error) {
62+
f, err := os.Open(dbPath)
63+
if err != nil {
64+
return nil, err
65+
}
66+
defer f.Close()
67+
68+
checksums := make([]Checksum, 0, nPages)
69+
buf := make([]byte, pageSize)
70+
71+
for pgno := uint32(1); pgno <= nPages; pgno++ {
72+
offset := int64(pgno-1) * int64(pageSize)
73+
if _, err := readFullAt(f, buf, offset); err != nil {
74+
return checksums, err
75+
}
76+
77+
checksums = append(checksums, ChecksumPage(pgno, buf))
78+
}
79+
80+
return checksums, nil
81+
}
82+
83+
// readFullAt (copied from litefs/internal) fills buf from r starting at byte
// offset off, calling ReadAt repeatedly until buf is full or a read fails.
//
// It returns the number of bytes read. The error is nil when buf was filled,
// io.ErrUnexpectedEOF when the input ended after some but not all of buf was
// read, and otherwise whatever ReadAt returned (io.EOF if nothing was read).
func readFullAt(r io.ReaderAt, buf []byte, off int64) (n int, err error) {
	for err == nil && n < len(buf) {
		read, readErr := r.ReadAt(buf[n:], off+int64(n))
		n, err = n+read, readErr
	}

	switch {
	case n >= len(buf):
		// A full buffer counts as success even if the last read also
		// reported an error such as io.EOF.
		return n, nil
	case err == io.EOF && n > 0:
		return n, io.ErrUnexpectedEOF
	default:
		return n, err
	}
}

ltx.go

-81
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ import (
77
"encoding/json"
88
"errors"
99
"fmt"
10-
"hash"
11-
"hash/crc64"
1210
"io"
1311
"regexp"
1412
"strconv"
@@ -168,51 +166,6 @@ func (t *TXID) UnmarshalJSON(data []byte) (err error) {
168166
return nil
169167
}
170168

171-
// Checksum represents an LTX checksum.
172-
type Checksum uint64
173-
174-
// ParseChecksum parses a 16-character hex string into a checksum.
175-
func ParseChecksum(s string) (Checksum, error) {
176-
if len(s) != 16 {
177-
return 0, fmt.Errorf("invalid formatted checksum length: %q", s)
178-
}
179-
v, err := strconv.ParseUint(s, 16, 64)
180-
if err != nil {
181-
return 0, fmt.Errorf("invalid checksum format: %q", s)
182-
}
183-
return Checksum(v), nil
184-
}
185-
186-
// String returns c formatted as a fixed-width hex number.
187-
func (c Checksum) String() string {
188-
return fmt.Sprintf("%016x", uint64(c))
189-
}
190-
191-
func (c Checksum) MarshalJSON() ([]byte, error) {
192-
return []byte(`"` + c.String() + `"`), nil
193-
}
194-
195-
func (c *Checksum) UnmarshalJSON(data []byte) (err error) {
196-
var s *string
197-
if err := json.Unmarshal(data, &s); err != nil {
198-
return fmt.Errorf("cannot unmarshal checksum from JSON value")
199-
}
200-
201-
// Set to zero if value is nil.
202-
if s == nil {
203-
*c = 0
204-
return nil
205-
}
206-
207-
chksum, err := ParseChecksum(*s)
208-
if err != nil {
209-
return fmt.Errorf("cannot parse checksum from JSON string: %q", *s)
210-
}
211-
*c = Checksum(chksum)
212-
213-
return nil
214-
}
215-
216169
// Header flags.
217170
const (
218171
HeaderFlagMask = uint32(0x00000001)
@@ -474,40 +427,6 @@ func (h *PageHeader) UnmarshalBinary(b []byte) error {
474427
return nil
475428
}
476429

477-
// NewHasher returns a new CRC64-ISO hasher.
478-
func NewHasher() hash.Hash64 {
479-
return crc64.New(crc64.MakeTable(crc64.ISO))
480-
}
481-
482-
// ChecksumPage returns a CRC64 checksum that combines the page number & page data.
483-
func ChecksumPage(pgno uint32, data []byte) Checksum {
484-
return ChecksumPageWithHasher(NewHasher(), pgno, data)
485-
}
486-
487-
// ChecksumPageWithHasher returns a CRC64 checksum that combines the page number & page data.
488-
func ChecksumPageWithHasher(h hash.Hash64, pgno uint32, data []byte) Checksum {
489-
h.Reset()
490-
_ = binary.Write(h, binary.BigEndian, pgno)
491-
_, _ = h.Write(data)
492-
return ChecksumFlag | Checksum(h.Sum64())
493-
}
494-
495-
// ChecksumReader reads an entire database file from r and computes its rolling checksum.
496-
func ChecksumReader(r io.Reader, pageSize int) (Checksum, error) {
497-
data := make([]byte, pageSize)
498-
499-
var chksum Checksum
500-
for pgno := uint32(1); ; pgno++ {
501-
if _, err := io.ReadFull(r, data); err == io.EOF {
502-
break
503-
} else if err != nil {
504-
return chksum, err
505-
}
506-
chksum = ChecksumFlag | (chksum ^ ChecksumPage(pgno, data))
507-
}
508-
return chksum, nil
509-
}
510-
511430
// ParseFilename parses a transaction range from an LTX file.
512431
func ParseFilename(name string) (minTXID, maxTXID TXID, err error) {
513432
a := filenameRegex.FindStringSubmatch(name)

0 commit comments

Comments
 (0)