Skip to content

Commit 67dadff

Browse files
committed
optimize checksumming entire database
1 parent cd75b15 commit 67dadff

File tree

3 files changed

+280
-81
lines changed

3 files changed

+280
-81
lines changed

checksum.go

+179
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
package ltx
2+
3+
import (
4+
"encoding/binary"
5+
"encoding/json"
6+
"fmt"
7+
"hash"
8+
"hash/crc64"
9+
"io"
10+
"os"
11+
"strconv"
12+
"sync"
13+
)
14+
15+
// Checksum represents an LTX checksum.
//
// Values produced by the helpers in this file always have ChecksumFlag OR'd
// in (see ChecksumPage et al.); presumably the flag marks a populated
// checksum — confirm against ChecksumFlag's declaration elsewhere in the
// package.
type Checksum uint64
17+
18+
// ChecksumPages updates the provided checksums slice with the checksum of each
19+
// page in the specified file. The first (by page number) error encountered is
20+
// returned along with the number of the last page successfully checksummed.
21+
// Checksums for subsequent pages may be updated, regardless of an error being
22+
// returned.
23+
//
24+
// nWorkers specifies the amount of parallelism to use. A reasonable default
25+
// will be used if nWorkers is 0.
26+
func ChecksumPages(dbPath string, pageSize, nPages, nWorkers uint32, checksums []Checksum) (uint32, error) {
27+
// Based on experimentation on a fly.io machine with a slow SSD, 512Mb is
28+
// where checksumming starts to take >1s. As the database size increases, we
29+
// get more benefit from an increasing number of workers. Doing a bunch of
30+
// benchmarking on fly.io machines of difference sizes with a 1Gb database,
31+
// 24 threads seems to be the sweet spot.
32+
if nWorkers == 0 && pageSize*nPages > 512*1024*1024 {
33+
nWorkers = 24
34+
}
35+
36+
if nWorkers <= 1 {
37+
return checksumPagesSerial(dbPath, 1, nPages, int64(pageSize), checksums)
38+
}
39+
40+
perWorker := nPages / nWorkers
41+
if nPages%nWorkers != 0 {
42+
perWorker++
43+
}
44+
45+
var (
46+
wg sync.WaitGroup
47+
rets = make([]uint32, nWorkers)
48+
errs = make([]error, nWorkers)
49+
)
50+
51+
for w := uint32(0); w < nWorkers; w++ {
52+
w := w
53+
firstPage := w*perWorker + 1
54+
lastPage := firstPage + perWorker - 1
55+
if lastPage > nPages {
56+
lastPage = nPages
57+
}
58+
59+
wg.Add(1)
60+
go func() {
61+
rets[w], errs[w] = checksumPagesSerial(dbPath, firstPage, lastPage, int64(pageSize), checksums)
62+
wg.Done()
63+
}()
64+
}
65+
66+
wg.Wait()
67+
for i, err := range errs {
68+
if err != nil {
69+
return rets[i], err
70+
}
71+
}
72+
73+
return nPages, nil
74+
}
75+
76+
func checksumPagesSerial(dbPath string, firstPage, lastPage uint32, pageSize int64, checksums []Checksum) (uint32, error) {
77+
f, err := os.Open(dbPath)
78+
if err != nil {
79+
return firstPage - 1, err
80+
}
81+
82+
_, err = f.Seek(int64(firstPage-1)*pageSize, io.SeekStart)
83+
if err != nil {
84+
return firstPage - 1, err
85+
}
86+
87+
buf := make([]byte, pageSize+4)
88+
h := NewHasher()
89+
90+
for pageNo := firstPage; pageNo <= lastPage; pageNo++ {
91+
binary.BigEndian.PutUint32(buf, pageNo)
92+
93+
if _, err := io.ReadFull(f, buf[4:]); err != nil {
94+
return pageNo - 1, err
95+
}
96+
97+
h.Reset()
98+
_, _ = h.Write(buf)
99+
checksums[pageNo-1] = ChecksumFlag | Checksum(h.Sum64())
100+
}
101+
102+
return lastPage, nil
103+
}
104+
105+
// ChecksumPage returns a CRC64 checksum that combines the page number & page data.
106+
func ChecksumPage(pgno uint32, data []byte) Checksum {
107+
return ChecksumPageWithHasher(NewHasher(), pgno, data)
108+
}
109+
110+
// ChecksumPageWithHasher returns a CRC64 checksum that combines the page number & page data.
111+
func ChecksumPageWithHasher(h hash.Hash64, pgno uint32, data []byte) Checksum {
112+
h.Reset()
113+
_ = binary.Write(h, binary.BigEndian, pgno)
114+
_, _ = h.Write(data)
115+
return ChecksumFlag | Checksum(h.Sum64())
116+
}
117+
118+
// ChecksumReader reads an entire database file from r and computes its rolling checksum.
119+
func ChecksumReader(r io.Reader, pageSize int) (Checksum, error) {
120+
data := make([]byte, pageSize)
121+
122+
var chksum Checksum
123+
for pgno := uint32(1); ; pgno++ {
124+
if _, err := io.ReadFull(r, data); err == io.EOF {
125+
break
126+
} else if err != nil {
127+
return chksum, err
128+
}
129+
chksum = ChecksumFlag | (chksum ^ ChecksumPage(pgno, data))
130+
}
131+
return chksum, nil
132+
}
133+
134+
// ParseChecksum parses a 16-character hex string into a checksum.
135+
func ParseChecksum(s string) (Checksum, error) {
136+
if len(s) != 16 {
137+
return 0, fmt.Errorf("invalid formatted checksum length: %q", s)
138+
}
139+
v, err := strconv.ParseUint(s, 16, 64)
140+
if err != nil {
141+
return 0, fmt.Errorf("invalid checksum format: %q", s)
142+
}
143+
return Checksum(v), nil
144+
}
145+
146+
// String returns c formatted as a fixed-width hex number.
147+
func (c Checksum) String() string {
148+
return fmt.Sprintf("%016x", uint64(c))
149+
}
150+
151+
func (c Checksum) MarshalJSON() ([]byte, error) {
152+
return []byte(`"` + c.String() + `"`), nil
153+
}
154+
155+
func (c *Checksum) UnmarshalJSON(data []byte) (err error) {
156+
var s *string
157+
if err := json.Unmarshal(data, &s); err != nil {
158+
return fmt.Errorf("cannot unmarshal checksum from JSON value")
159+
}
160+
161+
// Set to zero if value is nil.
162+
if s == nil {
163+
*c = 0
164+
return nil
165+
}
166+
167+
chksum, err := ParseChecksum(*s)
168+
if err != nil {
169+
return fmt.Errorf("cannot parse checksum from JSON string: %q", *s)
170+
}
171+
*c = Checksum(chksum)
172+
173+
return nil
174+
}
175+
176+
// NewHasher returns a new CRC64-ISO hasher.
func NewHasher() hash.Hash64 {
	table := crc64.MakeTable(crc64.ISO)
	return crc64.New(table)
}

