Skip to content

Commit e98ea9d

Browse files
committed
Customizable system resources configuration
This change would allow end users to specify custom system resources configuration. It should not change the existing behaviour as it keeps the defaults in place. Signed-off-by: Maxime VISONNEAU <[email protected]>
1 parent 1ce8735 commit e98ea9d

11 files changed

+115
-39
lines changed

consumer_config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ func (r *configRoulette) genConfigs(bestCfg *consumerConfig, queueEmpty bool) {
245245
r.oldBestCfg = bestCfg.Clone()
246246
r.addConfig(r.oldBestCfg)
247247

248-
if !hasFreeSystemResources() {
248+
if !hasFreeSystemResources(r.opt.MinSystemResources) {
249249
internal.Logger.Println("taskq: system does not have enough free resources")
250250
return
251251
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ require (
1616
github.com/onsi/ginkgo v1.14.1
1717
github.com/onsi/gomega v1.10.2
1818
github.com/satori/go.uuid v1.2.0
19+
github.com/stretchr/testify v1.6.1
1920
github.com/vmihailenco/msgpack/v5 v5.0.0-beta.1
2021
github.com/vmihailenco/tagparser v0.1.2 // indirect
2122
go.opentelemetry.io/otel v0.13.0

queue.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ type QueueOptions struct {
5959
// Optional message handler. The default is the global Tasks registry.
6060
Handler Handler
6161

62+
// Minimal system resources required to consider the consumer available to process the queue
63+
MinSystemResources SystemResources
64+
6265
inited bool
6366
}
6467

@@ -118,6 +121,10 @@ func (opt *QueueOptions) Init() {
118121
if opt.Handler == nil {
119122
opt.Handler = &Tasks
120123
}
124+
125+
if opt.MinSystemResources == (SystemResources{}) {
126+
opt.MinSystemResources = NewDefaultSystemResources()
127+
}
121128
}
122129

123130
//------------------------------------------------------------------------------

sysinfo_linux.go

Lines changed: 0 additions & 31 deletions
This file was deleted.

sysinfo_other.go

Lines changed: 0 additions & 7 deletions
This file was deleted.

system.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package taskq
2+
3+
const (
4+
defaultSystemResourcesLoad1PerCPU float64 = 1.5
5+
defaultSystemResourcesMemoryFreeMB uint64 = 2e5
6+
defaultSystemResourcesMemoryFreePercentage uint64 = 5
7+
)
8+
9+
// SystemResources represents system related values
10+
type SystemResources struct {
11+
// Maximum per CPU load at 1min intervals
12+
Load1PerCPU float64
13+
14+
// Minimum free memory required in megabytes
15+
MemoryFreeMB uint64
16+
17+
// Minimum free memory required in percentage
18+
MemoryFreePercentage uint64
19+
}
20+
21+
// NewDefaultSystemResources returns a new SystemResources struct with some default values
22+
func NewDefaultSystemResources() SystemResources {
23+
return SystemResources{
24+
Load1PerCPU: defaultSystemResourcesLoad1PerCPU,
25+
MemoryFreeMB: defaultSystemResourcesMemoryFreeMB,
26+
MemoryFreePercentage: defaultSystemResourcesMemoryFreePercentage,
27+
}
28+
}

system_linux.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// +build linux
2+
3+
package taskq
4+
5+
import (
6+
"runtime"
7+
8+
"github.com/capnm/sysinfo"
9+
"github.com/vmihailenco/taskq/v3/internal"
10+
)
11+
12+
func hasFreeSystemResources(sr SystemResources) bool {
13+
si := sysinfo.Get()
14+
free := si.FreeRam + si.BufferRam
15+
16+
if sr.Load1PerCPU > 0 && si.Loads[0] > sr.Load1PerCPU*float64(runtime.NumCPU()) {
17+
internal.Logger.Println("taskq: consumer memory is lower than required")
18+
return false
19+
}
20+
21+
if sr.MemoryFreeMB > 0 && free < sr.MemoryFreeMB {
22+
internal.Logger.Println("taskq: consumer memory is lower than required")
23+
return false
24+
}
25+
26+
if sr.MemoryFreePercentage > 0 && free/si.TotalRam < sr.MemoryFreePercentage/100 {
27+
internal.Logger.Println("taskq: consumer memory is lower than required")
28+
return false
29+
}
30+
31+
return true
32+
}

system_linux_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package taskq
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestHasFreeSystemResources(t *testing.T) {
10+
// TODO: Manage to mock capnm/sysinfo
11+
assert.True(t, hasFreeSystemResources(SystemResources{}))
12+
}

system_other.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
// +build !linux
2+
3+
package taskq
4+
5+
func hasFreeSystemResources(_ SystemResources) bool {
6+
return true
7+
}

system_other_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package taskq
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestHasFreeSystemResources(t *testing.T) {
10+
assert.True(t, hasFreeSystemResources(SystemResources{}))
11+
}

system_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package taskq
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestNewDefaultSystemResources(t *testing.T) {
10+
expectedValue := SystemResources{
11+
Load1PerCPU: 1.5,
12+
MemoryFreeMB: 2e5,
13+
MemoryFreePercentage: 5,
14+
}
15+
assert.Equal(t, expectedValue, NewDefaultSystemResources())
16+
}

0 commit comments

Comments
 (0)