Skip to content

Template dynamicAdvertisedListener, allow {{.brokerId}} for a dynamic… #197

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ VERSION ?= $(shell git describe --tags --always --dirty)
GOPKGS = $(shell go list ./... | grep -v /vendor/)
BUILD_FLAGS ?=
LDFLAGS ?= -X github.com/grepplabs/kafka-proxy/config.Version=$(VERSION) -w -s
TAG ?= "v0.4.0"
TAG ?= "v0.4.1"
GOOS ?= $(if $(TARGETOS),$(TARGETOS),linux)
GOARCH ?= $(if $(TARGETARCH),$(TARGETARCH),amd64)
GOARM ?= $(TARGETVARIANT)
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ As not every Kafka release adds new messages/versions which are relevant to the

Linux

curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.4.0/kafka-proxy-v0.4.0-linux-amd64.tar.gz | tar xz
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.4.1/kafka-proxy-v0.4.1-linux-amd64.tar.gz | tar xz

macOS

curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.4.0/kafka-proxy-v0.4.0-darwin-amd64.tar.gz | tar xz
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.4.1/kafka-proxy-v0.4.1-darwin-amd64.tar.gz | tar xz

2. Move the binary in to your PATH.

Expand All @@ -70,7 +70,7 @@ Docker images are available on [Docker Hub](https://hub.docker.com/r/grepplabs/k

You can launch a kafka-proxy container for trying it out with

docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.4.0 \
docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.4.1 \
server \
--bootstrap-server-mapping "localhost:19092,0.0.0.0:30001" \
--bootstrap-server-mapping "localhost:29092,0.0.0.0:30002" \
Expand All @@ -89,7 +89,7 @@ Docker images with precompiled plugins located in `/opt/kafka-proxy/bin/` are ta

You can launch a kafka-proxy container with auth-ldap plugin for trying it out with

docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.4.0-all \
docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.4.1-all \
server \
--bootstrap-server-mapping "localhost:19092,0.0.0.0:30001" \
--bootstrap-server-mapping "localhost:29092,0.0.0.0:30002" \
Expand Down Expand Up @@ -142,7 +142,7 @@ You can launch a kafka-proxy container with auth-ldap plugin for trying it out w
--default-listener-ip string Default listener IP (default "0.0.0.0")
--deterministic-listeners Enable deterministic listeners (listener port = min port + broker id).
--dial-address-mapping stringArray Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment
--dynamic-advertised-listener string Advertised address for dynamic listeners. If empty, default-listener-ip is used
--dynamic-advertised-listener string Advertised address for dynamic listeners. If left empty, default-listener-ip is used. Supports templating with {{.brokerId}} for dynamic hostnames and a fixed port if provided.
--dynamic-listeners-disable Disable dynamic listeners.
--dynamic-sequential-min-port int If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.
--external-server-mapping stringArray Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started
Expand Down
2 changes: 1 addition & 1 deletion cmd/kafka-proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func init() {
func initFlags() {
// proxy
Server.Flags().StringVar(&c.Proxy.DefaultListenerIP, "default-listener-ip", "0.0.0.0", "Default listener IP")
Server.Flags().StringVar(&c.Proxy.DynamicAdvertisedListener, "dynamic-advertised-listener", "", "Advertised address for dynamic listeners. If empty, default-listener-ip is used")
Server.Flags().StringVar(&c.Proxy.DynamicAdvertisedListener, "dynamic-advertised-listener", "", "Advertised address for dynamic listeners. If left empty, default-listener-ip is used. Supports templating with {{.brokerId}} for dynamic hostnames and a fixed port if provided.")
Server.Flags().StringArrayVar(&bootstrapServersMapping, "bootstrap-server-mapping", []string{}, "Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))")
Server.Flags().StringArrayVar(&externalServersMapping, "external-server-mapping", []string{}, "Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started")
Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment")
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ require (
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
github.com/aws/aws-sdk-go-v2 v1.36.1
github.com/aws/aws-sdk-go-v2/config v1.29.6
github.com/aws/aws-sdk-go-v2/credentials v1.17.59
github.com/aws/aws-sdk-go-v2/service/sts v1.33.14
github.com/cenkalti/backoff v1.1.0
github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a
github.com/elazarl/goproxy/ext v0.0.0-20190711103511-473e67f1d7d2
Expand Down Expand Up @@ -37,7 +39,6 @@ require (
require (
cloud.google.com/go/compute/metadata v0.5.2 // indirect
github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.59 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.28 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.32 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.32 // indirect
Expand All @@ -46,7 +47,6 @@ require (
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.13 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.24.15 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.14 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.33.14 // indirect
github.com/aws/smithy-go v1.22.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
Expand Down
28 changes: 0 additions & 28 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,54 +14,30 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk5
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/aws/aws-sdk-go-v2 v1.17.2 h1:r0yRZInwiPBNpQ4aDy/Ssh3ROWsGtKDwar2JS8Lm+N8=
github.com/aws/aws-sdk-go-v2 v1.17.2/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2 v1.36.1 h1:iTDl5U6oAhkNPba0e1t1hrwAo02ZMqbrGq4k5JBWM5E=
github.com/aws/aws-sdk-go-v2 v1.36.1/go.mod h1:5PMILGVKiW32oDzjj6RU52yrNrDPUHcbZQYr1sM7qmM=
github.com/aws/aws-sdk-go-v2/config v1.18.4 h1:VZKhr3uAADXHStS/Gf9xSYVmmaluTUfkc0dcbPiDsKE=
github.com/aws/aws-sdk-go-v2/config v1.18.4/go.mod h1:EZxMPLSdGAZ3eAmkqXfYbRppZJTzFTkv8VyEzJhKko4=
github.com/aws/aws-sdk-go-v2/config v1.29.6 h1:fqgqEKK5HaZVWLQoLiC9Q+xDlSp+1LYidp6ybGE2OGg=
github.com/aws/aws-sdk-go-v2/config v1.29.6/go.mod h1:Ft+WLODzDQmCTHDvqAH1JfC2xxbZ0MxpZAcJqmE1LTQ=
github.com/aws/aws-sdk-go-v2/credentials v1.13.4 h1:nEbHIyJy7mCvQ/kzGG7VWHSBpRB4H6sJy3bWierWUtg=
github.com/aws/aws-sdk-go-v2/credentials v1.13.4/go.mod h1:/Cj5w9LRsNTLSwexsohwDME32OzJ6U81Zs33zr2ZWOM=
github.com/aws/aws-sdk-go-v2/credentials v1.17.59 h1:9btwmrt//Q6JcSdgJOLI98sdr5p7tssS9yAsGe8aKP4=
github.com/aws/aws-sdk-go-v2/credentials v1.17.59/go.mod h1:NM8fM6ovI3zak23UISdWidyZuI1ghNe2xjzUZAyT+08=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20 h1:tpNOglTZ8kg9T38NpcGBxudqfUAwUzyUnLQ4XSd0CHE=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20/go.mod h1:d9xFpWd3qYwdIXM0fvu7deD08vvdRXyc/ueV+0SqaWE=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.28 h1:KwsodFKVQTlI5EyhRSugALzsV6mG/SGrdjlMXSZSdso=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.28/go.mod h1:EY3APf9MzygVhKuPXAc5H+MkGb8k/DOSQjWS0LgkKqI=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.26 h1:5WU31cY7m0tG+AiaXuXGoMzo2GBQ1IixtWa8Yywsgco=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.26/go.mod h1:2E0LdbJW6lbeU4uxjum99GZzI0ZjDpAb0CoSCM0oeEY=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.32 h1:BjUcr3X3K0wZPGFg2bxOWW3VPN8rkE3/61zhP+IHviA=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.32/go.mod h1:80+OGC/bgzzFFTUmcuwD0lb4YutwQeKLFpmt6hoWapU=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.20 h1:WW0qSzDWoiWU2FS5DbKpxGilFVlCEJPwx4YtjdfI0Jw=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.20/go.mod h1:/+6lSiby8TBFpTVXZgKiN/rCfkYXEGvhlM4zCgPpt7w=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.32 h1:m1GeXHVMJsRsUAqG6HjZWx9dj7F5TR+cF1bjyfYyBd4=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.32/go.mod h1:IitoQxGfaKdVLNg0hD8/DXmAqNy0H4K2H2Sf91ti8sI=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.27 h1:N2eKFw2S+JWRCtTt0IhIX7uoGGQciD4p6ba+SJv4WEU=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.27/go.mod h1:RdwFVc7PBYWY33fa2+8T1mSqQ7ZEK4ILpM0wfioDC3w=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.2 h1:Pg9URiobXy85kgFev3og2CuOZ8JZUBENF+dcgWBaYNk=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.2/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.2 h1:D4oz8/CzT9bAEYtVhSBmFj2dNOtaHOtMKc2vHBwYizA=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.2/go.mod h1:Za3IHqTQ+yNcRHxu1OFucBh0ACZT4j4VQFF0BqpZcLY=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.20 h1:jlgyHbkZQAgAc7VIxJDmtouH8eNjOk2REVAQfVhdaiQ=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.20/go.mod h1:Xs52xaLBqDEKRcAfX/hgjmD3YQ7c/W+BEyfamlO/W2E=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.13 h1:SYVGSFQHlchIcy6e7x12bsrxClCXSP5et8cqVhL8cuw=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.13/go.mod h1:kizuDaLX37bG5WZaoxGPQR/LNFXpxp0vsUnqfkWXfNE=
github.com/aws/aws-sdk-go-v2/service/sso v1.11.26 h1:ActQgdTNQej/RuUJjB9uxYVLDOvRGtUreXF8L3c8wyg=
github.com/aws/aws-sdk-go-v2/service/sso v1.11.26/go.mod h1:uB9tV79ULEZUXc6Ob18A46KSQ0JDlrplPni9XW6Ot60=
github.com/aws/aws-sdk-go-v2/service/sso v1.24.15 h1:/eE3DogBjYlvlbhd2ssWyeuovWunHLxfgw3s/OJa4GQ=
github.com/aws/aws-sdk-go-v2/service/sso v1.24.15/go.mod h1:2PCJYpi7EKeA5SkStAmZlF6fi0uUABuhtF8ILHjGc3Y=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.9 h1:wihKuqYUlA2T/Rx+yu2s6NDAns8B9DgnRooB1PVhY+Q=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.9/go.mod h1:2E/3D/mB8/r2J7nK42daoKP/ooCwbf0q1PznNc+DZTU=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.14 h1:M/zwXiL2iXUrHputuXgmO94TVNmcenPHxgLXLutodKE=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.14/go.mod h1:RVwIw3y/IqxC2YEXSIkAzRDdEU1iRabDPaYjpGCbCGQ=
github.com/aws/aws-sdk-go-v2/service/sts v1.17.6 h1:VQFOLQVL3BrKM/NLO/7FiS4vcp5bqK0mGMyk09xLoAY=
github.com/aws/aws-sdk-go-v2/service/sts v1.17.6/go.mod h1:Az3OXXYGyfNwQNsK/31L4R75qFYnO641RZGAoV3uH1c=
github.com/aws/aws-sdk-go-v2/service/sts v1.33.14 h1:TzeR06UCMUq+KA3bDkujxK1GVGy+G8qQN/QVYzGLkQE=
github.com/aws/aws-sdk-go-v2/service/sts v1.33.14/go.mod h1:dspXf/oYWGWo6DEvj98wpaTeqt5+DMidZD0A9BYTizc=
github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ=
github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
Expand Down Expand Up @@ -148,7 +124,6 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
Expand Down Expand Up @@ -197,8 +172,6 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c=
github.com/jhump/protoreflect v1.15.1/go.mod h1:jD/2GMKKE6OqX8qTjhADU1e6DShO+gavG9e0Q693nKo=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
Expand Down Expand Up @@ -471,7 +444,6 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
52 changes: 47 additions & 5 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package proxy

import (
"bytes"
"crypto/tls"
"fmt"
"net"
"strconv"
"sync"
"text/template"

"github.com/grepplabs/kafka-proxy/config"
"github.com/grepplabs/kafka-proxy/pkg/libs/util"
Expand Down Expand Up @@ -190,17 +192,57 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32)
port := l.Addr().(*net.TCPAddr).Port
address := net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(port))

dynamicAdvertisedListener := p.dynamicAdvertisedListener
if dynamicAdvertisedListener == "" {
dynamicAdvertisedListener = p.defaultListenerIP
dynamicAdvertisedHost, dynamicAdvertisedPort, err := p.getDynamicAdvertisedAddress(cfg.BrokerID, port)
if err != nil {
return "", 0, err
}
cfg.AdvertisedAddress = net.JoinHostPort(dynamicAdvertisedListener, fmt.Sprint(port))
cfg.AdvertisedAddress = net.JoinHostPort(dynamicAdvertisedHost, fmt.Sprint(dynamicAdvertisedPort))
cfg.ListenerAddress = address

p.brokerToListenerConfig[brokerAddress] = cfg
logrus.Infof("Dynamic listener %s for broker %s brokerId %d advertised as %s", cfg.ListenerAddress, cfg.GetBrokerAddress(), cfg.BrokerID, cfg.AdvertisedAddress)

return dynamicAdvertisedListener, int32(port), nil
return dynamicAdvertisedHost, int32(dynamicAdvertisedPort), nil
}

func (p *Listeners) getDynamicAdvertisedAddress(brokerID int32, port int) (string, int, error) {
dynamicAdvertisedListener := p.dynamicAdvertisedListener
if dynamicAdvertisedListener == "" {
return p.defaultListenerIP, port, nil
}
dynamicAdvertisedListener, err := p.templateDynamicAdvertisedAddress(brokerID)
if err != nil {
return "", 0, err
}
var (
dynamicAdvertisedHost = dynamicAdvertisedListener
dynamicAdvertisedPort = port
)
advHost, advPortStr, err := net.SplitHostPort(dynamicAdvertisedListener)
if err == nil {
if advPort, err := strconv.Atoi(advPortStr); err == nil {
dynamicAdvertisedHost = advHost
dynamicAdvertisedPort = advPort
}
}
return dynamicAdvertisedHost, dynamicAdvertisedPort, nil
}

func (p *Listeners) templateDynamicAdvertisedAddress(brokerID int32) (string, error) {
tmpl, err := template.New("dynamicAdvertisedHost").Option("missingkey=error").Parse(p.dynamicAdvertisedListener)
if err != nil {
return "", fmt.Errorf("failed to parse host template '%s': %w", p.dynamicAdvertisedListener, err)
}
var buf bytes.Buffer
data := map[string]any{
"brokerId": brokerID,
"brokerID": brokerID,
}
err = tmpl.Execute(&buf, data)
if err != nil {
return "", fmt.Errorf("failed to execute host template '%s': %w", p.dynamicAdvertisedListener, err)
}
return buf.String(), nil
}

func (p *Listeners) ListenInstances(cfgs []config.ListenerConfig) (<-chan Conn, error) {
Expand Down
Loading