Skip to content

Commit 5338629

Browse files
committed
Template dynamicAdvertisedListener, allow {{.brokerId}} for a dynamic advertited hostname based on brokerId. Use fixed port if provided.
1 parent 2fbf93a commit 5338629

File tree

2 files changed

+179
-5
lines changed

2 files changed

+179
-5
lines changed

Diff for: proxy/proxy.go

+47-5
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package proxy
22

33
import (
4+
"bytes"
45
"crypto/tls"
56
"fmt"
67
"net"
78
"strconv"
89
"sync"
10+
"text/template"
911

1012
"github.com/grepplabs/kafka-proxy/config"
1113
"github.com/grepplabs/kafka-proxy/pkg/libs/util"
@@ -190,17 +192,57 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32)
190192
port := l.Addr().(*net.TCPAddr).Port
191193
address := net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(port))
192194

193-
dynamicAdvertisedListener := p.dynamicAdvertisedListener
194-
if dynamicAdvertisedListener == "" {
195-
dynamicAdvertisedListener = p.defaultListenerIP
195+
dynamicAdvertisedHost, dynamicAdvertisedPort, err := p.getDynamicAdvertisedAddress(cfg.BrokerID, port)
196+
if err != nil {
197+
return "", 0, err
196198
}
197-
cfg.AdvertisedAddress = net.JoinHostPort(dynamicAdvertisedListener, fmt.Sprint(port))
199+
cfg.AdvertisedAddress = net.JoinHostPort(dynamicAdvertisedHost, fmt.Sprint(dynamicAdvertisedPort))
198200
cfg.ListenerAddress = address
199201

200202
p.brokerToListenerConfig[brokerAddress] = cfg
201203
logrus.Infof("Dynamic listener %s for broker %s brokerId %d advertised as %s", cfg.ListenerAddress, cfg.GetBrokerAddress(), cfg.BrokerID, cfg.AdvertisedAddress)
202204

203-
return dynamicAdvertisedListener, int32(port), nil
205+
return dynamicAdvertisedHost, int32(dynamicAdvertisedPort), nil
206+
}
207+
208+
func (p *Listeners) getDynamicAdvertisedAddress(brokerID int32, port int) (string, int, error) {
209+
dynamicAdvertisedListener := p.dynamicAdvertisedListener
210+
if dynamicAdvertisedListener == "" {
211+
return p.defaultListenerIP, port, nil
212+
}
213+
dynamicAdvertisedListener, err := p.templateDynamicAdvertisedAddress(brokerID)
214+
if err != nil {
215+
return "", 0, err
216+
}
217+
var (
218+
dynamicAdvertisedHost = dynamicAdvertisedListener
219+
dynamicAdvertisedPort = port
220+
)
221+
advHost, advPortStr, err := net.SplitHostPort(dynamicAdvertisedListener)
222+
if err == nil {
223+
if advPort, err := strconv.Atoi(advPortStr); err == nil {
224+
dynamicAdvertisedHost = advHost
225+
dynamicAdvertisedPort = advPort
226+
}
227+
}
228+
return dynamicAdvertisedHost, dynamicAdvertisedPort, nil
229+
}
230+
231+
func (p *Listeners) templateDynamicAdvertisedAddress(brokerID int32) (string, error) {
232+
tmpl, err := template.New("dynamicAdvertisedHost").Option("missingkey=error").Parse(p.dynamicAdvertisedListener)
233+
if err != nil {
234+
return "", fmt.Errorf("failed to parse host template '%s': %w", p.dynamicAdvertisedListener, err)
235+
}
236+
var buf bytes.Buffer
237+
data := map[string]any{
238+
"brokerId": brokerID,
239+
"brokerID": brokerID,
240+
}
241+
err = tmpl.Execute(&buf, data)
242+
if err != nil {
243+
return "", fmt.Errorf("failed to execute host template '%s': %w", p.dynamicAdvertisedListener, err)
244+
}
245+
return buf.String(), nil
204246
}
205247

