Skip to content

Commit fddacd1

Browse files
committed
Customizable system resources configuration
This change would allow end users to specify custom system resources configuration. It should not change the existing behaviour as it keeps the defaults in place. Signed-off-by: Maxime VISONNEAU <[email protected]>
1 parent 43f7025 commit fddacd1

File tree

11 files changed

+115
-39
lines changed

11 files changed

+115
-39
lines changed

consumer_config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ func (r *configRoulette) genConfigs(bestCfg *consumerConfig, queueEmpty bool) {
245245
r.oldBestCfg = bestCfg.Clone()
246246
r.addConfig(r.oldBestCfg)
247247

248-
if !hasFreeSystemResources() {
248+
if !hasFreeSystemResources(r.opt.MinSystemResources) {
249249
internal.Logger.Println("taskq: system does not have enough free resources")
250250
return
251251
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ require (
1919
github.com/onsi/gomega v1.10.3
2020
github.com/satori/go.uuid v1.2.0
2121
github.com/vmihailenco/msgpack/v5 v5.0.0
22+
github.com/stretchr/testify v1.6.1
2223
golang.org/x/net v0.0.0-20201027133719-8eef5233e2a1 // indirect
2324
google.golang.org/protobuf v1.25.0 // indirect
2425
)

queue.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ type QueueOptions struct {
5959
// Optional message handler. The default is the global Tasks registry.
6060
Handler Handler
6161

62+
// Minimal system resources required to consider the consumer available to process the queue
63+
MinSystemResources SystemResources
64+
6265
inited bool
6366
}
6467

@@ -118,6 +121,10 @@ func (opt *QueueOptions) Init() {
118121
if opt.Handler == nil {
119122
opt.Handler = &Tasks
120123
}
124+
125+
if opt.MinSystemResources == (SystemResources{}) {
126+
opt.MinSystemResources = NewDefaultSystemResources()
127+
}
121128
}
122129

123130
//------------------------------------------------------------------------------

sysinfo_linux.go

Lines changed: 0 additions & 31 deletions
This file was deleted.

sysinfo_other.go

Lines changed: 0 additions & 7 deletions
This file was deleted.

system.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package taskq
2+
3+
const (
4+
defaultSystemResourcesLoad1PerCPU float64 = 1.5
5+
defaultSystemResourcesMemoryFreeMB uint64 = 2e5
6+
defaultSystemResourcesMemoryFreePercentage uint64 = 5
7+
)
8+
9+
// SystemResources represents system related values
10+
type SystemResources struct {
11+
// Maximum per CPU load at 1min intervals
12+
Load1PerCPU float64
13+
14+
// Minimum free memory required in megabytes
15+
MemoryFreeMB uint64
16+
17+
// Minimum free memory required in percentage
18+
MemoryFreePercentage uint64
19+
}
20+
21+
// NewDefaultSystemResources returns a new SystemResources struct with some default values
22+
func NewDefaultSystemResources() SystemResources {
23+
return SystemResources{
24+
Load1PerCPU: defaultSystemResourcesLoad1PerCPU,
25+
MemoryFreeMB: defaultSystemResourcesMemoryFreeMB,
26+
MemoryFreePercentage: defaultSystemResourcesMemoryFreePercentage,
27+
}
28+
}

system_linux.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// +build linux
2+
3+
package taskq
4+
5+
import (
6+
"runtime"
7+
8+
"github.com/capnm/sysinfo"
9+
"github.com/vmihailenco/taskq/v3/internal"
10+
)
11+
12+
func hasFreeSystemResources(sr SystemResources) bool {
13+
si := sysinfo.Get()
14+
free := si.FreeRam + si.BufferRam
15+
16+
if sr.Load1PerCPU > 0 && si.Loads[0] > sr.Load1PerCPU*float64(runtime.NumCPU()) {
17+
internal.Logger.Println("taskq: consumer memory is lower than required")
18+
return false
19+
}
20+
21+
if sr.MemoryFreeMB > 0 && free < sr.MemoryFreeMB {
22+
internal.Logger.Println("taskq: consumer memory is lower than required")
23+
return false
24+
}
25+
26+
if sr.MemoryFreePercentage > 0 && free/si.TotalRam < sr.MemoryFreePercentage/100 {
27+
internal.Logger.Println("taskq: consumer memory is lower than required")
28+
return false
29+
}
30+
31+
return true
32+
}

system_linux_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package taskq
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestHasFreeSystemResources(t *testing.T) {
10+
// TODO: Manage to mock capnm/sysinfo
11+
assert.True(t, hasFreeSystemResources(SystemResources{}))
12+
}

system_other.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
// +build !linux
2+
3+
package taskq
4+
5+
func hasFreeSystemResources(_ SystemResources) bool {
6+
return true
7+
}

system_other_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package taskq
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestHasFreeSystemResources(t *testing.T) {
10+
assert.True(t, hasFreeSystemResources(SystemResources{}))
11+
}

0 commit comments

Comments
 (0)