Skip to content

Commit ced40e2

Browse files
committed
feat: configurable provider channel buffer length
This change makes the memory provider channel buffer length configurable. The previously hard-coded value of 200 is used as the default. This should result in better performance and fewer timeouts during sudden alert spikes, especially if the pipeline speed is affected by slow receivers. Signed-off-by: Siavash Safi <[email protected]>
1 parent 7980594 commit ced40e2

File tree

5 files changed

+49
-37
lines changed

5 files changed

+49
-37
lines changed

cmd/alertmanager/main.go

+9-8
Original file line numberDiff line numberDiff line change
@@ -143,13 +143,14 @@ func run() int {
143143
}
144144

145145
var (
146-
configFile = kingpin.Flag("config.file", "Alertmanager configuration file name.").Default("alertmanager.yml").String()
147-
dataDir = kingpin.Flag("storage.path", "Base path for data storage.").Default("data/").String()
148-
retention = kingpin.Flag("data.retention", "How long to keep data for.").Default("120h").Duration()
149-
maintenanceInterval = kingpin.Flag("data.maintenance-interval", "Interval between garbage collection and snapshotting to disk of the silences and the notification logs.").Default("15m").Duration()
150-
maxSilences = kingpin.Flag("silences.max-silences", "Maximum number of silences, including expired silences. If negative or zero, no limit is set.").Default("0").Int()
151-
maxSilenceSizeBytes = kingpin.Flag("silences.max-silence-size-bytes", "Maximum silence size in bytes. If negative or zero, no limit is set.").Default("0").Int()
152-
alertGCInterval = kingpin.Flag("alerts.gc-interval", "Interval between alert GC.").Default("30m").Duration()
146+
configFile = kingpin.Flag("config.file", "Alertmanager configuration file name.").Default("alertmanager.yml").String()
147+
dataDir = kingpin.Flag("storage.path", "Base path for data storage.").Default("data/").String()
148+
retention = kingpin.Flag("data.retention", "How long to keep data for.").Default("120h").Duration()
149+
maintenanceInterval = kingpin.Flag("data.maintenance-interval", "Interval between garbage collection and snapshotting to disk of the silences and the notification logs.").Default("15m").Duration()
150+
maxSilences = kingpin.Flag("silences.max-silences", "Maximum number of silences, including expired silences. If negative or zero, no limit is set.").Default("0").Int()
151+
maxSilenceSizeBytes = kingpin.Flag("silences.max-silence-size-bytes", "Maximum silence size in bytes. If negative or zero, no limit is set.").Default("0").Int()
152+
alertChannelBufferLength = kingpin.Flag("alerts.channel-buffer-length", "Alert channel buffer length").Default("200").Int()
153+
alertGCInterval = kingpin.Flag("alerts.gc-interval", "Interval between alert GC.").Default("30m").Duration()
153154

154155
webConfig = webflag.AddFlags(kingpin.CommandLine, ":9093")
155156
externalURL = kingpin.Flag("web.external-url", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.").String()
@@ -342,7 +343,7 @@ func run() int {
342343
go peer.Settle(ctx, *gossipInterval*10)
343344
}
344345

345-
alerts, err := mem.NewAlerts(context.Background(), marker, *alertGCInterval, nil, logger, prometheus.DefaultRegisterer)
346+
alerts, err := mem.NewAlerts(context.Background(), marker, *alertChannelBufferLength, *alertGCInterval, nil, logger, prometheus.DefaultRegisterer)
346347
if err != nil {
347348
logger.Error("error creating memory provider", "err", err)
348349
return 1

dispatch/dispatch_test.go

+9-5
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ import (
3535
"github.com/prometheus/alertmanager/types"
3636
)
3737

38+
var (
39+
alertChannelLength = 200
40+
)
41+
3842
func TestAggrGroup(t *testing.T) {
3943
lset := model.LabelSet{
4044
"a": "v1",
@@ -391,7 +395,7 @@ route:
391395
logger := promslog.NewNopLogger()
392396
route := NewRoute(conf.Route, nil)
393397
marker := types.NewMarker(prometheus.NewRegistry())
394-
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil)
398+
alerts, err := mem.NewAlerts(context.Background(), marker, alertChannelLength, time.Hour, nil, logger, nil)
395399
if err != nil {
396400
t.Fatal(err)
397401
}
@@ -541,7 +545,7 @@ route:
541545
logger := promslog.NewNopLogger()
542546
route := NewRoute(conf.Route, nil)
543547
marker := types.NewMarker(prometheus.NewRegistry())
544-
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil)
548+
alerts, err := mem.NewAlerts(context.Background(), marker, alertChannelLength, time.Hour, nil, logger, nil)
545549
if err != nil {
546550
t.Fatal(err)
547551
}
@@ -662,7 +666,7 @@ func newAlert(labels model.LabelSet) *types.Alert {
662666
func TestDispatcherRace(t *testing.T) {
663667
logger := promslog.NewNopLogger()
664668
marker := types.NewMarker(prometheus.NewRegistry())
665-
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil)
669+
alerts, err := mem.NewAlerts(context.Background(), marker, alertChannelLength, time.Hour, nil, logger, nil)
666670
if err != nil {
667671
t.Fatal(err)
668672
}
@@ -679,7 +683,7 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T)
679683

680684
logger := promslog.NewNopLogger()
681685
marker := types.NewMarker(prometheus.NewRegistry())
682-
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil)
686+
alerts, err := mem.NewAlerts(context.Background(), marker, alertChannelLength, time.Hour, nil, logger, nil)
683687
if err != nil {
684688
t.Fatal(err)
685689
}
@@ -733,7 +737,7 @@ func TestDispatcher_DoMaintenance(t *testing.T) {
733737
r := prometheus.NewRegistry()
734738
marker := types.NewMarker(r)
735739

736-
alerts, err := mem.NewAlerts(context.Background(), marker, time.Minute, nil, promslog.NewNopLogger(), nil)
740+
alerts, err := mem.NewAlerts(context.Background(), marker, alertChannelLength, time.Minute, nil, promslog.NewNopLogger(), nil)
737741
if err != nil {
738742
t.Fatal(err)
739743
}

inhibit/inhibit_bench_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ func lastRuleMatchesBenchmark(b *testing.B, n int) benchmarkOptions {
184184
func benchmarkMutes(b *testing.B, opts benchmarkOptions) {
185185
r := prometheus.NewRegistry()
186186
m := types.NewMarker(r)
187-
s, err := mem.NewAlerts(context.TODO(), m, time.Minute, nil, promslog.NewNopLogger(), r)
187+
s, err := mem.NewAlerts(context.TODO(), m, 200, time.Minute, nil, promslog.NewNopLogger(), r)
188188
if err != nil {
189189
b.Fatal(err)
190190
}

provider/mem/mem.go

+19-14
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ package mem
1515

1616
import (
1717
"context"
18+
"fmt"
1819
"log/slog"
1920
"sync"
2021
"time"
@@ -27,8 +28,6 @@ import (
2728
"github.com/prometheus/alertmanager/types"
2829
)
2930

30-
const alertChannelLength = 200
31-
3231
// Alerts gives access to a set of alerts. All methods are goroutine-safe.
3332
type Alerts struct {
3433
cancel context.CancelFunc
@@ -38,8 +37,9 @@ type Alerts struct {
3837
alerts *store.Alerts
3938
marker types.AlertMarker
4039

41-
listeners map[int]listeningAlerts
42-
next int
40+
channelLength int
41+
listeners map[int]listeningAlerts
42+
next int
4343

4444
callback AlertStoreCallback
4545

@@ -85,20 +85,25 @@ func (a *Alerts) registerMetrics(r prometheus.Registerer) {
8585
}
8686

8787
// NewAlerts returns a new alert provider.
88-
func NewAlerts(ctx context.Context, m types.AlertMarker, intervalGC time.Duration, alertCallback AlertStoreCallback, l *slog.Logger, r prometheus.Registerer) (*Alerts, error) {
88+
func NewAlerts(ctx context.Context, m types.AlertMarker, channelLength int, intervalGC time.Duration, alertCallback AlertStoreCallback, l *slog.Logger, r prometheus.Registerer) (*Alerts, error) {
8989
if alertCallback == nil {
9090
alertCallback = noopCallback{}
9191
}
9292

93+
if channelLength < 1 {
94+
return nil, fmt.Errorf("channel length has to be > zero, provided %d", channelLength)
95+
}
96+
9397
ctx, cancel := context.WithCancel(ctx)
9498
a := &Alerts{
95-
marker: m,
96-
alerts: store.NewAlerts(),
97-
cancel: cancel,
98-
listeners: map[int]listeningAlerts{},
99-
next: 0,
100-
logger: l.With("component", "provider"),
101-
callback: alertCallback,
99+
marker: m,
100+
alerts: store.NewAlerts(),
101+
cancel: cancel,
102+
channelLength: channelLength,
103+
listeners: map[int]listeningAlerts{},
104+
next: 0,
105+
logger: l.With("component", "provider"),
106+
callback: alertCallback,
102107
}
103108

104109
if r != nil {
@@ -170,7 +175,7 @@ func (a *Alerts) Subscribe() provider.AlertIterator {
170175
var (
171176
done = make(chan struct{})
172177
alerts = a.alerts.List()
173-
ch = make(chan *types.Alert, max(len(alerts), alertChannelLength))
178+
ch = make(chan *types.Alert, max(len(alerts), a.channelLength))
174179
)
175180

176181
for _, a := range alerts {
@@ -187,7 +192,7 @@ func (a *Alerts) Subscribe() provider.AlertIterator {
187192
// pending notifications.
188193
func (a *Alerts) GetPending() provider.AlertIterator {
189194
var (
190-
ch = make(chan *types.Alert, alertChannelLength)
195+
ch = make(chan *types.Alert, a.channelLength)
191196
done = make(chan struct{})
192197
)
193198
a.mtx.Lock()

provider/mem/mem_test.go

+11-9
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ import (
3535
)
3636

3737
var (
38+
alertChannelLength = 200
39+
3840
t0 = time.Now()
3941
t1 = t0.Add(100 * time.Millisecond)
4042

@@ -87,7 +89,7 @@ func init() {
8789
// a listener can not unsubscribe as the lock is hold by `alerts.Lock`.
8890
func TestAlertsSubscribePutStarvation(t *testing.T) {
8991
marker := types.NewMarker(prometheus.NewRegistry())
90-
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), nil)
92+
alerts, err := NewAlerts(context.Background(), marker, alertChannelLength, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), nil)
9193
if err != nil {
9294
t.Fatal(err)
9395
}
@@ -142,7 +144,7 @@ func TestDeadLock(t *testing.T) {
142144

143145
marker := types.NewMarker(prometheus.NewRegistry())
144146
// Run gc every 5 milliseconds to increase the possibility of a deadlock with Subscribe()
145-
alerts, err := NewAlerts(context.Background(), marker, 5*time.Millisecond, noopCallback{}, promslog.NewNopLogger(), nil)
147+
alerts, err := NewAlerts(context.Background(), marker, alertChannelLength, 5*time.Millisecond, noopCallback{}, promslog.NewNopLogger(), nil)
146148
if err != nil {
147149
t.Fatal(err)
148150
}
@@ -195,7 +197,7 @@ func TestDeadLock(t *testing.T) {
195197

196198
func TestAlertsPut(t *testing.T) {
197199
marker := types.NewMarker(prometheus.NewRegistry())
198-
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), nil)
200+
alerts, err := NewAlerts(context.Background(), marker, alertChannelLength, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), nil)
199201
if err != nil {
200202
t.Fatal(err)
201203
}
@@ -223,7 +225,7 @@ func TestAlertsSubscribe(t *testing.T) {
223225

224226
ctx, cancel := context.WithCancel(context.Background())
225227
defer cancel()
226-
alerts, err := NewAlerts(ctx, marker, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), nil)
228+
alerts, err := NewAlerts(ctx, marker, alertChannelLength, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), nil)
227229
if err != nil {
228230
t.Fatal(err)
229231
}
@@ -300,7 +302,7 @@ func TestAlertsSubscribe(t *testing.T) {
300302

301303
func TestAlertsGetPending(t *testing.T) {
302304
marker := types.NewMarker(prometheus.NewRegistry())
303-
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), nil)
305+
alerts, err := NewAlerts(context.Background(), marker, alertChannelLength, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), nil)
304306
if err != nil {
305307
t.Fatal(err)
306308
}
@@ -343,7 +345,7 @@ func TestAlertsGetPending(t *testing.T) {
343345

344346
func TestAlertsGC(t *testing.T) {
345347
marker := types.NewMarker(prometheus.NewRegistry())
346-
alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, noopCallback{}, promslog.NewNopLogger(), nil)
348+
alerts, err := NewAlerts(context.Background(), marker, alertChannelLength, 200*time.Millisecond, noopCallback{}, promslog.NewNopLogger(), nil)
347349
if err != nil {
348350
t.Fatal(err)
349351
}
@@ -380,7 +382,7 @@ func TestAlertsStoreCallback(t *testing.T) {
380382
cb := &limitCountCallback{limit: 3}
381383

382384
marker := types.NewMarker(prometheus.NewRegistry())
383-
alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, cb, promslog.NewNopLogger(), nil)
385+
alerts, err := NewAlerts(context.Background(), marker, alertChannelLength, 200*time.Millisecond, cb, promslog.NewNopLogger(), nil)
384386
if err != nil {
385387
t.Fatal(err)
386388
}
@@ -443,7 +445,7 @@ func TestAlertsStoreCallback(t *testing.T) {
443445

444446
func TestAlerts_Count(t *testing.T) {
445447
marker := types.NewMarker(prometheus.NewRegistry())
446-
alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, nil, promslog.NewNopLogger(), nil)
448+
alerts, err := NewAlerts(context.Background(), marker, alertChannelLength, 200*time.Millisecond, nil, promslog.NewNopLogger(), nil)
447449
require.NoError(t, err)
448450

449451
states := []types.AlertState{types.AlertStateActive, types.AlertStateSuppressed, types.AlertStateUnprocessed}
@@ -565,7 +567,7 @@ func (l *limitCountCallback) PostDelete(_ *types.Alert) {
565567

566568
func TestAlertsConcurrently(t *testing.T) {
567569
callback := &limitCountCallback{limit: 100}
568-
a, err := NewAlerts(context.Background(), types.NewMarker(prometheus.NewRegistry()), time.Millisecond, callback, promslog.NewNopLogger(), nil)
570+
a, err := NewAlerts(context.Background(), types.NewMarker(prometheus.NewRegistry()), alertChannelLength, time.Millisecond, callback, promslog.NewNopLogger(), nil)
569571
require.NoError(t, err)
570572

571573
stopc := make(chan struct{})

0 commit comments

Comments
 (0)