checksum_test.go

+101
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package ltx
2+
3+
import (
4+
"crypto/rand"
5+
"fmt"
6+
"io"
7+
"os"
8+
"path/filepath"
9+
"testing"
10+
)
11+
12+
func TestChecksumPages(t *testing.T) {
13+
// files divisible into pages
14+
testChecksumPages(t, 1024*4, 4, 1024, 1)
15+
testChecksumPages(t, 1024*4, 4, 1024, 2)
16+
testChecksumPages(t, 1024*4, 4, 1024, 3)
17+
testChecksumPages(t, 1024*4, 4, 1024, 4)
18+
19+
// short pages
20+
testChecksumPages(t, 1024*3+100, 4, 1024, 1)
21+
testChecksumPages(t, 1024*3+100, 4, 1024, 2)
22+
testChecksumPages(t, 1024*3+100, 4, 1024, 3)
23+
testChecksumPages(t, 1024*3+100, 4, 1024, 4)
24+
25+
// empty files
26+
testChecksumPages(t, 0, 4, 1024, 1)
27+
testChecksumPages(t, 0, 4, 1024, 2)
28+
testChecksumPages(t, 0, 4, 1024, 3)
29+
testChecksumPages(t, 0, 4, 1024, 4)
30+
}
31+
32+
func testChecksumPages(t *testing.T, fileSize, nPages, pageSize, nWorkers uint32) {
33+
t.Run(fmt.Sprintf("fileSize=%d,nPages=%d,pageSize=%d,nWorkers=%d", fileSize, nPages, pageSize, nWorkers), func(t *testing.T) {
34+
path := filepath.Join(t.TempDir(), "test.db")
35+
f, err := os.Create(path)
36+
if err != nil {
37+
t.Fatal(err)
38+
}
39+
defer f.Close()
40+
if _, err := io.CopyN(f, rand.Reader, int64(fileSize)); err != nil {
41+
t.Fatal(err)
42+
}
43+
44+
legacyCS := make([]Checksum, nPages)
45+
legacyLastPage, legacyErr := legacyChecksumPages(path, pageSize, nPages, legacyCS)
46+
newCS := make([]Checksum, nPages)
47+
newLastPage, newErr := ChecksumPages(path, pageSize, nPages, nWorkers, newCS)
48+
49+
if legacyErr != newErr {
50+
t.Fatalf("legacy error: %v, new error: %v", legacyErr, newErr)
51+
}
52+
if legacyLastPage != newLastPage {
53+
t.Fatalf("legacy last page: %d, new last page: %d", legacyLastPage, newLastPage)
54+
}
55+
if len(legacyCS) != len(newCS) {
56+
t.Fatalf("legacy checksums: %d, new checksums: %d", len(legacyCS), len(newCS))
57+
}
58+
for i := range legacyCS {
59+
if legacyCS[i] != newCS[i] {
60+
t.Fatalf("mismatch at index %d: legacy: %v, new: %v", i, legacyCS[i], newCS[i])
61+
}
62+
}
63+
})
64+
}
65+
66+
// legacyChecksumPages is the serial reference implementation used as an
// oracle by testChecksumPages. It reads each page with ReadAt and checksums
// it via ChecksumPage, returning the number of the last page successfully
// checksummed along with any error.
//
// logic copied from litefs repo
func legacyChecksumPages(dbPath string, pageSize, nPages uint32, checksums []Checksum) (uint32, error) {
	f, err := os.Open(dbPath)
	if err != nil {
		return 0, err
	}
	defer f.Close()

	buf := make([]byte, pageSize)

	for pgno := uint32(1); pgno <= nPages; pgno++ {
		// Pages are 1-indexed; page N starts at byte offset (N-1)*pageSize.
		offset := int64(pgno-1) * int64(pageSize)
		if _, err := readFullAt(f, buf, offset); err != nil {
			// Report how many pages were fully checksummed before the error.
			return pgno - 1, err
		}

		checksums[pgno-1] = ChecksumPage(pgno, buf)
	}

	return nPages, nil
}
87+
88+
// readFullAt reads exactly len(buf) bytes from r at offset off, retrying
// short reads. Like io.ReadFull, it converts an EOF after a partial read
// into io.ErrUnexpectedEOF; a zero-byte read reports the underlying error
// (typically io.EOF) unchanged.
//
// copied from litefs/internal
func readFullAt(r io.ReaderAt, buf []byte, off int64) (n int, err error) {
	for n < len(buf) && err == nil {
		var nn int
		nn, err = r.ReadAt(buf[n:], off+int64(n))
		n += nn
	}
	if n >= len(buf) {
		// Buffer filled completely; any trailing error (e.g. EOF exactly at
		// the end) is irrelevant to the caller.
		return n, nil
	} else if n > 0 && err == io.EOF {
		return n, io.ErrUnexpectedEOF
	}
	return n, err
}

