Skip to content

Commit a80574b

Browse files
faridtmammadovFarid Mammadov
authored and committed
split data into multiple files when a file hits a specific capacity
1 parent 3b4e8d9 commit a80574b

3 files changed

Lines changed: 146 additions & 37 deletions

File tree

disk_store.go

Lines changed: 100 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ import (
77
"io/fs"
88
"log"
99
"os"
10+
"path/filepath"
11+
"regexp"
12+
"strconv"
1013
"time"
1114
)
1215

@@ -22,6 +25,10 @@ import (
2225
// https://pkg.go.dev/os#File.Seek
2326
const defaultWhence = 0
2427

28+
// MaxFileSize
29+
// is the size limit for a data file: when writing a record would push the
// active file past this limit, a new file is created and used for writes.
30+
var MaxFileSize int64 = 1 << 30
31+
2532
// DiskStore is a Log-Structured Hash Table as described in the BitCask paper. We
2633
// keep appending the data to a file, like a log. DiskStorage maintains an in-memory
2734
// hash table called KeyDir, which keeps the row's location on the disk.
@@ -63,6 +70,8 @@ const defaultWhence = 0
6370
// store.Set("othello", "shakespeare")
6471
// author := store.Get("othello")
6572
type DiskStore struct {
73+
// directory name that contains all data files
74+
dir string
6675
// file object pointing the file_name
6776
file *os.File
6877
// current cursor position in the file where the data can be written
@@ -81,24 +90,26 @@ func isFileExists(fileName string) bool {
8190
return false
8291
}
8392

84-
func NewDiskStore(fileName string) (*DiskStore, error) {
85-
ds := &DiskStore{keyDir: make(map[string]KeyEntry)}
86-
// if the file exists already, then we will load the key_dir
87-
if isFileExists(fileName) {
88-
err := ds.initKeyDir(fileName)
93+
func NewDiskStore(directoryName string) (*DiskStore, error) {
94+
if !isFileExists(directoryName) {
95+
err := os.MkdirAll(directoryName, os.ModePerm)
96+
8997
if err != nil {
90-
log.Fatalf("error while loading the keys from disk: %v", err)
98+
return nil, err
9199
}
92100
}
93-
// we open the file in following modes:
94-
// os.O_APPEND - says that the writes are append only.
95-
// os.O_RDWR - says we can read and write to the file
96-
// os.O_CREATE - creates the file if it does not exist
97-
file, err := os.OpenFile(fileName, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0666)
101+
102+
ds := &DiskStore{
103+
dir: directoryName,
104+
keyDir: make(map[string]KeyEntry),
105+
}
106+
107+
err := ds.initKeyDir(directoryName)
108+
98109
if err != nil {
99-
return nil, err
110+
log.Fatalf("error while loading the keys from disk: %v", err)
100111
}
101-
ds.file = file
112+
102113
return ds, nil
103114
}
104115

@@ -119,13 +130,13 @@ func (d *DiskStore) Get(key string) (string, error) {
119130
}
120131

121132
// move the current pointer to the right offset
122-
_, err := d.file.Seek(int64(kEntry.position), defaultWhence)
133+
_, err := kEntry.file.Seek(int64(kEntry.position), defaultWhence)
123134
if err != nil {
124135
return "", ErrSeekFailed
125136
}
126137

127138
data := make([]byte, kEntry.totalSize)
128-
_, err = io.ReadFull(d.file, data)
139+
_, err = io.ReadFull(kEntry.file, data)
129140
if err != nil {
130141
return "", ErrReadFailed
131142
}
@@ -161,6 +172,10 @@ func (d *DiskStore) Set(key string, value string) error {
161172
r := Record{Header: h, Key: key, Value: value, RecordSize: headerSize + h.KeySize + h.ValueSize}
162173
r.Header.CheckSum = r.CalculateCheckSum()
163174

175+
if err := d.checkMaxFileSizeReached(r.RecordSize); err != nil {
176+
return err
177+
}
178+
164179
//encode the record
165180
buf := new(bytes.Buffer)
166181
err := r.EncodeKV(buf)
@@ -169,13 +184,29 @@ func (d *DiskStore) Set(key string, value string) error {
169184
}
170185
d.write(buf.Bytes())
171186

172-
d.keyDir[key] = NewKeyEntry(timestamp, uint32(d.writePosition), r.Size())
187+
d.keyDir[key] = NewKeyEntry(timestamp, d.file, uint32(d.writePosition), r.Size())
173188
// update last write position, so that next record can be written from this point
174189
d.writePosition += int(r.Size())
175190

176191
return nil
177192
}
178193

194+
func (d *DiskStore) checkMaxFileSizeReached(size uint32) error {
195+
stat, _ := d.file.Stat()
196+
nextSize := stat.Size() + int64(size)
197+
if nextSize > MaxFileSize {
198+
activeFile := createFilenameId(d.file.Name()) + ".bitcask.data"
199+
file, err := os.Create(filepath.Join(d.dir, activeFile))
200+
if err != nil {
201+
return err
202+
}
203+
d.file = file
204+
d.writePosition = 0
205+
}
206+
207+
return nil
208+
}
209+
179210
func (d *DiskStore) Delete(key string) error {
180211
timestamp := uint32(time.Now().Unix())
181212
value := ""
@@ -208,6 +239,9 @@ func (d *DiskStore) Close() bool {
208239
// TODO: log the error
209240
return false
210241
}
242+
for _, v := range d.keyDir {
243+
v.file.Close()
244+
}
211245
return true
212246
}
213247

@@ -226,15 +260,61 @@ func (d *DiskStore) write(data []byte) {
226260
}
227261
}
228262

229-
func (d *DiskStore) initKeyDir(existingFile string) error {
263+
func (d *DiskStore) initKeyDir(directoryName string) error {
264+
dirEntries, err := os.ReadDir(directoryName)
265+
if err != nil {
266+
return err
267+
}
268+
269+
for _, entry := range dirEntries {
270+
if entry.IsDir() {
271+
continue
272+
}
273+
274+
err = initKeyDirInternal(d.keyDir, filepath.Join(d.dir, entry.Name()))
275+
if err != nil {
276+
return err
277+
}
278+
}
279+
280+
fileName := createFilenameId("") + ".bitcask.data"
281+
282+
if len(dirEntries) > 0 {
283+
fileName = createFilenameId(dirEntries[len(dirEntries)-1].Name()) + ".bitcask.data"
284+
}
285+
286+
file, err := os.Create(filepath.Join(d.dir, fileName))
287+
if err != nil {
288+
return err
289+
}
290+
d.file = file
291+
d.writePosition = 0
292+
293+
return nil
294+
}
295+
296+
func createFilenameId(filename string) string {
297+
if filename == "" {
298+
return "1000000000"
299+
}
300+
pattern := regexp.MustCompile(`(\d+)\.bitcask`)
301+
matches := pattern.FindStringSubmatch(filename)
302+
303+
filenameId, _ := strconv.Atoi(matches[1])
304+
305+
return strconv.Itoa(filenameId + 1)
306+
}
307+
308+
func initKeyDirInternal(keyDir map[string]KeyEntry, existingFile string) error {
230309
// we will initialise the keyDir by reading the contents of the file, record by
231310
// record. As we read each record, we will also update our keyDir with the
232311
// corresponding KeyEntry
233312
//
234313
// NOTE: this method is a blocking one, if the DB size is yuge then it will take
235314
// a lot of time to startup
236315
file, _ := os.Open(existingFile)
237-
defer file.Close()
316+
writePosition := 0
317+
238318
for {
239319
header := make([]byte, headerSize)
240320
_, err := io.ReadFull(file, header)
@@ -265,8 +345,8 @@ func (d *DiskStore) initKeyDir(existingFile string) error {
265345
}
266346

267347
totalSize := headerSize + h.KeySize + h.ValueSize
268-
d.keyDir[string(key)] = NewKeyEntry(h.TimeStamp, uint32(d.writePosition), totalSize)
269-
d.writePosition += int(totalSize)
348+
keyDir[string(key)] = NewKeyEntry(h.TimeStamp, file, uint32(writePosition), totalSize)
349+
writePosition += int(totalSize)
270350
}
271351
return nil
272352
}

disk_store_test.go

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@ import (
1010
)
1111

1212
func TestDiskStore_Get(t *testing.T) {
13-
store, err := NewDiskStore("test.db")
13+
store, err := NewDiskStore("test_db")
1414
if err != nil {
1515
t.Fatalf("failed to create disk store: %v", err)
1616
}
17-
defer os.Remove("test.db")
17+
defer os.RemoveAll("test_db")
18+
defer store.Close()
1819
store.Set("name", "jojo")
1920
val, _ := store.Get("name")
2021
if val != "jojo" {
@@ -23,23 +24,25 @@ func TestDiskStore_Get(t *testing.T) {
2324
}
2425

2526
func TestDiskStore_GetInvalid(t *testing.T) {
26-
store, err := NewDiskStore("test.db")
27+
store, err := NewDiskStore("test_db")
2728
if err != nil {
2829
t.Fatalf("failed to create disk store: %v", err)
2930
}
30-
defer os.Remove("test.db")
31+
defer os.RemoveAll("test_db")
32+
defer store.Close()
3133
val, _ := store.Get("some key")
3234
if val != "" {
3335
t.Errorf("Get() = %v, want %v", val, "")
3436
}
3537
}
3638

3739
func TestDiskStore_SetWithPersistence(t *testing.T) {
38-
store, err := NewDiskStore("test.db")
40+
store, err := NewDiskStore("test_db")
3941
if err != nil {
4042
t.Fatalf("failed to create disk store: %v", err)
4143
}
42-
defer os.Remove("test.db")
44+
defer os.RemoveAll("test_db")
45+
defer store.Close()
4346

4447
tests := map[string]string{
4548
"crime and punishment": "dostoevsky",
@@ -59,7 +62,8 @@ func TestDiskStore_SetWithPersistence(t *testing.T) {
5962
}
6063
}
6164
store.Close()
62-
store, err = NewDiskStore("test.db")
65+
66+
store, err = NewDiskStore("test_db")
6367
if err != nil {
6468
t.Fatalf("failed to create disk store: %v", err)
6569
}
@@ -73,11 +77,12 @@ func TestDiskStore_SetWithPersistence(t *testing.T) {
7377
}
7478

7579
func TestDiskStore_Delete(t *testing.T) {
76-
store, err := NewDiskStore("test.db")
80+
store, err := NewDiskStore("test_db")
7781
if err != nil {
7882
t.Fatalf("failed to create disk store: %v", err)
7983
}
80-
defer os.Remove("test.db")
84+
defer os.RemoveAll("test_db")
85+
defer store.Close()
8186

8287
tests := map[string]string{
8388
"crime and punishment": "dostoevsky",
@@ -100,7 +105,7 @@ func TestDiskStore_Delete(t *testing.T) {
100105
}
101106
store.Close()
102107

103-
store, err = NewDiskStore("test.db")
108+
store, err = NewDiskStore("test_db")
104109
if err != nil {
105110
t.Fatalf("failed to create disk store: %v", err)
106111
}
@@ -121,9 +126,9 @@ func TestDiskStore_Delete(t *testing.T) {
121126
}
122127

123128
func TestDiskStore_InValidCheckSum(t *testing.T) {
124-
store, _ := NewDiskStore("test.db")
129+
store, _ := NewDiskStore("test_db")
130+
defer os.RemoveAll("test_db")
125131
defer store.Close()
126-
defer os.Remove("test.db")
127132

128133
k1, v1 := "👋", "world"
129134
h1 := Header{TimeStamp: uint32(time.Now().Unix()), KeySize: uint32(len(k1)), ValueSize: uint32(len(v1)), Meta: 0}
@@ -148,7 +153,7 @@ func TestDiskStore_InValidCheckSum(t *testing.T) {
148153
tt.EncodeKV(buf)
149154

150155
// store the data
151-
store.keyDir[tt.Key] = NewKeyEntry(tt.Header.TimeStamp, uint32(store.writePosition), tt.Size())
156+
store.keyDir[tt.Key] = NewKeyEntry(tt.Header.TimeStamp, store.file, uint32(store.writePosition), tt.Size())
152157
store.writePosition += int(tt.Size())
153158
store.write(buf.Bytes())
154159

@@ -172,7 +177,7 @@ func TestDiskStore_InValidCheckSum(t *testing.T) {
172177
}
173178

174179
// write the corrupted bytes and update the hash table
175-
store.keyDir[tt.Key] = NewKeyEntry(tt.Header.TimeStamp+uint32(time.Now().Unix()), uint32(store.writePosition), tt.Size())
180+
store.keyDir[tt.Key] = NewKeyEntry(tt.Header.TimeStamp+uint32(time.Now().Unix()), store.file, uint32(store.writePosition), tt.Size())
176181
store.writePosition += int(tt.Size())
177182
store.write(kvRecord)
178183

@@ -200,3 +205,24 @@ func TestDiskStore_InValidCheckSum(t *testing.T) {
200205
}
201206
}
202207
}
208+
209+
func TestDiskStore_NewFileCreatedAfterMaxFileSizeReached(t *testing.T) {
210+
store, err := NewDiskStore("test_db")
211+
if err != nil {
212+
t.Fatalf("failed to create disk store: %v", err)
213+
}
214+
defer os.RemoveAll("test_db")
215+
defer store.Close()
216+
217+
MaxFileSize = 50
218+
219+
store.Set("crime and punishment", "dostoevsky")
220+
store.Set("anna karenina", "tolstoy")
221+
222+
dirEntry, _ := os.ReadDir("test_db")
223+
224+
if len(dirEntry) == 1 {
225+
t.Errorf("directory must have more than 1 file")
226+
}
227+
228+
}

format.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"encoding/binary"
66
"hash/crc32"
7+
"os"
78
)
89

910
// format file provides encode/decode functions for serialisation and deserialisation
@@ -80,6 +81,8 @@ type KeyEntry struct {
8081
// Timestamp at which we wrote the KV pair to the disk. The value
8182
// is current time in seconds since the epoch.
8283
timestamp uint32
84+
// The file in which the data exists
85+
file *os.File
8386
// The position is the byte offset in the file where the data
8487
// exists
8588
position uint32
@@ -103,8 +106,8 @@ type Record struct {
103106
RecordSize uint32
104107
}
105108

106-
func NewKeyEntry(timestamp uint32, position uint32, totalSize uint32) KeyEntry {
107-
return KeyEntry{timestamp, position, totalSize}
109+
func NewKeyEntry(timestamp uint32, file *os.File, position uint32, totalSize uint32) KeyEntry {
110+
return KeyEntry{timestamp, file, position, totalSize}
108111
}
109112

110113
func (h *Header) EncodeHeader(buf *bytes.Buffer) error {
@@ -180,4 +183,4 @@ func (r *Record) CalculateCheckSum() uint32 {
180183

181184
func (r *Record) VerifyCheckSum(data []byte) bool {
182185
return crc32.ChecksumIEEE(data[4:]) == r.Header.CheckSum
183-
}
186+
}

0 commit comments

Comments
 (0)