Skip to content

Commit 4a1dc0a

Browse files
committed
Template dynamicAdvertisedListener, allow {{.brokerId}} for a dynamic advertited hostname based on brokerId. Use fixed port if provided.
1 parent 2fbf93a commit 4a1dc0a

File tree

7 files changed

+188
-42
lines changed

7 files changed

+188
-42
lines changed

Diff for: Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ VERSION ?= $(shell git describe --tags --always --dirty)
1010
GOPKGS = $(shell go list ./... | grep -v /vendor/)
1111
BUILD_FLAGS ?=
1212
LDFLAGS ?= -X github.com/grepplabs/kafka-proxy/config.Version=$(VERSION) -w -s
13-
TAG ?= "v0.4.0"
13+
TAG ?= "v0.4.1"
1414
GOOS ?= $(if $(TARGETOS),$(TARGETOS),linux)
1515
GOARCH ?= $(if $(TARGETARCH),$(TARGETARCH),amd64)
1616
GOARM ?= $(TARGETVARIANT)

Diff for: README.md

+5-5
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,11 @@ As not every Kafka release adds new messages/versions which are relevant to the
4848

4949
Linux
5050

51-
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.4.0/kafka-proxy-v0.4.0-linux-amd64.tar.gz | tar xz
51+
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.4.1/kafka-proxy-v0.4.1-linux-amd64.tar.gz | tar xz
5252

5353
macOS
5454

55-
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.4.0/kafka-proxy-v0.4.0-darwin-amd64.tar.gz | tar xz
55+
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.4.1/kafka-proxy-v0.4.1-darwin-amd64.tar.gz | tar xz
5656

5757
2. Move the binary in to your PATH.
5858