206248
func (p *Listeners) ListenInstances(cfgs []config.ListenerConfig) (<-chan Conn, error) {

Diff for: proxy/proxy_test.go

+132
Original file line numberDiff line numberDiff line change
@@ -283,3 +283,135 @@ func TestGetBrokerToListenerConfig(t *testing.T) {
283283
assert.ObjectsAreEqual(tt.mapping, mapping)
284284
}
285285
}
286+
287+
func TestGetDynamicAdvertisedAddress(t *testing.T) {
288+
tests := []struct {
289+
name string
290+
dynamicAdvertisedListener string
291+
defaultListenerIP string
292+
brokerID int32
293+
port int
294+
expectedHost string
295+
expectedPort int
296+
expectError bool
297+
}{
298+
{
299+
name: "Default listener IP is 127.0.0.1",
300+
dynamicAdvertisedListener: "",
301+
defaultListenerIP: "127.0.0.1",
302+
brokerID: 1,
303+
port: 9092,
304+
expectedHost: "127.0.0.1",
305+
expectedPort: 9092,
306+
expectError: false,
307+
},
308+
{
309+
name: "Default listener IP is 0.0.0.0",
310+
dynamicAdvertisedListener: "",
311+
defaultListenerIP: "0.0.0.0",
312+
brokerID: 1,
313+
port: 9092,
314+
expectedHost: "0.0.0.0",
315+
expectedPort: 9092,
316+
expectError: false,
317+
},
318+
{
319+
name: "Default listener IP is localhost",
320+
dynamicAdvertisedListener: "",
321+
defaultListenerIP: "localhost",
322+
brokerID: 1,
323+
port: 9092,
324+
expectedHost: "localhost",
325+
expectedPort: 9092,
326+
expectError: false,
327+
},
328+
{
329+
name: "Dynamic listener no template, host is IP",
330+
dynamicAdvertisedListener: "0.0.0.0",
331+
defaultListenerIP: "127.0.0.1",
332+
brokerID: 2,
333+
port: 9093,
334+
expectedHost: "0.0.0.0",
335+
expectedPort: 9093,
336+
expectError: false,
337+
},
338+
{
339+
name: "Dynamic listener no template, host is IP and port is provided",
340+
dynamicAdvertisedListener: "0.0.0.0:30000",
341+
defaultListenerIP: "127.0.0.1",
342+
brokerID: 2,
343+
port: 9093,
344+
expectedHost: "0.0.0.0",
345+
expectedPort: 30000,
346+
expectError: false,
347+
},
348+
{
349+
name: "Dynamic listener no template, host is dns name",
350+
dynamicAdvertisedListener: "kafka-proxy.provisionedmskclust.zgjvgc.c2.kafka.eu-central-1.amazonaws.com",
351+
defaultListenerIP: "127.0.0.1",
352+
brokerID: 2,
353+
port: 9093,
354+
expectedHost: "kafka-proxy.provisionedmskclust.zgjvgc.c2.kafka.eu-central-1.amazonaws.com",
355+
expectedPort: 9093,
356+
expectError: false,
357+
},
358+
{
359+
name: "Dynamic listener no template, host is dns name and port is provided",
360+
dynamicAdvertisedListener: "kafka-proxy.grepplabs.com:30000",
361+
defaultListenerIP: "127.0.0.1",
362+
brokerID: 2,
363+
port: 9093,
364+
expectedHost: "kafka-proxy.grepplabs.com",
365+
expectedPort: 30000,
366+
expectError: false,
367+
},
368+
{
369+
name: "Dynamic listener with template",
370+
dynamicAdvertisedListener: "b-{{.brokerId}}.provisionedmskclust.zgjvgc.c2.kafka.eu-central-1.amazonaws.com",
371+
defaultListenerIP: "127.0.0.1",
372+
brokerID: 2,
373+
port: 9093,
374+
expectedHost: "b-2.provisionedmskclust.zgjvgc.c2.kafka.eu-central-1.amazonaws.com",
375+
expectedPort: 9093,
376+
expectError: false,
377+
},
378+
{
379+
name: "Dynamic listener with template and port is provided",
380+
dynamicAdvertisedListener: "b-{{.brokerId}}.provisionedmskclust.zgjvgc.c2.kafka.eu-central-1.amazonaws.com:30000",
381+
defaultListenerIP: "127.0.0.1",
382+
brokerID: 2,
383+
port: 9093,
384+
expectedHost: "b-2.provisionedmskclust.zgjvgc.c2.kafka.eu-central-1.amazonaws.com",
385+
expectedPort: 30000,
386+
expectError: false,
387+
},
388+
{
389+
name: "Invalid dynamic listener template",
390+
dynamicAdvertisedListener: "broker-{{.invalid}}",
391+
defaultListenerIP: "127.0.0.1",
392+
brokerID: 3,
393+
port: 9094,
394+
expectedHost: "",
395+
expectedPort: 0,
396+
expectError: true,
397+
},
398+
}
399+
400+
for _, tt := range tests {
401+
t.Run(tt.name, func(t *testing.T) {
402+
listeners := &Listeners{
403+
dynamicAdvertisedListener: tt.dynamicAdvertisedListener,
404+
defaultListenerIP: tt.defaultListenerIP,
405+
}
406+
407+
host, port, err := listeners.getDynamicAdvertisedAddress(tt.brokerID, tt.port)
408+
if tt.expectError {
409+
assert.Error(t, err)
410+
} else {
411+
assert.NoError(t, err)
412+
assert.Equal(t, tt.expectedHost, host)
413+
assert.Equal(t, tt.expectedPort, port)
414+
}
415+
})
416+
}
417+
}

0 commit comments

Comments
 (0)