ltx.go

-81
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ import (
77
"encoding/json"
88
"errors"
99
"fmt"
10-
"hash"
11-
"hash/crc64"
1210
"io"
1311
"regexp"
1412
"strconv"
@@ -168,51 +166,6 @@ func (t *TXID) UnmarshalJSON(data []byte) (err error) {
168166
return nil
169167
}
170168

171-
// Checksum represents an LTX checksum.
172-
type Checksum uint64
173-
174-
// ParseChecksum parses a 16-character hex string into a checksum.
175-
func ParseChecksum(s string) (Checksum, error) {
176-
if len(s) != 16 {
177-
return 0, fmt.Errorf("invalid formatted checksum length: %q", s)
178-
}
179-
v, err := strconv.ParseUint(s, 16, 64)
180-
if err != nil {
181-
return 0, fmt.Errorf("invalid checksum format: %q", s)
182-
}
183-
return Checksum(v), nil
184-
}
185-
186-
// String returns c formatted as a fixed-width hex number.
187-
func (c Checksum) String() string {
188-
return fmt.Sprintf("%016x", uint64(c))
189-
}
190-
191-
func (c Checksum) MarshalJSON() ([]byte, error) {
192-
return []byte(`"` + c.String() + `"`), nil
193-
}
194-
195-
func (c *Checksum) UnmarshalJSON(data []byte) (err error) {
196-
var s *string
197-
if err := json.Unmarshal(data, &s); err != nil {
198-
return fmt.Errorf("cannot unmarshal checksum from JSON value")
199-
}
200-
201-
// Set to zero if value is nil.
202-
if s == nil {
203-
*c = 0
204-
return nil
205-
}
206-
207-
chksum, err := ParseChecksum(*s)
208-
if err != nil {
209-
return fmt.Errorf("cannot parse checksum from JSON string: %q", *s)
210-
}
211-
*c = Checksum(chksum)
212-
213-
return nil
214-
}
215-
216169
// Header flags.
217170
const (
218171
HeaderFlagMask = uint32(0x00000001)
@@ -474,40 +427,6 @@ func (h *PageHeader) UnmarshalBinary(b []byte) error {
474427
return nil
475428
}
476429

477-
// NewHasher returns a new CRC64-ISO hasher.
478-
func NewHasher() hash.Hash64 {
479-
return crc64.New(crc64.MakeTable(crc64.ISO))
480-
}
481-
482-
// ChecksumPage returns a CRC64 checksum that combines the page number & page data.
483-
func ChecksumPage(pgno uint32, data []byte) Checksum {
484-
return ChecksumPageWithHasher(NewHasher(), pgno, data)
485-
}
486-
487-
// ChecksumPageWithHasher returns a CRC64 checksum that combines the page number & page data.
488-
func ChecksumPageWithHasher(h hash.Hash64, pgno uint32, data []byte) Checksum {
489-
h.Reset()
490-
_ = binary.Write(h, binary.BigEndian, pgno)
491-
_, _ = h.Write(data)
492-
return ChecksumFlag | Checksum(h.Sum64())
493-
}
494-
495-
// ChecksumReader reads an entire database file from r and computes its rolling checksum.
496-
func ChecksumReader(r io.Reader, pageSize int) (Checksum, error) {
497-
data := make([]byte, pageSize)
498-
499-
var chksum Checksum
500-
for pgno := uint32(1); ; pgno++ {
501-
if _, err := io.ReadFull(r, data); err == io.EOF {
502-
break
503-
} else if err != nil {
504-
return chksum, err
505-
}
506-
chksum = ChecksumFlag | (chksum ^ ChecksumPage(pgno, data))
507-
}
508-
return chksum, nil
509-
}
510-
511430
// ParseFilename parses a transaction range from an LTX file.
512431
func ParseFilename(name string) (minTXID, maxTXID TXID, err error) {
513432
a := filenameRegex.FindStringSubmatch(name)

0 commit comments

Comments
 (0)