Skip to content

Commit cd8fa0b

Browse files
committed
Add support for limited memory use
1 parent 5241acb commit cd8fa0b

9 files changed

Lines changed: 325 additions & 39 deletions

File tree

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
language: go
22

33
go:
4-
- 1.10.3
54
- 1.11
5+
- 1.12
66

77
os:
88
- osx

arena.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ func newArena(file string, size int) (*arena, error) {
3131
return nil, err
3232
}
3333

34+
// We can close the file descriptor here
35+
if err := fd.Close(); err != nil {
36+
return nil, err
37+
}
38+
3439
return &arena{
3540
IMmap: m,
3641
size: size,

arena_test.go

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,47 @@ import (
99
)
1010

1111
func TestNewArenaNoDir(t *testing.T) {
12-
arena, err := newArena(fmt.Sprintf("%d/temp.dat", time.Now().UnixNano()), 100)
13-
if arena != nil || err == nil || os.IsExist(err) {
12+
aa, err := newArena(fmt.Sprintf("%d/temp.dat", time.Now().UnixNano()), 100)
13+
if aa != nil || err == nil || os.IsExist(err) {
1414
t.Fatalf("unexpected return for newArena :: %v", err)
1515
}
1616
}
1717

18+
func TestNewArenaNoReadPerm(t *testing.T) {
19+
fileName := path.Join(os.TempDir(), "temp.dat")
20+
defer os.Remove(fileName)
21+
if _, err := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE, 000); err != nil {
22+
t.Fatalf("unable to create file :: %v", err)
23+
}
24+
25+
arena, err := newArena(fileName, 100)
26+
if arena != nil || err == nil || !os.IsPermission(err) {
27+
aa, err := newArena("/temp.dat", 100)
28+
if aa != nil || err == nil || !os.IsPermission(err) {
29+
t.Fatalf("unexpected return for newArena :: %v", err)
30+
}
31+
}
32+
}
33+
1834
func TestNewArenaNoFile(t *testing.T) {
1935
arenaSize := 100
2036
fileName := path.Join(os.TempDir(), "temp.dat")
2137
defer os.Remove(fileName)
2238

23-
arena, err := newArena(fileName, arenaSize)
39+
aa, err := newArena(fileName, arenaSize)
2440
if err != nil {
2541
t.Fatalf("error in creating new arena: %v", err)
2642
}
27-
defer arena.Unmap()
43+
defer func() {
44+
err := aa.Unmap()
45+
if err != nil {
46+
t.Fatalf("error occurred while unmapping: %v", err)
47+
}
48+
}()
2849

2950
// ensure arena struct stores correct size
30-
if arena.size != arenaSize {
31-
t.Fatalf("arena size do not match, exp: %v, actual: %v", arenaSize, arena.size)
51+
if aa.size != arenaSize {
52+
t.Fatalf("arena size do not match, exp: %v, actual: %v", arenaSize, aa.size)
3253
}
3354

3455
// ensure underlined file is of correct size
@@ -55,15 +76,20 @@ func TestNewArenaLargerFile(t *testing.T) {
5576
}
5677

5778
// creating new arena
58-
arena, err := newArena(fileName, arenaSize)
79+
aa, err := newArena(fileName, arenaSize)
5980
if err != nil {
6081
t.Fatalf("error in creating new arena: %v", err)
6182
}
62-
defer arena.Unmap()
83+
defer func() {
84+
err := aa.Unmap()
85+
if err != nil {
86+
t.Fatalf("error occurred while unmapping: %v", err)
87+
}
88+
}()
6389

6490
// ensure arena struct stores correct size
65-
if arena.size != arenaSize {
66-
t.Fatalf("arena size do not match, exp: %v, actual: %v", arenaSize, arena.size)
91+
if aa.size != arenaSize {
92+
t.Fatalf("arena size do not match, exp: %v, actual: %v", arenaSize, aa.size)
6793
}
6894

6995
// ensure underlined file is still of original size
@@ -78,8 +104,8 @@ func TestNewArenaLargerFile(t *testing.T) {
78104

79105
func TestNewArenaNoFolder(t *testing.T) {
80106
arenaSize := 100
81-
arena, err := newArena("1/2/3/4/5/6/arena.dat", arenaSize)
82-
if !os.IsNotExist(err) || arena != nil {
107+
aa, err := newArena("1/2/3/4/5/6/aa.dat", arenaSize)
108+
if !os.IsNotExist(err) || aa != nil {
83109
t.Fatalf("expected file not exists error, returned: %v", err)
84110
}
85111
}

arenamanager.go

Lines changed: 105 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package bigqueue
33
import (
44
"errors"
55
"fmt"
6+
"os"
67
"path"
78
)
89

@@ -11,27 +12,33 @@ const (
1112
)
1213

1314
var (
14-
// errShouldNotReach is returned when code reaches unexpected places
15+
// errShouldNotReach is returned when an invariant is not true anymore
1516
errShouldNotReach = errors.New("SHOULD NOT REACH HERE")
1617
)
1718

1819
// arenaManager manages all the arenas for a bigqueue
1920
type arenaManager struct {
20-
dir string
21-
conf *bqConfig
22-
baseAid int
23-
arenaList []*arena
21+
dir string
22+
conf *bqConfig
23+
index *queueIndex
24+
baseAid int
25+
arenaList []*arena
26+
inMemArenas int
2427
}
2528

2629
// newArenaManager returns a pointer to new arenaManager
27-
func newArenaManager(dir string, conf *bqConfig, headAid, tailAid int) (
30+
func newArenaManager(dir string, conf *bqConfig, index *queueIndex) (
2831
*arenaManager, error) {
2932

33+
headAid, _ := index.getHead()
34+
tailAid, _ := index.getTail()
35+
3036
numArenas := tailAid + 1 - headAid
3137
arenaList := make([]*arena, numArenas)
3238
am := &arenaManager{
3339
dir: dir,
3440
conf: conf,
41+
index: index,
3542
baseAid: headAid,
3643
arenaList: arenaList,
3744
}
@@ -40,6 +47,7 @@ func newArenaManager(dir string, conf *bqConfig, headAid, tailAid int) (
4047
if err := am.loadArenaIntoMemory(headAid); err != nil {
4148
return nil, err
4249
}
50+
4351
if err := am.loadArenaIntoMemory(tailAid); err != nil {
4452
return nil, err
4553
}
@@ -63,6 +71,12 @@ func (m *arenaManager) getArena(aid int) (*arena, error) {
6371
return aa, nil
6472
}
6573

74+
// before we get a new arena into memory, we need to ensure that after fetching
75+
// a new arena into memory, we do not cross the provided memory limit
76+
if err := m.ensureEnoughMem(); err != nil {
77+
return nil, err
78+
}
79+
6680
// otherwise, get arena into memory
6781
if err := m.loadArenaIntoMemory(aid); err != nil {
6882
return nil, err
@@ -71,18 +85,103 @@ func (m *arenaManager) getArena(aid int) (*arena, error) {
7185
return m.arenaList[relAid], nil
7286
}
7387

88+
// ensureEnoughMem ensures that at least 1 new arena can be brought into memory
89+
// TODO: shrink arenaList
90+
func (m *arenaManager) ensureEnoughMem() error {
91+
// Check whether head has moved and arenas can be unmpped
92+
// Remove all such arenas from memory, irrespectively
93+
headAid, _ := m.index.getHead()
94+
for aid := m.baseAid; aid < headAid; aid++ {
95+
if err := m.unloadArenaFromMemory(aid); err != nil {
96+
return err
97+
}
98+
99+
// disk garbage collection
100+
if err := m.deleteArenaBackedFile(aid); err != nil {
101+
return err
102+
}
103+
}
104+
105+
// if no limit on # of arenas, no need for eviction
106+
if m.conf.maxInMemArenas == 0 {
107+
return nil
108+
}
109+
110+
// Check whether an eviction is needed to begin with
111+
if m.inMemArenas < m.conf.maxInMemArenas {
112+
return nil
113+
}
114+
115+
// Start evicting from the arena just before the last arena that we have.
116+
// If message size > arena size, last arena may not always be the tail arena.
117+
// We always ensure that head and tail arenas are not evicted from memory.
118+
// Assuming m.conf.maxInMemArenas >= 3.
119+
// Simply iterate from the last arena until enough memory is
120+
// available for a new arena to be loaded into memory
121+
tailAid, _ := m.index.getTail()
122+
curAid := m.baseAid + len(m.arenaList)
123+
for m.conf.maxInMemArenas-m.inMemArenas <= 0 {
124+
curAid--
125+
126+
if curAid < 0 {
127+
return errShouldNotReach
128+
}
129+
130+
if curAid == tailAid || curAid == headAid {
131+
continue
132+
}
133+
134+
if err := m.unloadArenaFromMemory(curAid); err != nil {
135+
return err
136+
}
137+
}
138+
139+
return nil
140+
}
141+
74142
// loadArenaIntoMemory will fetch the arena into memory
75143
func (m *arenaManager) loadArenaIntoMemory(aid int) error {
144+
if m.arenaList[aid-m.baseAid] != nil {
145+
return nil
146+
}
147+
76148
filePath := path.Join(m.dir, fmt.Sprintf(cArenaFileFmt, aid))
77149
aa, err := newArena(filePath, m.conf.arenaSize)
78150
if err != nil {
79151
return err
80152
}
81153

154+
m.inMemArenas++
82155
m.arenaList[aid-m.baseAid] = aa
83156
return nil
84157
}
85158

159+
// unloadArenaFromMemory will remove the arena from memory
160+
func (m *arenaManager) unloadArenaFromMemory(aid int) error {
161+
if m.arenaList[aid-m.baseAid] == nil {
162+
return nil
163+
}
164+
165+
if err := m.arenaList[aid-m.baseAid].Unmap(); err != nil {
166+
return err
167+
}
168+
169+
m.inMemArenas--
170+
m.arenaList[aid-m.baseAid] = nil
171+
return nil
172+
}
173+
174+
// deleteArenaBackedFile deletes the backed file for given arena with
175+
// arena id: aid. If file doesn't exist, the error is ignored.
176+
func (m *arenaManager) deleteArenaBackedFile(aid int) error {
177+
filePath := path.Join(m.dir, fmt.Sprintf(cArenaFileFmt, aid))
178+
if err := os.Remove(filePath); err != nil && !os.IsNotExist(err) {
179+
return err
180+
}
181+
182+
return nil
183+
}
184+
86185
// close unmaps all the arenas managed by arenaManager
87186
func (m *arenaManager) close() error {
88187
var retErr error

bigqueue.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type BigQueue struct {
2727
}
2828

2929
// NewBigQueue constructs an instance of *BigQueue
30-
func NewBigQueue(dir string, opts ...Option) (IBigQueue, error) {
30+
func NewBigQueue(dir string, opts ...Option) (*BigQueue, error) {
3131
complete := false
3232

3333
// setup configuration
@@ -50,9 +50,7 @@ func NewBigQueue(dir string, opts ...Option) (IBigQueue, error) {
5050
}()
5151

5252
// create arena manager
53-
headAid, _ := index.getHead()
54-
tailAid, _ := index.getTail()
55-
am, err := newArenaManager(dir, conf, headAid, tailAid)
53+
am, err := newArenaManager(dir, conf, index)
5654
if err != nil {
5755
return nil, err
5856
}

0 commit comments

Comments
 (0)