Skip to content

Commit a6d3254

Browse files
committed
Pass entriesWritten metric
1 parent 26c67b3 commit a6d3254

4 files changed

Lines changed: 43 additions & 37 deletions

File tree

internal/component/loki/source/api/api.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ func (a *Arguments) labelSet() model.LabelSet {
5656

5757
type Component struct {
5858
opts component.Options
59+
metrics *metrics
5960
handler loki.LogsBatchReceiver
6061
uncheckedCollector *util.UncheckedCollector
6162

@@ -68,6 +69,7 @@ type Component struct {
6869
func New(opts component.Options, args Arguments) (*Component, error) {
6970
c := &Component{
7071
opts: opts,
72+
metrics: newMetrics(opts.Registerer),
7173
handler: loki.NewLogsBatchReceiver(),
7274
uncheckedCollector: util.NewUncheckedCollector(nil),
7375

@@ -134,8 +136,9 @@ func (c *Component) Update(args component.Arguments) error {
134136

135137
var err error
136138
c.server, err = source.NewServer(c.opts.Logger, reg, c.handler, source.ServerConfig{
137-
Namespace: "loki_source_api",
138-
NetConfig: newArgs.Server,
139+
Namespace: "loki_source_api",
140+
EntriesWritten: c.metrics.entriesWritten,
141+
NetConfig: newArgs.Server,
139142
LogsConfig: &source.LogsConfig{
140143
FixedLabels: newArgs.labelSet(),
141144
RelabelRules: relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules),
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package api
2+
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
6+
"github.com/grafana/alloy/internal/util"
7+
)
8+
9+
type metrics struct {
10+
entriesWritten prometheus.Counter
11+
}
12+
13+
func newMetrics(reg prometheus.Registerer) *metrics {
14+
return &metrics{
15+
entriesWritten: util.MustRegisterOrGet(reg, prometheus.NewCounter(prometheus.CounterOpts{
16+
Name: "loki_source_api_entries_written",
17+
Help: "Total number of entries written.",
18+
})).(prometheus.Counter),
19+
}
20+
}

internal/component/loki/source/server.go

Lines changed: 15 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,12 @@ import (
1414
"github.com/grafana/alloy/internal/component/common/loki"
1515
fnet "github.com/grafana/alloy/internal/component/common/net"
1616
"github.com/grafana/alloy/internal/runtime/logging/level"
17-
"github.com/grafana/alloy/internal/util"
1817
)
1918

2019
// Server exposes HTTP routes that ingest log entries and forward them in batches.
2120
type Server struct {
22-
logger log.Logger
23-
metrics *serverMetrics
21+
logger log.Logger
22+
entriesWritten prometheus.Counter
2423

2524
server *fnet.TargetServer
2625
netConfig *fnet.ServerConfig
@@ -56,9 +55,10 @@ type HandlerRoute interface {
5655
}
5756

5857
type ServerConfig struct {
59-
Namespace string
60-
NetConfig *fnet.ServerConfig
61-
LogsConfig *LogsConfig
58+
Namespace string
59+
EntriesWritten prometheus.Counter
60+
NetConfig *fnet.ServerConfig
61+
LogsConfig *LogsConfig
6262
}
6363

6464
type LogsConfig struct {
@@ -74,14 +74,14 @@ func NewServer(logger log.Logger, reg prometheus.Registerer, recv loki.LogsBatch
7474
}
7575

7676
return &Server{
77-
logger: logger,
78-
metrics: newServerMetrics(cfg.Namespace, reg),
79-
server: server,
80-
netConfig: cfg.NetConfig,
81-
logsConfig: cfg.LogsConfig,
82-
recv: recv,
83-
once: sync.Once{},
84-
forceShutdown: make(chan struct{}),
77+
logger: logger,
78+
entriesWritten: cfg.EntriesWritten,
79+
server: server,
80+
netConfig: cfg.NetConfig,
81+
logsConfig: cfg.LogsConfig,
82+
recv: recv,
83+
once: sync.Once{},
84+
forceShutdown: make(chan struct{}),
8585
}, nil
8686
}
8787

@@ -167,7 +167,7 @@ func (s *Server) logsHandler(logsFn func(r *http.Request, opts *LogsConfig) ([]l
167167
return
168168
}
169169

170-
s.metrics.entriesWritten.Add(float64(numEntries))
170+
s.entriesWritten.Add(float64(numEntries))
171171

172172
if err != nil {
173173
level.Warn(s.logger).Log("msg", "at least one entry failed to be processed", "err", err)
@@ -179,21 +179,3 @@ func (s *Server) logsHandler(logsFn func(r *http.Request, opts *LogsConfig) ([]l
179179
w.WriteHeader(status)
180180
})
181181
}
182-
183-
type serverMetrics struct {
184-
entriesWritten prometheus.Counter
185-
}
186-
187-
func newServerMetrics(namespace string, reg prometheus.Registerer) *serverMetrics {
188-
m := &serverMetrics{
189-
entriesWritten: prometheus.NewCounter(prometheus.CounterOpts{
190-
Namespace: namespace,
191-
Name: "entries_written",
192-
Help: "Total number of entries written.",
193-
}),
194-
}
195-
196-
m.entriesWritten = util.MustRegisterOrGet(reg, m.entriesWritten).(prometheus.Counter)
197-
198-
return m
199-
}

internal/component/loki/source/server_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -351,8 +351,9 @@ func newTestServer(t *testing.T, recv loki.LogsBatchReceiver, cfg ServerConfig,
351351

352352
func testServerConfig(timeout time.Duration, logsConfig *LogsConfig) ServerConfig {
353353
return ServerConfig{
354-
Namespace: "test",
355-
LogsConfig: logsConfig,
354+
Namespace: "test",
355+
EntriesWritten: prometheus.NewCounter(prometheus.CounterOpts{}),
356+
LogsConfig: logsConfig,
356357
NetConfig: &fnet.ServerConfig{
357358
HTTP: &fnet.HTTPConfig{
358359
ListenAddress: "127.0.0.1",

0 commit comments

Comments
 (0)