@@ -70,7 +70,7 @@ Docker images are available on [Docker Hub](https://hub.docker.com/r/grepplabs/k
7070
7171
You can launch a kafka-proxy container for trying it out with
7272
73-
docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.4.0 \
73+
docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.4.1 \
7474
server \
7575
--bootstrap-server-mapping "localhost:19092,0.0.0.0:30001" \
7676
--bootstrap-server-mapping "localhost:29092,0.0.0.0:30002" \
@@ -89,7 +89,7 @@ Docker images with precompiled plugins located in `/opt/kafka-proxy/bin/` are ta
8989
9090
You can launch a kafka-proxy container with auth-ldap plugin for trying it out with
9191
92-
docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.4.0-all \
92+
docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.4.1-all \
9393
server \
9494
--bootstrap-server-mapping "localhost:19092,0.0.0.0:30001" \
9595
--bootstrap-server-mapping "localhost:29092,0.0.0.0:30002" \
@@ -142,7 +142,7 @@ You can launch a kafka-proxy container with auth-ldap plugin for trying it out w
142142
--default-listener-ip string Default listener IP (default "0.0.0.0")
143143
--deterministic-listeners Enable deterministic listeners (listener port = min port + broker id).
144144
--dial-address-mapping stringArray Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment
145-
--dynamic-advertised-listener string Advertised address for dynamic listeners. If empty, default-listener-ip is used
145+
--dynamic-advertised-listener string Advertised address for dynamic listeners. If left empty, default-listener-ip is used. Supports templating with {{.brokerId}} for dynamic hostnames and a fixed port if provided.
146146
--dynamic-listeners-disable Disable dynamic listeners.
147147
--dynamic-sequential-min-port int If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.
148148
--external-server-mapping stringArray Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started

Diff for: cmd/kafka-proxy/server.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func init() {
8787
func initFlags() {
8888
// proxy
8989
Server.Flags().StringVar(&c.Proxy.DefaultListenerIP, "default-listener-ip", "0.0.0.0", "Default listener IP")
90-
Server.Flags().StringVar(&c.Proxy.DynamicAdvertisedListener, "dynamic-advertised-listener", "", "Advertised address for dynamic listeners. If empty, default-listener-ip is used")
90+
Server.Flags().StringVar(&c.Proxy.DynamicAdvertisedListener, "dynamic-advertised-listener", "", "Advertised address for dynamic listeners. If left empty, default-listener-ip is used. Supports templating with {{.brokerId}} for dynamic hostnames and a fixed port if provided.")
9191
Server.Flags().StringArrayVar(&bootstrapServersMapping, "bootstrap-server-mapping", []string{}, "Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))")
9292
Server.Flags().StringArrayVar(&externalServersMapping, "external-server-mapping", []string{}, "Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started")
9393
Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment")

Diff for: go.mod

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ require (
66
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
77
github.com/aws/aws-sdk-go-v2 v1.36.1
88
github.com/aws/aws-sdk-go-v2/config v1.29.6
9+
github.com/aws/aws-sdk-go-v2/credentials v1.17.59
10+
github.com/aws/aws-sdk-go-v2/service/sts v1.33.14
911
github.com/cenkalti/backoff v1.1.0
1012
github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a
1113
github.com/elazarl/goproxy/ext v0.0.0-20190711103511-473e67f1d7d2
@@ -37,7 +39,6 @@ require (
3739
require (
3840
cloud.google.com/go/compute/metadata v0.5.2 // indirect
3941
github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c // indirect
40-
github.com/aws/aws-sdk-go-v2/credentials v1.17.59 // indirect
4142
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.28 // indirect
4243
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.32 // indirect
4344
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.32 // indirect
@@ -46,7 +47,6 @@ require (
4647
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.13 // indirect
4748
github.com/aws/aws-sdk-go-v2/service/sso v1.24.15 // indirect
4849
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.14 // indirect
49-
github.com/aws/aws-sdk-go-v2/service/sts v1.33.14 // indirect
5050
github.com/aws/smithy-go v1.22.2 // indirect
5151
github.com/beorn7/perks v1.0.1 // indirect
5252
github.com/cespare/xxhash/v2 v2.3.0 // indirect

Diff for: go.sum

-28
Original file line numberDiff line numberDiff line change
@@ -14,54 +14,30 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk5
1414
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
1515
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
1616
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
17-
github.com/aws/aws-sdk-go-v2 v1.17.2 h1:r0yRZInwiPBNpQ4aDy/Ssh3ROWsGtKDwar2JS8Lm+N8=
18-
github.com/aws/aws-sdk-go-v2 v1.17.2/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
1917
github.com/aws/aws-sdk-go-v2 v1.36.1 h1:iTDl5U6oAhkNPba0e1t1hrwAo02ZMqbrGq4k5JBWM5E=
2018
github.com/aws/aws-sdk-go-v2 v1.36.1/go.mod h1:5PMILGVKiW32oDzjj6RU52yrNrDPUHcbZQYr1sM7qmM=
21-
github.com/aws/aws-sdk-go-v2/config v1.18.4 h1:VZKhr3uAADXHStS/Gf9xSYVmmaluTUfkc0dcbPiDsKE=
22-
github.com/aws/aws-sdk-go-v2/config v1.18.4/go.mod h1:EZxMPLSdGAZ3eAmkqXfYbRppZJTzFTkv8VyEzJhKko4=
2319
github.com/aws/aws-sdk-go-v2/config v1.29.6 h1:fqgqEKK5HaZVWLQoLiC9Q+xDlSp+1LYidp6ybGE2OGg=
2420
github.com/aws/aws-sdk-go-v2/config v1.29.6/go.mod h1:Ft+WLODzDQmCTHDvqAH1JfC2xxbZ0MxpZAcJqmE1LTQ=
25-
github.com/aws/aws-sdk-go-v2/credentials v1.13.4 h1:nEbHIyJy7mCvQ/kzGG7VWHSBpRB4H6sJy3bWierWUtg=
26-
github.com/aws/aws-sdk-go-v2/credentials v1.13.4/go.mod h1:/Cj5w9LRsNTLSwexsohwDME32OzJ6U81Zs33zr2ZWOM=
2721
github.com/aws/aws-sdk-go-v2/credentials v1.17.59 h1:9btwmrt//Q6JcSdgJOLI98sdr5p7tssS9yAsGe8aKP4=
2822
github.com/aws/aws-sdk-go-v2/credentials v1.17.59/go.mod h1:NM8fM6ovI3zak23UISdWidyZuI1ghNe2xjzUZAyT+08=
29-
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20 h1:tpNOglTZ8kg9T38NpcGBxudqfUAwUzyUnLQ4XSd0CHE=
30-
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20/go.mod h1:d9xFpWd3qYwdIXM0fvu7deD08vvdRXyc/ueV+0SqaWE=
3123
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.28 h1:KwsodFKVQTlI5EyhRSugALzsV6mG/SGrdjlMXSZSdso=
3224
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.28/go.mod h1:EY3APf9MzygVhKuPXAc5H+MkGb8k/DOSQjWS0LgkKqI=
33-
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.26 h1:5WU31cY7m0tG+AiaXuXGoMzo2GBQ1IixtWa8Yywsgco=
34-
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.26/go.mod h1:2E0LdbJW6lbeU4uxjum99GZzI0ZjDpAb0CoSCM0oeEY=
3525
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.32 h1:BjUcr3X3K0wZPGFg2bxOWW3VPN8rkE3/61zhP+IHviA=
3626
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.32/go.mod h1:80+OGC/bgzzFFTUmcuwD0lb4YutwQeKLFpmt6hoWapU=
37-
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.20 h1:WW0qSzDWoiWU2FS5DbKpxGilFVlCEJPwx4YtjdfI0Jw=
38-
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.20/go.mod h1:/+6lSiby8TBFpTVXZgKiN/rCfkYXEGvhlM4zCgPpt7w=
3927
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.32 h1:m1GeXHVMJsRsUAqG6HjZWx9dj7F5TR+cF1bjyfYyBd4=
4028
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.32/go.mod h1:IitoQxGfaKdVLNg0hD8/DXmAqNy0H4K2H2Sf91ti8sI=
41-
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.27 h1:N2eKFw2S+JWRCtTt0IhIX7uoGGQciD4p6ba+SJv4WEU=
42-
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.27/go.mod h1:RdwFVc7PBYWY33fa2+8T1mSqQ7ZEK4ILpM0wfioDC3w=
4329
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.2 h1:Pg9URiobXy85kgFev3og2CuOZ8JZUBENF+dcgWBaYNk=
4430
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.2/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc=
4531
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.2 h1:D4oz8/CzT9bAEYtVhSBmFj2dNOtaHOtMKc2vHBwYizA=
4632
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.2/go.mod h1:Za3IHqTQ+yNcRHxu1OFucBh0ACZT4j4VQFF0BqpZcLY=
47-
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.20 h1:jlgyHbkZQAgAc7VIxJDmtouH8eNjOk2REVAQfVhdaiQ=
48-
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.20/go.mod h1:Xs52xaLBqDEKRcAfX/hgjmD3YQ7c/W+BEyfamlO/W2E=
4933
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.13 h1:SYVGSFQHlchIcy6e7x12bsrxClCXSP5et8cqVhL8cuw=
5034
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.13/go.mod h1:kizuDaLX37bG5WZaoxGPQR/LNFXpxp0vsUnqfkWXfNE=
51-
github.com/aws/aws-sdk-go-v2/service/sso v1.11.26 h1:ActQgdTNQej/RuUJjB9uxYVLDOvRGtUreXF8L3c8wyg=
52-
github.com/aws/aws-sdk-go-v2/service/sso v1.11.26/go.mod h1:uB9tV79ULEZUXc6Ob18A46KSQ0JDlrplPni9XW6Ot60=
5335
github.com/aws/aws-sdk-go-v2/service/sso v1.24.15 h1:/eE3DogBjYlvlbhd2ssWyeuovWunHLxfgw3s/OJa4GQ=
5436
github.com/aws/aws-sdk-go-v2/service/sso v1.24.15/go.mod h1:2PCJYpi7EKeA5SkStAmZlF6fi0uUABuhtF8ILHjGc3Y=
55-
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.9 h1:wihKuqYUlA2T/Rx+yu2s6NDAns8B9DgnRooB1PVhY+Q=
56-
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.9/go.mod h1:2E/3D/mB8/r2J7nK42daoKP/ooCwbf0q1PznNc+DZTU=
5737
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.14 h1:M/zwXiL2iXUrHputuXgmO94TVNmcenPHxgLXLutodKE=
5838
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.14/go.mod h1:RVwIw3y/IqxC2YEXSIkAzRDdEU1iRabDPaYjpGCbCGQ=
59-
github.com/aws/aws-sdk-go-v2/service/sts v1.17.6 h1:VQFOLQVL3BrKM/NLO/7FiS4vcp5bqK0mGMyk09xLoAY=
60-
github.com/aws/aws-sdk-go-v2/service/sts v1.17.6/go.mod h1:Az3OXXYGyfNwQNsK/31L4R75qFYnO641RZGAoV3uH1c=
6139
github.com/aws/aws-sdk-go-v2/service/sts v1.33.14 h1:TzeR06UCMUq+KA3bDkujxK1GVGy+G8qQN/QVYzGLkQE=
6240
github.com/aws/aws-sdk-go-v2/service/sts v1.33.14/go.mod h1:dspXf/oYWGWo6DEvj98wpaTeqt5+DMidZD0A9BYTizc=
63-
github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
64-
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
6541
github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ=
6642
github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
6743
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
@@ -148,7 +124,6 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
148124
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
149125
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
150126
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
151-
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
152127
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
153128
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
154129
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
@@ -197,8 +172,6 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ
197172
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
198173
github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c=
199174
github.com/jhump/protoreflect v1.15.1/go.mod h1:jD/2GMKKE6OqX8qTjhADU1e6DShO+gavG9e0Q693nKo=
200-
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
201-
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
202175
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
203176
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
204177
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
@@ -471,7 +444,6 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
471444
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
472445
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
473446
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
474-
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
475447
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
476448
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
477449
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

Diff for: proxy/proxy.go

+47-5
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package proxy
22

33
import (
4+
"bytes"
45
"crypto/tls"
56
"fmt"
67
"net"
78
"strconv"
89
"sync"
10+
"text/template"
911

1012
"github.com/grepplabs/kafka-proxy/config"
1113
"github.com/grepplabs/kafka-proxy/pkg/libs/util"
@@ -190,17 +192,57 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32)
190192
port := l.Addr().(*net.TCPAddr).Port
191193
address := net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(port))
192194

193-
dynamicAdvertisedListener := p.dynamicAdvertisedListener
194-
if dynamicAdvertisedListener == "" {
195-
dynamicAdvertisedListener = p.defaultListenerIP
195+
dynamicAdvertisedHost, dynamicAdvertisedPort, err := p.getDynamicAdvertisedAddress(cfg.BrokerID, port)
196+
if err != nil {
197+
return "", 0, err
196198
}
197-
cfg.AdvertisedAddress = net.JoinHostPort(dynamicAdvertisedListener, fmt.Sprint(port))
199+
cfg.AdvertisedAddress = net.JoinHostPort(dynamicAdvertisedHost, fmt.Sprint(dynamicAdvertisedPort))
198200
cfg.ListenerAddress = address
199201

200202
p.brokerToListenerConfig[brokerAddress] = cfg
201203
logrus.Infof("Dynamic listener %s for broker %s brokerId %d advertised as %s", cfg.ListenerAddress, cfg.GetBrokerAddress(), cfg.BrokerID, cfg.AdvertisedAddress)
202204

203-
return dynamicAdvertisedListener, int32(port), nil
205+
return dynamicAdvertisedHost, int32(dynamicAdvertisedPort), nil
206+
}
207+
208+
func (p *Listeners) getDynamicAdvertisedAddress(brokerID int32, port int) (string, int, error) {
209+
dynamicAdvertisedListener := p.dynamicAdvertisedListener
210+
if dynamicAdvertisedListener == "" {
211+
return p.defaultListenerIP, port, nil
212+
}
213+
dynamicAdvertisedListener, err := p.templateDynamicAdvertisedAddress(brokerID)
214+
if err != nil {
215+
return "", 0, err
216+
}
217+
var (
218+
dynamicAdvertisedHost = dynamicAdvertisedListener
219+
dynamicAdvertisedPort = port
220+
)
221+
advHost, advPortStr, err := net.SplitHostPort(dynamicAdvertisedListener)
222+
if err == nil {
223+
if advPort, err := strconv.Atoi(advPortStr); err == nil {
224+
dynamicAdvertisedHost = advHost
225+
dynamicAdvertisedPort = advPort
226+
}
227+
}
228+
return dynamicAdvertisedHost, dynamicAdvertisedPort, nil
229+
}
230+
231+
func (p *Listeners) templateDynamicAdvertisedAddress(brokerID int32) (string, error) {
232+
tmpl, err := template.New("dynamicAdvertisedHost").Option("missingkey=error").Parse(p.dynamicAdvertisedListener)
233+
if err != nil {
234+
return "", fmt.Errorf("failed to parse host template '%s': %w", p.dynamicAdvertisedListener, err)
235+
}
236+
var buf bytes.Buffer
237+
data := map[string]any{
238+
"brokerId": brokerID,
239+
"brokerID": brokerID,
240+
}
241+
err = tmpl.Execute(&buf, data)
242+
if err != nil {
243+
return "", fmt.Errorf("failed to execute host template '%s': %w", p.dynamicAdvertisedListener, err)
244+
}
245+
return buf.String(), nil
204246
}
205247

206248
func (p *Listeners) ListenInstances(cfgs []config.ListenerConfig) (<-chan Conn, error) {

0 commit comments

Comments
 (0)