Skip to content

Commit e43d0d5

Browse files
committed
Port heroku to use server abstraction
1 parent a6d3254 commit e43d0d5

8 files changed

Lines changed: 405 additions & 761 deletions

File tree

internal/component/loki/source/heroku/heroku.go

Lines changed: 38 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,18 @@ package heroku
33
import (
44
"context"
55
"fmt"
6-
"reflect"
76
"sync"
87

98
"github.com/grafana/alloy/internal/component"
109
"github.com/grafana/alloy/internal/component/common/loki"
1110
fnet "github.com/grafana/alloy/internal/component/common/net"
1211
alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel"
13-
ht "github.com/grafana/alloy/internal/component/loki/source/heroku/internal/herokutarget"
12+
"github.com/grafana/alloy/internal/component/loki/source"
1413
"github.com/grafana/alloy/internal/featuregate"
1514
"github.com/grafana/alloy/internal/runtime/logging/level"
1615
"github.com/grafana/alloy/internal/util"
1716
"github.com/prometheus/client_golang/prometheus"
1817
"github.com/prometheus/common/model"
19-
"github.com/prometheus/prometheus/model/relabel"
2018
)
2119

2220
func init() {
@@ -51,14 +49,14 @@ func (a *Arguments) SetToDefault() {
5149
// Component implements the loki.source.heroku component.
5250
type Component struct {
5351
opts component.Options
54-
metrics *ht.Metrics // Metrics about Heroku entries.
52+
metrics *metrics // Metrics about Heroku entries.
5553
serverMetrics *util.UncheckedCollector // Metircs about the HTTP server managed by the component.
5654

5755
mut sync.RWMutex
5856
args Arguments
5957

6058
fanout *loki.Fanout
61-
server *ht.HerokuServer
59+
server *source.Server
6260

6361
handler loki.LogsBatchReceiver
6462
}
@@ -67,7 +65,7 @@ type Component struct {
6765
func New(o component.Options, args Arguments) (*Component, error) {
6866
c := &Component{
6967
opts: o,
70-
metrics: ht.NewMetrics(o.Registerer),
68+
metrics: newMetrics(o.Registerer),
7169
mut: sync.RWMutex{},
7270
args: Arguments{},
7371
fanout: loki.NewFanout(args.ForwardTo),
@@ -107,19 +105,14 @@ func (c *Component) Update(args component.Arguments) error {
107105
defer c.mut.Unlock()
108106

109107
newArgs := args.(Arguments)
108+
110109
c.fanout.UpdateChildren(newArgs.ForwardTo)
111110

112-
var rcs []*relabel.Config
113-
if len(newArgs.RelabelRules) > 0 {
114-
rcs = alloy_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules)
111+
if newArgs.Server == nil {
112+
newArgs.Server = &fnet.ServerConfig{}
115113
}
116114

117-
restartRequired := changed(c.args.Server, newArgs.Server) ||
118-
changed(c.args.RelabelRules, newArgs.RelabelRules) ||
119-
changed(c.args.Labels, newArgs.Labels) ||
120-
c.args.UseIncomingTimestamp != newArgs.UseIncomingTimestamp
121-
122-
if restartRequired {
115+
if c.server.NeedsRestart(newArgs.Server) {
123116
if c.server != nil {
124117
c.server.Shutdown()
125118
}
@@ -131,54 +124,66 @@ func (c *Component) Update(args component.Arguments) error {
131124
registry := prometheus.NewRegistry()
132125
c.serverMetrics.SetCollector(registry)
133126

134-
server, err := ht.NewHerokuServer(c.metrics, c.opts.Logger, c.handler, rcs, newArgs.Convert(), registry)
127+
server, err := source.NewServer(c.opts.Logger, registry, c.handler, source.ServerConfig{
128+
Namespace: "loki_source_heroku_drain_target",
129+
NetConfig: newArgs.Server,
130+
LogsConfig: &source.LogsConfig{
131+
FixedLabels: newArgs.labelSet(),
132+
RelabelRules: alloy_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules),
133+
UseIncomingTimestamp: newArgs.UseIncomingTimestamp,
134+
},
135+
})
136+
135137
if err != nil {
136138
return fmt.Errorf("failed to create heroku server: %w", err)
137139
}
138140

139-
if err := server.Run(); err != nil {
141+
if err := server.Run(newRoutes(c.opts.Logger, c.metrics), []source.HandlerRoute{newHealthyHandler()}); err != nil {
140142
return fmt.Errorf("failed to run heroku server: %w", err)
141143
}
142144

143145
c.server = server
144-
c.args = newArgs
146+
return nil
147+
}
148+
149+
if c.server != nil {
150+
c.server.Update(&source.LogsConfig{
151+
FixedLabels: newArgs.labelSet(),
152+
RelabelRules: alloy_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules),
153+
UseIncomingTimestamp: newArgs.UseIncomingTimestamp,
154+
})
145155
}
146156

157+
c.args = newArgs
158+
147159
return nil
148160
}
149161

150-
// Convert is used to bridge between the Alloy and Promtail types.
151-
func (args *Arguments) Convert() *ht.HerokuConfig {
162+
func (args *Arguments) labelSet() model.LabelSet {
152163
lbls := make(model.LabelSet, len(args.Labels))
153164
for k, v := range args.Labels {
154165
lbls[model.LabelName(k)] = model.LabelValue(v)
155166
}
156167

157-
return &ht.HerokuConfig{
158-
Server: args.Server,
159-
Labels: lbls,
160-
UseIncomingTimestamp: args.UseIncomingTimestamp,
161-
}
168+
return lbls
162169
}
163170

164171
// DebugInfo returns information about the status of listener.
165172
func (c *Component) DebugInfo() any {
166173
c.mut.RLock()
167174
defer c.mut.RUnlock()
168175

169-
var res = readerDebugInfo{
170-
Ready: c.server.Ready(),
171-
Address: c.server.HTTPListenAddress(),
176+
if c.server == nil {
177+
return readerDebugInfo{}
172178
}
173179

174-
return res
180+
return readerDebugInfo{
181+
Ready: ready(c.server.HTTPAddr()),
182+
Address: c.server.HTTPAddr(),
183+
}
175184
}
176185

177186
type readerDebugInfo struct {
178187
Ready bool `alloy:"ready,attr"`
179188
Address string `alloy:"address,attr"`
180189
}
181-
182-
func changed(prev, next any) bool {
183-
return !reflect.DeepEqual(prev, next)
184-
}

internal/component/loki/source/heroku/heroku_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"github.com/grafana/alloy/internal/component/common/loki"
1313
fnet "github.com/grafana/alloy/internal/component/common/net"
1414
alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel"
15-
"github.com/grafana/alloy/internal/component/loki/source/heroku/internal/herokutarget"
15+
"github.com/grafana/alloy/internal/component/loki/source"
1616
"github.com/grafana/alloy/internal/util"
1717
"github.com/grafana/regexp"
1818
"github.com/prometheus/client_golang/prometheus"
@@ -97,25 +97,25 @@ func TestUpdate_detectsWhenTargetRequiresARestart(t *testing.T) {
9797
restartRequired: false,
9898
},
9999
{
100-
name: "change in labels requires server restart",
100+
name: "change in labels does not require server restart",
101101
mutateNewArgs: func(_ *testing.T, args *Arguments) {
102102
args.Labels = map[string]string{"some": "label"}
103103
},
104-
restartRequired: true,
104+
restartRequired: false,
105105
},
106106
{
107-
name: "change in relabel rules requires server restart",
107+
name: "change in relabel rules does not require server restart",
108108
mutateNewArgs: func(_ *testing.T, args *Arguments) {
109109
args.RelabelRules = alloy_relabel.Rules{}
110110
},
111-
restartRequired: true,
111+
restartRequired: false,
112112
},
113113
{
114-
name: "change in use incoming timestamp requires server restart",
114+
name: "change in use incoming timestamp does not require server restart",
115115
mutateNewArgs: func(_ *testing.T, args *Arguments) {
116116
args.UseIncomingTimestamp = !args.UseIncomingTimestamp
117117
},
118-
restartRequired: true,
118+
restartRequired: false,
119119
},
120120
}
121121
for _, tc := range tests {
@@ -253,7 +253,7 @@ func waitForServerToBeReady(t *testing.T, comp *Component) {
253253
require.Eventuallyf(t, func() bool {
254254
resp, err := http.Get(fmt.Sprintf(
255255
"http://%v/wrong/url",
256-
comp.server.HTTPListenAddress(),
256+
comp.server.HTTPAddr(),
257257
))
258258
return err == nil && resp.StatusCode == 404
259259
}, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout")
@@ -275,6 +275,6 @@ func newRegexp() alloy_relabel.Regexp {
275275
return alloy_relabel.Regexp{Regexp: re}
276276
}
277277

278-
func getEndpoint(target *herokutarget.HerokuServer) string {
279-
return fmt.Sprintf("http://%s%s", target.HTTPListenAddress(), target.DrainEndpoint())
278+
func getEndpoint(target *source.Server) string {
279+
return fmt.Sprintf("http://%s%s", target.HTTPAddr(), pathDrain)
280280
}

internal/component/loki/source/heroku/internal/herokutarget/metrics.go

Lines changed: 0 additions & 33 deletions
This file was deleted.

0 commit comments

Comments
 (0)