Skip to content

Commit 79eaf5f

Browse files
committed
prepare for rework
1 parent b8646e4 commit 79eaf5f

File tree

4 files changed

+22
-54
lines changed

4 files changed

+22
-54
lines changed

cmd/kafka-proxy/server.go

-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ func initFlags() {
8989
Server.Flags().StringArrayVar(&bootstrapServersMapping, "bootstrap-server-mapping", []string{}, "Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))")
9090
Server.Flags().StringArrayVar(&externalServersMapping, "external-server-mapping", []string{}, "Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started")
9191
Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment")
92-
Server.Flags().BoolVar(&c.Proxy.DeterministicListeners, "deterministic-listeners", false, "Enable deterministic listeners (listener port = min port + broker id).")
9392
Server.Flags().BoolVar(&c.Proxy.DisableDynamicListeners, "dynamic-listeners-disable", false, "Disable dynamic listeners.")
9493
Server.Flags().IntVar(&c.Proxy.DynamicSequentialMinPort, "dynamic-sequential-min-port", 0, "If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.")
9594

config/config.go

-5
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,6 @@ type ListenerConfig struct {
2929
ListenerAddress string
3030
AdvertisedAddress string
3131
}
32-
type IdListenerConfig struct {
33-
BrokerAddress string
34-
Listener net.Listener
35-
}
3632
type DialAddressMapping struct {
3733
SourceAddress string
3834
DestinationAddress string
@@ -78,7 +74,6 @@ type Config struct {
7874
DefaultListenerIP string
7975
BootstrapServers []ListenerConfig
8076
ExternalServers []ListenerConfig
81-
DeterministicListeners bool
8277
DialAddressMappings []DialAddressMapping
8378
DisableDynamicListeners bool
8479
DynamicAdvertisedListener string

proxy/protocol/responses.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ var (
2727

2828
func createMetadataResponseSchemaVersions() []Schema {
2929
metadataBrokerV0 := NewSchema("metadata_broker_v0",
30-
&Mfield{Name: "node_id", Ty: TypeInt32},
30+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
3131
&Mfield{Name: hostKeyName, Ty: TypeStr},
3232
&Mfield{Name: portKeyName, Ty: TypeInt32},
3333
)
@@ -52,14 +52,14 @@ func createMetadataResponseSchemaVersions() []Schema {
5252
)
5353

5454
metadataBrokerV1 := NewSchema("metadata_broker_v1",
55-
&Mfield{Name: "node_id", Ty: TypeInt32},
55+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
5656
&Mfield{Name: hostKeyName, Ty: TypeStr},
5757
&Mfield{Name: portKeyName, Ty: TypeInt32},
5858
&Mfield{Name: "rack", Ty: TypeNullableStr},
5959
)
6060

6161
metadataBrokerSchema9 := NewSchema("metadata_broker_schema9",
62-
&Mfield{Name: "node_id", Ty: TypeInt32},
62+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
6363
&Mfield{Name: hostKeyName, Ty: TypeCompactStr},
6464
&Mfield{Name: portKeyName, Ty: TypeInt32},
6565
&Mfield{Name: "rack", Ty: TypeCompactNullableStr},
@@ -249,13 +249,13 @@ func createMetadataResponseSchemaVersions() []Schema {
249249

250250
func createFindCoordinatorResponseSchemaVersions() []Schema {
251251
findCoordinatorBrokerV0 := NewSchema("find_coordinator_broker_v0",
252-
&Mfield{Name: "node_id", Ty: TypeInt32},
252+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
253253
&Mfield{Name: hostKeyName, Ty: TypeStr},
254254
&Mfield{Name: portKeyName, Ty: TypeInt32},
255255
)
256256

257257
findCoordinatorBrokerSchema9 := NewSchema("find_coordinator_broker_schema9",
258-
&Mfield{Name: "node_id", Ty: TypeInt32},
258+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
259259
&Mfield{Name: hostKeyName, Ty: TypeCompactStr},
260260
&Mfield{Name: portKeyName, Ty: TypeInt32},
261261
)
@@ -341,7 +341,7 @@ func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFu
341341
}
342342
}
343343
if port != newPort {
344-
err = broker.Replace(portKeyName, int32(newPort))
344+
err = broker.Replace(portKeyName, newPort)
345345
if err != nil {
346346
return err
347347
}

proxy/proxy.go

+16-42
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,11 @@ type Listeners struct {
2525

2626
listenFunc ListenFunc
2727

28-
deterministicListeners bool
2928
disableDynamicListeners bool
3029
dynamicSequentialMinPort int
3130

32-
brokerToListenerConfig map[string]config.ListenerConfig
33-
brokerIdToIdListenerConfig map[int32]config.IdListenerConfig
34-
lock sync.RWMutex
31+
brokerToListenerConfig map[string]config.ListenerConfig
32+
lock sync.RWMutex
3533
}
3634

3735
func NewListeners(cfg *config.Config) (*Listeners, error) {
@@ -66,19 +64,15 @@ func NewListeners(cfg *config.Config) (*Listeners, error) {
6664
return nil, err
6765
}
6866

69-
brokerIdToIdListenerConfig := make(map[int32]config.IdListenerConfig)
70-
7167
return &Listeners{
72-
defaultListenerIP: defaultListenerIP,
73-
dynamicAdvertisedListener: dynamicAdvertisedListener,
74-
connSrc: make(chan Conn, 1),
75-
brokerToListenerConfig: brokerToListenerConfig,
76-
brokerIdToIdListenerConfig: brokerIdToIdListenerConfig,
77-
tcpConnOptions: tcpConnOptions,
78-
listenFunc: listenFunc,
79-
deterministicListeners: cfg.Proxy.DeterministicListeners,
80-
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
81-
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
68+
defaultListenerIP: defaultListenerIP,
69+
dynamicAdvertisedListener: dynamicAdvertisedListener,
70+
connSrc: make(chan Conn, 1),
71+
brokerToListenerConfig: brokerToListenerConfig,
72+
tcpConnOptions: tcpConnOptions,
73+
listenFunc: listenFunc,
74+
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
75+
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
8276
}, nil
8377
}
8478

@@ -132,49 +126,30 @@ func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32, br
132126

133127
p.lock.RLock()
134128
listenerConfig, ok := p.brokerToListenerConfig[brokerAddress]
135-
idListenerConfig, brokerIdFound := p.brokerIdToIdListenerConfig[brokerId]
136129
p.lock.RUnlock()
137130

138131
if ok {
139-
logrus.Debugf("Address mappings broker=%s, listener=%s, advertised=%s", listenerConfig.BrokerAddress, listenerConfig.ListenerAddress, listenerConfig.AdvertisedAddress)
132+
logrus.Debugf("Address mappings broker=%s, listener=%s, advertised=%s, node=%d", listenerConfig.BrokerAddress, listenerConfig.ListenerAddress, listenerConfig.AdvertisedAddress, brokerId)
140133
return util.SplitHostPort(listenerConfig.AdvertisedAddress)
141134
}
142135
if !p.disableDynamicListeners {
143-
if brokerIdFound {
144-
logrus.Infof("Broker ID %d has a new advertised listener, closing existing dynamic listener", brokerId)
145-
// Existing broker ID found, but with a different upstream broker
146-
// Close existing listener, remove two mappings:
147-
// * ID to removed upstream broker
148-
// * removed upstream broker
149-
idListenerConfig.Listener.Close()
150-
p.lock.Lock()
151-
delete(p.brokerIdToIdListenerConfig, brokerId)
152-
delete(p.brokerToListenerConfig, idListenerConfig.BrokerAddress)
153-
p.lock.Unlock()
154-
}
155136
logrus.Infof("Starting dynamic listener for broker %s", brokerAddress)
156-
return p.ListenDynamicInstance(brokerAddress, brokerId)
137+
return p.ListenDynamicInstance(brokerAddress)
157138
}
158139
return "", 0, fmt.Errorf("net address mapping for %s:%d was not found", brokerHost, brokerPort)
159140
}
160141

161-
func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32) (string, int32, error) {
142+
func (p *Listeners) ListenDynamicInstance(brokerAddress string) (string, int32, error) {
162143
p.lock.Lock()
163144
defer p.lock.Unlock()
164145
// double check
165146
if v, ok := p.brokerToListenerConfig[brokerAddress]; ok {
166147
return util.SplitHostPort(v.AdvertisedAddress)
167148
}
168149

169-
var defaultListenerAddress string
170-
171-
if p.deterministicListeners {
172-
defaultListenerAddress = net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort+int(brokerId)))
173-
} else {
174-
defaultListenerAddress = net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort))
175-
if p.dynamicSequentialMinPort != 0 {
176-
p.dynamicSequentialMinPort += 1
177-
}
150+
defaultListenerAddress := net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort))
151+
if p.dynamicSequentialMinPort != 0 {
152+
p.dynamicSequentialMinPort += 1
178153
}
179154

180155
cfg := config.ListenerConfig{ListenerAddress: defaultListenerAddress, BrokerAddress: brokerAddress}
@@ -192,7 +167,6 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32)
192167

193168
advertisedAddress := net.JoinHostPort(dynamicAdvertisedListener, fmt.Sprint(port))
194169
p.brokerToListenerConfig[brokerAddress] = config.ListenerConfig{BrokerAddress: brokerAddress, ListenerAddress: address, AdvertisedAddress: advertisedAddress}
195-
p.brokerIdToIdListenerConfig[brokerId] = config.IdListenerConfig{BrokerAddress: brokerAddress, Listener: l}
196170

197171
logrus.Infof("Dynamic listener %s for broker %s advertised as %s", address, brokerAddress, advertisedAddress)
198172

0 commit comments

Comments
 (0)