Skip to content

Commit 0f3bfe9

Browse files
kalleepCopilotx1unix
authored
chore: Add server abstration for loki source components (#5895)
### Pull Request Details After a couple of pr:s our loki source components that exposes http server are almost identical except for how entries are parsed. So I created an abstraction around http server for loki sources and ported over `loki.source.api` to it. I moved all life-cycle related test to the server and moved/re-wrote handler specific tests to `routes_test.go`. ### Issue(s) fixed by this Pull Request Part of: #5803 ### Notes to the Reviewer After this I will refactor the other components we have to use the same abstration. ### PR Checklist <!-- Remove items that do not apply. For completed items, change [ ] to [x]. --> - [ ] Documentation added - [x] Tests updated - [ ] Config converters updated --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Denys Sedchenko <9203548+x1unix@users.noreply.github.com>
1 parent 03ca563 commit 0f3bfe9

9 files changed

Lines changed: 1058 additions & 902 deletions

File tree

internal/component/loki/source/api/api.go

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package api
33
import (
44
"context"
55
"fmt"
6-
"reflect"
76
"sync"
87

98
"github.com/alecthomas/units"
@@ -14,7 +13,7 @@ import (
1413
"github.com/grafana/alloy/internal/component/common/loki"
1514
fnet "github.com/grafana/alloy/internal/component/common/net"
1615
"github.com/grafana/alloy/internal/component/common/relabel"
17-
"github.com/grafana/alloy/internal/component/loki/source/api/internal/lokipush"
16+
"github.com/grafana/alloy/internal/component/loki/source"
1817
"github.com/grafana/alloy/internal/featuregate"
1918
"github.com/grafana/alloy/internal/util"
2019
)
@@ -61,7 +60,7 @@ type Component struct {
6160
uncheckedCollector *util.UncheckedCollector
6261

6362
serverMut sync.Mutex
64-
server *lokipush.PushAPIServer
63+
server *source.Server
6564

6665
fanout *loki.Fanout
6766
}
@@ -120,8 +119,8 @@ func (c *Component) Update(args component.Arguments) error {
120119

121120
c.serverMut.Lock()
122121
defer c.serverMut.Unlock()
123-
serverNeedsRestarting := c.server == nil || !reflect.DeepEqual(c.server.ServerConfig(), *newArgs.Server)
124-
if serverNeedsRestarting {
122+
123+
if c.server.NeedsRestart(newArgs.Server) {
125124
if c.server != nil {
126125
c.server.Shutdown()
127126
}
@@ -130,23 +129,39 @@ func (c *Component) Update(args component.Arguments) error {
130129
// avoid issues with re-registering metrics with the same name, we create a
131130
// new registry for the server every time we create one, and pass it to an
132131
// unchecked collector to bypass uniqueness checking.
133-
serverRegistry := prometheus.NewRegistry()
134-
c.uncheckedCollector.SetCollector(serverRegistry)
132+
reg := prometheus.NewRegistry()
133+
c.uncheckedCollector.SetCollector(reg)
135134

136135
var err error
137-
c.server, err = lokipush.NewPushAPIServer(c.opts.Logger, newArgs.Server, c.handler, serverRegistry, int64(newArgs.MaxSendMessageSize))
136+
c.server, err = source.NewServer(c.opts.Logger, reg, c.handler, source.ServerConfig{
137+
Namespace: "loki_source_api",
138+
NetConfig: newArgs.Server,
139+
LogsConfig: &source.LogsConfig{
140+
FixedLabels: newArgs.labelSet(),
141+
RelabelRules: relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules),
142+
UseIncomingTimestamp: newArgs.UseIncomingTimestamp,
143+
},
144+
})
145+
138146
if err != nil {
139147
return fmt.Errorf("failed to create embedded server: %v", err)
140148
}
141149

142-
if err = c.server.Run(); err != nil {
150+
logsRoutes, handlerRoutes := newRoutes(int(newArgs.MaxSendMessageSize))
151+
if err = c.server.Run(logsRoutes, handlerRoutes); err != nil {
143152
return fmt.Errorf("failed to run embedded server: %v", err)
144153
}
154+
155+
return nil
145156
}
146157

147-
c.server.SetLabels(newArgs.labelSet())
148-
c.server.SetRelabelRules(newArgs.RelabelRules)
149-
c.server.SetKeepTimestamp(newArgs.UseIncomingTimestamp)
158+
if c.server != nil {
159+
c.server.Update(&source.LogsConfig{
160+
FixedLabels: newArgs.labelSet(),
161+
RelabelRules: relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules),
162+
UseIncomingTimestamp: newArgs.UseIncomingTimestamp,
163+
})
164+
}
150165

151166
return nil
152167
}

internal/component/loki/source/api/api_test.go

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ func TestComponent_detectsWhenUpdateRequiresARestart(t *testing.T) {
348348
assert.Equal(t, restarted, tc.restartRequired)
349349

350350
// in order to cleanly shutdown, we want to make sure the server is running first.
351-
waitForServerToBeReady(t, comp)
351+
waitForServerToBeReady(t, comp, tc.args.Server.HTTP.TLSConfig != nil)
352352
})
353353
}
354354
}
@@ -376,7 +376,7 @@ func TestLokiSourceAPI_TLS(t *testing.T) {
376376
c := startTestComponent(t, opts, args, ctx)
377377

378378
// Create TLS-enabled Loki client
379-
lokiClient := newTestLokiClientTLS(t, c.server.HTTPListenAddress(), opts)
379+
lokiClient := newTestLokiClientTLS(t, c.server.HTTPAddr(), opts)
380380
defer lokiClient.Stop()
381381

382382
now := time.Now()
@@ -449,12 +449,12 @@ func TestDefaultServerConfig(t *testing.T) {
449449
require.NoError(t, err)
450450

451451
require.Eventuallyf(t, func() bool {
452-
resp, err := http.Get(fmt.Sprintf(
453-
"http://%v:%d/wrong/url",
454-
"localhost",
455-
fnet.DefaultHTTPPort,
456-
))
457-
return err == nil && resp.StatusCode == 404
452+
resp, err := http.Get(fmt.Sprintf("http://%s/ready", comp.server.HTTPAddr()))
453+
if err != nil {
454+
return false
455+
}
456+
require.NoError(t, err)
457+
return resp.StatusCode == 200
458458
}, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout")
459459
}
460460

@@ -472,7 +472,7 @@ func startTestComponent(
472472
require.NoError(t, err)
473473
}()
474474

475-
waitForServerToBeReady(t, comp)
475+
waitForServerToBeReady(t, comp, args.Server.HTTP.TLSConfig != nil)
476476
return comp
477477
}
478478

@@ -493,16 +493,16 @@ func TestShutdown(t *testing.T) {
493493
require.NoError(t, err)
494494
}()
495495

496-
waitForServerToBeReady(t, comp)
496+
waitForServerToBeReady(t, comp, args.Server.HTTP.TLSConfig != nil)
497497

498498
// First request should be forwarded on channel
499-
_, err = http.DefaultClient.Do(newRequest(t, context.Background(), comp.server.HTTPListenAddress()))
499+
_, err = http.DefaultClient.Do(newRequest(t, context.Background(), comp.server.HTTPAddr()))
500500
require.NoError(t, err)
501501

502502
codes := make(chan int)
503503
for range 5 {
504504
go func() {
505-
res, err := http.DefaultClient.Do(newRequest(t, context.Background(), comp.server.HTTPListenAddress()))
505+
res, err := http.DefaultClient.Do(newRequest(t, context.Background(), comp.server.HTTPAddr()))
506506
if err != nil || res == nil {
507507
// This should not happen but if it does we return -1 here so test will fail.
508508
codes <- -1
@@ -546,18 +546,18 @@ func TestCancelRequest(t *testing.T) {
546546
require.NoError(t, err)
547547
}()
548548

549-
waitForServerToBeReady(t, comp)
549+
waitForServerToBeReady(t, comp, args.Server.HTTP.TLSConfig != nil)
550550

551551
// First request should be forwarded on channel
552-
_, err = http.DefaultClient.Do(newRequest(t, context.Background(), comp.server.HTTPListenAddress()))
552+
_, err = http.DefaultClient.Do(newRequest(t, context.Background(), comp.server.HTTPAddr()))
553553
require.NoError(t, err)
554554

555555
var wg sync.WaitGroup
556556
for range 5 {
557557
wg.Go(func() {
558558
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
559559
defer cancel()
560-
res, err := http.DefaultClient.Do(newRequest(t, ctx, comp.server.HTTPListenAddress()))
560+
res, err := http.DefaultClient.Do(newRequest(t, ctx, comp.server.HTTPAddr()))
561561
require.ErrorIs(t, err, context.DeadlineExceeded)
562562
require.Nil(t, res)
563563
})
@@ -576,13 +576,12 @@ func newRequest(t *testing.T, ctx context.Context, httpListendAddress string) *h
576576
return req
577577
}
578578

579-
func waitForServerToBeReady(t *testing.T, comp *Component) {
579+
func waitForServerToBeReady(t *testing.T, comp *Component, useTls bool) {
580580
// Determine if TLS is enabled to choose the right protocol
581581
protocol := "http"
582582
var tlsConfig *tls.Config
583583

584-
serverConfig := comp.server.ServerConfig()
585-
if serverConfig.HTTP.TLSConfig != nil {
584+
if useTls {
586585
protocol = "https"
587586
tlsConfig = &tls.Config{
588587
InsecureSkipVerify: true,
@@ -592,7 +591,7 @@ func waitForServerToBeReady(t *testing.T, comp *Component) {
592591
url := fmt.Sprintf(
593592
"%s://%s/wrong/url",
594593
protocol,
595-
comp.server.HTTPListenAddress(),
594+
comp.server.HTTPAddr(),
596595
)
597596

598597
client := &http.Client{Timeout: 1 * time.Second}
@@ -604,8 +603,12 @@ func waitForServerToBeReady(t *testing.T, comp *Component) {
604603

605604
require.Eventuallyf(t, func() bool {
606605
resp, err := client.Get(url)
606+
if err != nil {
607+
return false
608+
}
607609

608-
return err == nil && resp != nil && resp.StatusCode == 404
610+
require.NoError(t, resp.Body.Close())
611+
return resp.StatusCode == 404
609612
}, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout")
610613
}
611614

internal/component/loki/source/api/internal/lokipush/metrics.go

Lines changed: 0 additions & 21 deletions
This file was deleted.

0 commit comments

Comments
 (0)