Skip to content

Commit 96d34e8

Browse files
authored
chore: Port loki.source.heroku to use server abstraction (#5962)
### Pull Request Details Followup to #5895. Port `loki.source.heroku` to use the shared server abstraction. This will also make it so that this server don't require a restart if labels, relabel rules or timestamp config changes. ### Issue(s) fixed by this Pull Request Part of #5803 ### Notes to the Reviewer Changes so that each component pass the `EntriesWritten` counter since they unfortunately have different naming conventions.. ### PR Checklist <!-- Remove items that do not apply. For completed items, change [ ] to [x]. --> - [ ] Documentation added - [x] Tests updated - [ ] Config converters updated
1 parent 8deede0 commit 96d34e8

File tree

12 files changed

+449
-798
lines changed

12 files changed

+449
-798
lines changed

internal/component/loki/source/api/api.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ func (a *Arguments) labelSet() model.LabelSet {
5656

5757
type Component struct {
5858
opts component.Options
59+
metrics *metrics
5960
handler loki.LogsBatchReceiver
6061
uncheckedCollector *util.UncheckedCollector
6162

@@ -68,6 +69,7 @@ type Component struct {
6869
func New(opts component.Options, args Arguments) (*Component, error) {
6970
c := &Component{
7071
opts: opts,
72+
metrics: newMetrics(opts.Registerer),
7173
handler: loki.NewLogsBatchReceiver(),
7274
uncheckedCollector: util.NewUncheckedCollector(nil),
7375

@@ -134,8 +136,9 @@ func (c *Component) Update(args component.Arguments) error {
134136

135137
var err error
136138
c.server, err = source.NewServer(c.opts.Logger, reg, c.handler, source.ServerConfig{
137-
Namespace: "loki_source_api",
138-
NetConfig: newArgs.Server,
139+
Namespace: "loki_source_api",
140+
EntriesWritten: c.metrics.entriesWritten,
141+
NetConfig: newArgs.Server,
139142
LogsConfig: &source.LogsConfig{
140143
FixedLabels: newArgs.labelSet(),
141144
RelabelRules: relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules),
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package api
2+
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
6+
"github.com/grafana/alloy/internal/util"
7+
)
8+
9+
type metrics struct {
10+
entriesWritten prometheus.Counter
11+
}
12+
13+
func newMetrics(reg prometheus.Registerer) *metrics {
14+
return &metrics{
15+
entriesWritten: util.MustRegisterOrGet(reg, prometheus.NewCounter(prometheus.CounterOpts{
16+
Name: "loki_source_api_entries_written",
17+
Help: "Total number of entries written.",
18+
})).(prometheus.Counter),
19+
}
20+
}

internal/component/loki/source/heroku/heroku.go

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,18 @@ package heroku
33
import (
44
"context"
55
"fmt"
6-
"reflect"
76
"sync"
87

98
"github.com/grafana/alloy/internal/component"
109
"github.com/grafana/alloy/internal/component/common/loki"
1110
fnet "github.com/grafana/alloy/internal/component/common/net"
1211
alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel"
13-
ht "github.com/grafana/alloy/internal/component/loki/source/heroku/internal/herokutarget"
12+
"github.com/grafana/alloy/internal/component/loki/source"
1413
"github.com/grafana/alloy/internal/featuregate"
1514
"github.com/grafana/alloy/internal/runtime/logging/level"
1615
"github.com/grafana/alloy/internal/util"
1716
"github.com/prometheus/client_golang/prometheus"
1817
"github.com/prometheus/common/model"
19-
"github.com/prometheus/prometheus/model/relabel"
2018
)
2119

2220
func init() {
@@ -51,14 +49,14 @@ func (a *Arguments) SetToDefault() {
5149
// Component implements the loki.source.heroku component.
5250
type Component struct {
5351
opts component.Options
54-
metrics *ht.Metrics // Metrics about Heroku entries.
52+
metrics *metrics // Metrics about Heroku entries.
5553
serverMetrics *util.UncheckedCollector // Metircs about the HTTP server managed by the component.
5654

5755
mut sync.RWMutex
5856
args Arguments
5957

6058
fanout *loki.Fanout
61-
server *ht.HerokuServer
59+
server *source.Server
6260

6361
handler loki.LogsBatchReceiver
6462
}
@@ -67,7 +65,7 @@ type Component struct {
6765
func New(o component.Options, args Arguments) (*Component, error) {
6866
c := &Component{
6967
opts: o,
70-
metrics: ht.NewMetrics(o.Registerer),
68+
metrics: newMetrics(o.Registerer),
7169
mut: sync.RWMutex{},
7270
args: Arguments{},
7371
fanout: loki.NewFanout(args.ForwardTo),
@@ -107,19 +105,14 @@ func (c *Component) Update(args component.Arguments) error {
107105
defer c.mut.Unlock()
108106

109107
newArgs := args.(Arguments)
108+
110109
c.fanout.UpdateChildren(newArgs.ForwardTo)
111110

112-
var rcs []*relabel.Config
113-
if len(newArgs.RelabelRules) > 0 {
114-
rcs = alloy_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules)
111+
if newArgs.Server == nil {
112+
newArgs.Server = &fnet.ServerConfig{}
115113
}
116114

117-
restartRequired := changed(c.args.Server, newArgs.Server) ||
118-
changed(c.args.RelabelRules, newArgs.RelabelRules) ||
119-
changed(c.args.Labels, newArgs.Labels) ||
120-
c.args.UseIncomingTimestamp != newArgs.UseIncomingTimestamp
121-
122-
if restartRequired {
115+
if c.server.NeedsRestart(newArgs.Server) {
123116
if c.server != nil {
124117
c.server.Shutdown()
125118
}
@@ -131,54 +124,67 @@ func (c *Component) Update(args component.Arguments) error {
131124
registry := prometheus.NewRegistry()
132125
c.serverMetrics.SetCollector(registry)
133126

134-
server, err := ht.NewHerokuServer(c.metrics, c.opts.Logger, c.handler, rcs, newArgs.Convert(), registry)
127+
server, err := source.NewServer(c.opts.Logger, registry, c.handler, source.ServerConfig{
128+
Namespace: "loki_source_heroku_drain_target",
129+
EntriesWritten: c.metrics.entriesWritten,
130+
NetConfig: newArgs.Server,
131+
LogsConfig: &source.LogsConfig{
132+
FixedLabels: newArgs.labelSet(),
133+
RelabelRules: alloy_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules),
134+
UseIncomingTimestamp: newArgs.UseIncomingTimestamp,
135+
},
136+
})
137+
135138
if err != nil {
136139
return fmt.Errorf("failed to create heroku server: %w", err)
137140
}
138141

139-
if err := server.Run(); err != nil {
142+
if err := server.Run(newRoutes(c.opts.Logger, c.metrics), []source.HandlerRoute{newHealthyHandler()}); err != nil {
140143
return fmt.Errorf("failed to run heroku server: %w", err)
141144
}
142145

143146
c.server = server
144-
c.args = newArgs
147+
return nil
148+
}
149+
150+
if c.server != nil {
151+
c.server.Update(&source.LogsConfig{
152+
FixedLabels: newArgs.labelSet(),
153+
RelabelRules: alloy_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules),
154+
UseIncomingTimestamp: newArgs.UseIncomingTimestamp,
155+
})
145156
}
146157

158+
c.args = newArgs
159+
147160
return nil
148161
}
149162

150-
// Convert is used to bridge between the Alloy and Promtail types.
151-
func (args *Arguments) Convert() *ht.HerokuConfig {
163+
func (args *Arguments) labelSet() model.LabelSet {
152164
lbls := make(model.LabelSet, len(args.Labels))
153165
for k, v := range args.Labels {
154166
lbls[model.LabelName(k)] = model.LabelValue(v)
155167
}
156168

157-
return &ht.HerokuConfig{
158-
Server: args.Server,
159-
Labels: lbls,
160-
UseIncomingTimestamp: args.UseIncomingTimestamp,
161-
}
169+
return lbls
162170
}
163171

164172
// DebugInfo returns information about the status of listener.
165173
func (c *Component) DebugInfo() any {
166174
c.mut.RLock()
167175
defer c.mut.RUnlock()
168176

169-
var res = readerDebugInfo{
170-
Ready: c.server.Ready(),
171-
Address: c.server.HTTPListenAddress(),
177+
if c.server == nil {
178+
return readerDebugInfo{}
172179
}
173180

174-
return res
181+
return readerDebugInfo{
182+
Ready: ready(c.server.HTTPAddr()),
183+
Address: c.server.HTTPAddr(),
184+
}
175185
}
176186

177187
type readerDebugInfo struct {
178188
Ready bool `alloy:"ready,attr"`
179189
Address string `alloy:"address,attr"`
180190
}
181-
182-
func changed(prev, next any) bool {
183-
return !reflect.DeepEqual(prev, next)
184-
}

internal/component/loki/source/heroku/heroku_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"github.com/grafana/alloy/internal/component/common/loki"
1313
fnet "github.com/grafana/alloy/internal/component/common/net"
1414
alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel"
15-
"github.com/grafana/alloy/internal/component/loki/source/heroku/internal/herokutarget"
15+
"github.com/grafana/alloy/internal/component/loki/source"
1616
"github.com/grafana/alloy/internal/util"
1717
"github.com/grafana/regexp"
1818
"github.com/prometheus/client_golang/prometheus"
@@ -97,25 +97,25 @@ func TestUpdate_detectsWhenTargetRequiresARestart(t *testing.T) {
9797
restartRequired: false,
9898
},
9999
{
100-
name: "change in labels requires server restart",
100+
name: "change in labels does not require server restart",
101101
mutateNewArgs: func(_ *testing.T, args *Arguments) {
102102
args.Labels = map[string]string{"some": "label"}
103103
},
104-
restartRequired: true,
104+
restartRequired: false,
105105
},
106106
{
107-
name: "change in relabel rules requires server restart",
107+
name: "change in relabel rules does not require server restart",
108108
mutateNewArgs: func(_ *testing.T, args *Arguments) {
109109
args.RelabelRules = alloy_relabel.Rules{}
110110
},
111-
restartRequired: true,
111+
restartRequired: false,
112112
},
113113
{
114-
name: "change in use incoming timestamp requires server restart",
114+
name: "change in use incoming timestamp does not require server restart",
115115
mutateNewArgs: func(_ *testing.T, args *Arguments) {
116116
args.UseIncomingTimestamp = !args.UseIncomingTimestamp
117117
},
118-
restartRequired: true,
118+
restartRequired: false,
119119
},
120120
}
121121
for _, tc := range tests {
@@ -253,7 +253,7 @@ func waitForServerToBeReady(t *testing.T, comp *Component) {
253253
require.Eventuallyf(t, func() bool {
254254
resp, err := http.Get(fmt.Sprintf(
255255
"http://%v/wrong/url",
256-
comp.server.HTTPListenAddress(),
256+
comp.server.HTTPAddr(),
257257
))
258258
return err == nil && resp.StatusCode == 404
259259
}, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout")
@@ -275,6 +275,6 @@ func newRegexp() alloy_relabel.Regexp {
275275
return alloy_relabel.Regexp{Regexp: re}
276276
}
277277

278-
func getEndpoint(target *herokutarget.HerokuServer) string {
279-
return fmt.Sprintf("http://%s%s", target.HTTPListenAddress(), target.DrainEndpoint())
278+
func getEndpoint(target *source.Server) string {
279+
return fmt.Sprintf("http://%s%s", target.HTTPAddr(), pathDrain)
280280
}

internal/component/loki/source/heroku/internal/herokutarget/metrics.go

Lines changed: 0 additions & 33 deletions
This file was deleted.

0 commit comments

Comments
 (0)